Jadei
12/17/2021, 5:45 PM
import pendulum
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

weekday_schedule = CronSchedule(
    "30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
)

with Flow("parent-flow", schedule=weekday_schedule) as flow:
    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow_b = create_flow_run(flow_name="B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    flow_c = create_flow_run(flow_name="C", project_name="examples")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
    flow_d = create_flow_run(flow_name="D", project_name="examples")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)
    b = wait_for_flow_b(upstream_tasks=[wait_for_flow_a])
    c = wait_for_flow_c(upstream_tasks=[wait_for_flow_a])
    d = wait_for_flow_d(upstream_tasks=[b, c])
flow.run()
Zanie
flow.run() but then they won’t be tracked
Jadei
12/17/2021, 5:51 PM
Kevin Kho
Jadei
12/17/2021, 5:54 PM
Jadei
12/17/2021, 5:55 PM
Kevin Kho
flow.run() directly instead of using the create_flow_run task cuz that code calls the backend API
Kevin Kho
ShellTask
Jadei
12/17/2021, 6:15 PM
flow.run()
it seems like wait_for_flow_run is not really working
Kevin Kho
wait_for_flow_run takes in a flow run id so it needs a backend. If you use the ShellTask though combined with flow.run(), I think it might wait for it
Kevin Kho
from flow_one import flowA
from flow_two import flowB
flowA.run()
flowB.run()
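
A minimal sketch of how the local approach above could be extended to the A, then B and C, then D ordering from the original question. It assumes Prefect 1.x behavior, where flow.run() blocks until the local run finishes and returns a state object with an is_successful() method. The run_in_order helper and the FakeFlow / FakeState stand-ins are hypothetical names used only so the sketch runs without Prefect installed; real imported flows such as flowA and flowB would take the place of the stand-ins. As noted earlier in the thread, runs started this way are not tracked by the backend.

```python
from typing import Dict, List


def run_in_order(stages: List[list]) -> Dict[str, bool]:
    """Run flows stage by stage; skip later stages if any flow in a stage fails.

    Each item only needs a `name` attribute and a blocking `run()` that
    returns an object with `is_successful()` (assumed to match Prefect 1.x).
    """
    results: Dict[str, bool] = {}
    for stage in stages:
        for flow in stage:
            state = flow.run()  # blocks until this local run finishes
            results[flow.name] = state.is_successful()
        if not all(results[flow.name] for flow in stage):
            break  # an upstream flow failed, so downstream stages are not run
    return results


# Hypothetical stand-ins so the sketch is runnable without Prefect.
class FakeState:
    def __init__(self, ok: bool):
        self._ok = ok

    def is_successful(self) -> bool:
        return self._ok


class FakeFlow:
    def __init__(self, name: str, ok: bool = True):
        self.name = name
        self._ok = ok

    def run(self) -> FakeState:
        return FakeState(self._ok)


if __name__ == "__main__":
    # A first, then B and C, then D only if both B and C succeeded.
    stages = [[FakeFlow("A")], [FakeFlow("B"), FakeFlow("C")], [FakeFlow("D")]]
    print(run_in_order(stages))
```

Within a stage the flows still run one after another here; running B and C in parallel would need something extra (threads or separate processes), which this sketch leaves out.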
Jadei
12/17/2021, 6:31 PM
Kevin Kho
Jadei
12/17/2021, 6:41 PM
Kevin Kho
Jadei
12/17/2021, 6:54 PM