https://prefect.io logo
Title
j

jspeis

02/24/2022, 1:36 PM
I have a flow A with multiple sequential tasks. When the last task of Flow A completes, I want to kick off flow B, could I do something like :
with Flow("FlowB") as flow:
    task_c = do_task_c()
    # ...
    

with Flow("FlowA") as flow:
    task_a = do_task_a()
    task_b = do_task_b(upstream_tasks=[task_a])
    # ... 
    create_flow_run(flow_name="FlowB")
though just want to confirm whether if I get create_flow_run to wait for task b to complete? (what would be the best way to arrange this?)
a

Anna Geller

02/24/2022, 1:43 PM
Prefect 1.0 uses an additional task 
wait_for_flow_run
 to block until the child flow run is completed.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow") as flow:
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name", run_name="custom_run_name"
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id, raise_final_state=True, stream_logs=True
    )
The 
raise_final_state
 flag ensures that the state of this task will be set to the final state of the child flow run on completion. By using the default 
all_successful
 trigger, it prevents from running downstream tasks if the child flow run fails. Setting 
stream_logs=True
 allows to see the child flow run logs directly in the parent flow run.
So in your example, you could do:
with Flow("FlowB") as flow:
    task_c = do_task_c()
    # ...


with Flow("FlowA") as flow:
    task_a = do_task_a()
    task_b = do_task_b(upstream_tasks=[task_a])
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name", run_name="custom_run_name", upstream_tasks=[task_b]
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id, raise_final_state=True, stream_logs=True
    )
j

jspeis

02/24/2022, 1:51 PM
Great, thanks @Anna Geller!!
👍 1
@Anna Geller one comment, I didn't realize
create_flow_run
accepted an
upstream_tasks
param, should the docs be updated? (or was I looking at the wrong thing?)
a

Anna Geller

02/24/2022, 2:05 PM
Any task that you call in the Flow block can receive the
upstream_tasks=[task_b]
argument.
👍 1