jspeis
02/24/2022, 1:36 PMwith Flow("FlowB") as flow:
task_c = do_task_c()
# ...
with Flow("FlowA") as flow:
task_a = do_task_a()
task_b = do_task_b(upstream_tasks=[task_a])
# ...
create_flow_run(flow_name="FlowB")
though just want to confirm whether if I get create_flow_run to wait for task b to complete? (what would be the best way to arrange this?)Anna Geller
wait_for_flow_run
to block until the child flow run is completed.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
with Flow("parent_flow") as flow:
child_flow_run_id = create_flow_run(
flow_name="child_flow_name", run_name="custom_run_name"
)
child_flowrunview = wait_for_flow_run(
child_flow_run_id, raise_final_state=True, stream_logs=True
)
The raise_final_state
flag ensures that the state of this task will be set to the final state of the child flow run on completion. By using the default all_successful
trigger, it prevents from running downstream tasks if the child flow run fails.
Setting stream_logs=True
allows to see the child flow run logs directly in the parent flow run.with Flow("FlowB") as flow:
task_c = do_task_c()
# ...
with Flow("FlowA") as flow:
task_a = do_task_a()
task_b = do_task_b(upstream_tasks=[task_a])
child_flow_run_id = create_flow_run(
flow_name="child_flow_name", run_name="custom_run_name", upstream_tasks=[task_b]
)
child_flowrunview = wait_for_flow_run(
child_flow_run_id, raise_final_state=True, stream_logs=True
)
jspeis
02/24/2022, 1:51 PMcreate_flow_run
accepted an upstream_tasks
param, should the docs be updated? (or was I looking at the wrong thing?)Anna Geller
upstream_tasks=[task_b]
argument.