Muddassir Shaikh
03/16/2022, 9:16 AMSylvain Hazard
03/16/2022, 9:20 AMAnna Geller
03/16/2022, 12:14 PMupstream_tasks
keyword on the task
• fragile - what if your task A had some network issues and need more than 30 min? setting dependencies in this time-based manner is quite error-prone - it introduces the risk that your task B starts too early because task A hasn't finished yet.
But if you need these time-based dependencies for some reason, here are two options:
1) Manually setting time.sleep()
One hack to delay the execution of specific tasks would be to add time.sleep(3600)
e.g. to ensure that the next task runs one hour after the previous one finished.
2) Flow-of-flows
Using the flow-of-flows orchestration pattern, you could trigger several child-flows from a parent flow and you can add a custom scheduled_start_time
on the create_flow_run
task:
from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
with Flow("parent_flow") as flow:
child_flow_run_id = create_flow_run(
flow_name="child_flow_name",
run_name="custom_run_name",
scheduled_start_time=timedelta(minutes=30),
)
child_flowrunview = wait_for_flow_run(
child_flow_run_id, raise_final_state=True, stream_logs=True,
)
Kevin Kho
03/16/2022, 1:53 PMcreate_flow_run
to schedule sounds like a waste of compute. I think you may as well make those flows with a schedule.