jspeis

    7 months ago
    I have a Flow A with multiple sequential tasks. When the last task of Flow A completes, I want to kick off Flow B. Could I do something like this:
    with Flow("FlowB") as flow:
        task_c = do_task_c()
        # ...
        
    
    with Flow("FlowA") as flow:
        task_a = do_task_a()
        task_b = do_task_b(upstream_tasks=[task_a])
        # ... 
        create_flow_run(flow_name="FlowB")
    I just want to confirm: how do I get create_flow_run to wait for task_b to complete? (What would be the best way to arrange this?)
    Anna Geller

    7 months ago
    Prefect 1.0 uses an additional task, wait_for_flow_run, to block until the child flow run is completed.
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    with Flow("parent_flow") as flow:
        child_flow_run_id = create_flow_run(
            flow_name="child_flow_name", run_name="custom_run_name"
        )
        child_flowrunview = wait_for_flow_run(
            child_flow_run_id, raise_final_state=True, stream_logs=True
        )
    The raise_final_state flag ensures that the state of this task is set to the final state of the child flow run on completion. Combined with the default all_successful trigger, this prevents downstream tasks from running if the child flow run fails. Setting stream_logs=True lets you see the child flow run logs directly in the parent flow run.
    So in your example, you could do:
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    with Flow("FlowB") as flow:
        task_c = do_task_c()
        # ...
    
    
    with Flow("FlowA") as flow:
        task_a = do_task_a()
        task_b = do_task_b(upstream_tasks=[task_a])
        child_flow_run_id = create_flow_run(
            # flow_name must match the name of the child flow, here "FlowB"
            flow_name="FlowB", run_name="custom_run_name", upstream_tasks=[task_b]
        )
        child_flowrunview = wait_for_flow_run(
            child_flow_run_id, raise_final_state=True, stream_logs=True
        )
    jspeis

    7 months ago
    Great, thanks @Anna Geller!!
    @Anna Geller one comment: I didn't realize create_flow_run accepted an upstream_tasks param. Should the docs be updated? (Or was I looking at the wrong thing?)
    Anna Geller

    7 months ago
    Any task that you call in the Flow block can receive the upstream_tasks=[task_b] argument.