Ilya Sapunov
06/10/2022, 3:31 PMerror_task = ShellTask(
name="error",
stream_output=True,
)
messages = ["HELLO", "WORLD"]
@task(max_retries=5, retry_delay=datetime.timedelta(seconds=5))
def echo(start_date, message):
current_date = prefect.context.get('task_loop_result', start_date)
if current_date >= datetime.datetime.now(current_date.tzinfo):
return True
error_task.run(
command="python error.py",
env={
"MESSAGE": message,
"DATE": current_date.date().isoformat()
},
)
print(f"Got data for {current_date.isoformat()}")
raise LOOP(
message=f"Got data for {current_date.isoformat()}",
result=(current_date + datetime.timedelta(days=1)),
)
with Flow("test") as flow:
date = DateTimeParameter("date")
echo.map(unmapped(date), messages)
flow_state = flow.run(date="2022-05-01")
import os
import random
print(os.getenv("MESSAGE"))
if random.random() * 100 > 70:
raise Exception
Kevin Kho
Ilya Sapunov
06/14/2022, 8:30 AMKevin Kho
from prefect import Flow, task
from prefect.engine.signals import LOOP
import datetime
import prefect
import random
@task(max_retries=5, retry_delay=datetime.timedelta(seconds=3))
def echo(start_date):
current_date = prefect.context.get('task_loop_result', start_date)
if current_date >= datetime.datetime.now(current_date.tzinfo):
return True
x = random.random()
if x < 0.1:
raise ValueError()
loop_count = prefect.context.get("task_loop_count")
print(f"{start_date} {current_date}-{loop_count}")
raise LOOP(
message=f"Got data for {current_date.isoformat()}",
result=(current_date + datetime.timedelta(days=1)),
)
with Flow("test") as flow:
echo.map([datetime.datetime(2022, 6, 1), datetime.datetime(2022, 6, 2)])
flow.run()