    Steve Pamer

    1 year ago
    Looking for help implementing a simple flow of flows. I am creating a parent flow from two registered flows, A and B, where A is upstream of B. Flows A and B take parameters, so I use Parameter tasks in the parent. Ultimately I want to pass results from A to B, but I'm still trying to get a simple "A then B" with parameters running. I am trying two approaches: one with StartFlowRun and the other with the new create_flow_run. With StartFlowRun I can create a parent that runs A then B using wait=True, but I don't understand how to pass parameters, or how to pass the result from A to B as a parameter. With create_flow_run, I am trying the following: create_flow_run A with parameters, wait_for_flow_run A, create_flow_run B with parameters, wait_for_flow_run B. When I build a parent flow in that order, it runs A and B at the same time. It looks like I'm missing how to specify that A is upstream of B, so that B waits until A is done. I am also having trouble calling get_task_run_result for a task in A using the id returned by create_flow_run for A. Any help appreciated.
    Kevin Kho

    1 year ago
    Hey @Steve Pamer, I recommend using the new create_flow_run task over the StartFlowRun task. First, for your immediate question, the syntax is either of these two:
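    from prefect import Flow, task

    @task()
    def abc(a):
        return a

    # Option 1: pass upstream_tasks when calling the task
    with Flow("ecs_testing") as flow:
        x = abc(1)
        y = abc(1, upstream_tasks=[x])  # y starts only after x finishes

    # Option 2: call set_upstream on the task after adding it
    with Flow("ecs_tes") as flow2:
        x = abc(1)
        y = abc(1)
        y.set_upstream(x)  # same dependency as option 1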
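    Conceptually, both forms above just record an edge "x must finish before y starts" in the flow's task graph. As a rough, Prefect-independent sketch (the `edges` dict and `run_order` helper are hypothetical, and Python's standard `graphlib` stands in for Prefect's scheduler), the run order falls out of a topological sort over those edges:

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each key maps to the set of tasks it depends on
# (its "upstream" tasks). This mirrors y.set_upstream(x) or
# abc(1, upstream_tasks=[x]) in the Prefect snippets above.
edges = {
    "x": set(),
    "y": {"x"},  # y waits for x
}


def run_order(graph):
    """Return one valid execution order that respects every upstream edge."""
    return list(TopologicalSorter(graph).static_order())


print(run_order(edges))  # ['x', 'y']
```

    Without the `{"x"}` edge, nothing forces y after x, which matches the "A and B run at the same time" behaviour described in the question.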
    Second, the StartFlowRun task does not return a result that can be passed from A to B. I recommend using get_task_run_result: pass it the id returned by create_flow_run, and then you can pass the result it returns to B.
    Michael Adkins

    1 year ago
    Something like this should wait to start B until A is done, since there's a data dependency:
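    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import (
        create_flow_run,
        get_task_run_result,
        wait_for_flow_run,
    )

    @task
    def do_something(x):
        return x
    
    # child_a and child_b must already be registered for create_flow_run to find them
    with Flow("child_a") as child_flow_a:
        a = Parameter("a")
        result = do_something(a)
    
    with Flow("child_b") as child_flow_b:
        b = Parameter("b")
        result = do_something(b)
    
    with Flow("parent") as parent_flow:
        a_run_id = create_flow_run(
            flow_name=child_flow_a.name, parameters=dict(a=10)
        )
        # Fetch the return value of do_something from child A's run ("do_something-1" is its slug)
        a_result = get_task_run_result(a_run_id, "do_something-1")
        # Passing a_result as B's parameter makes B depend on A's result
        b_run_id = create_flow_run(flow_name=child_flow_b.name, parameters=dict(b=a_result))
        wait_for_flow_run(b_run_id)
        wait_for_flow_run(a_run_id)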
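    The key idea is that passing `a_result` into B's `parameters` is itself enough to create the A-before-B edge. A minimal stdlib sketch of that idea, assuming a hypothetical `Node` class and `run` helper (not Prefect's API): edges are inferred by scanning each node's arguments for other nodes, and a node runs only after its inputs have produced values.

```python
from graphlib import TopologicalSorter


class Node:
    """Hypothetical stand-in for a task call inside a flow (not Prefect's Task)."""

    def __init__(self, name, fn, *args):
        self.name, self.fn, self.args = name, fn, args

    def upstream(self):
        # Any argument that is itself a Node is a data dependency.
        return {arg for arg in self.args if isinstance(arg, Node)}


def run(nodes):
    graph = {node: node.upstream() for node in nodes}
    results = {}
    for node in TopologicalSorter(graph).static_order():
        # Replace Node arguments with the values they already produced.
        args = [results[arg] if isinstance(arg, Node) else arg for arg in node.args]
        results[node] = node.fn(*args)
    return {node.name: value for node, value in results.items()}


# "A" doubles its parameter; "B" consumes A's output, so B must run after A.
a = Node("A", lambda x: x * 2, 10)
b = Node("B", lambda y: y + 1, a)
print(run([b, a]))  # {'A': 20, 'B': 21}
```

    Listing B first in `run([b, a])` shows that declaration order doesn't matter; only the data edge does.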
    Something like this would be a bit more explicit and uses set_upstream, as Kevin described:
    with Flow("parent") as parent_flow:
        a_run_id = create_flow_run(
            flow_name=child_flow_a.name, parameters=dict(a=10)
        )
        a_result = get_task_run_result(a_run_id, "do_something-1")
    
        a_wait = wait_for_flow_run(a_run_id)
    
        b_run = create_flow_run(flow_name=child_flow_b.name, parameters=dict(b=a_result))
        # Explicitly start B's run only after A's run has finished
        b_run.set_upstream(a_wait)
    
        wait_for_flow_run(b_run)
    Steve Pamer

    1 year ago
    Thanks for the quick reply. Will report back.
    b_run.set_upstream(a_wait) and b_run.set_upstream(a_run_id) both worked to get B to wait for A. Now to get the results from A into B. Thanks!
    Hmm. I'm getting an error when getting the result from A: "ValueError: The task result has no location so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly"
    Michael Adkins

    1 year ago
    Are you setting a result type on your tasks? https://docs.prefect.io/core/concepts/results.html#result-objects
    We can't pass the result in memory, since each flow run is orchestrated separately, so it has to be persisted to a disk or remote location.
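    Why a location is needed: child flow A's run and the parent's get_task_run_result call don't share memory, so A has to write its return value somewhere durable and the parent reads it back by location. A stdlib-only sketch of that round trip, in the spirit of Prefect's LocalResult but not its API: `persist` and `load` are hypothetical helpers, pickle stands in for the serializer, and a temporary directory stands in for the result store.

```python
import os
import pickle
import tempfile


def persist(value, directory):
    """Write a task's return value to disk and return its location."""
    fd, location = tempfile.mkstemp(dir=directory, suffix=".pickle")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(value, f)
    return location


def load(location):
    """Read a persisted value back, as a separate run would have to."""
    if location is None:
        # Analogous to the error in this thread: nothing was ever persisted.
        raise ValueError("The task result has no location so the result cannot be loaded.")
    with open(location, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as store:
    loc = persist({"a": 10}, store)  # child flow A finishes and persists its result
    print(load(loc))                 # parent reads it back: {'a': 10}
```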
    Steve Pamer

    1 year ago
    No, I wasn't doing that. Thanks.