Looking for help implementing a simple flow of flo...
# ask-community
s
Looking for help implementing a simple flow of flows. I am creating a parent flow using two registered flows, A and B. A is upstream of B. Flows A and B have parameters, so I use Parameter tasks in the parent. Ultimately I want to pass results from A to B, but I'm still trying to get a simple A-then-B run with params working. I am trying two solutions: one with StartFlowRun and the other with the new create_flow_run. With StartFlowRun I can create a parent that runs A then B, using wait=True, but I don't understand how to pass parameters, or how to pass the result from A to B as a parameter. With create_flow_run, I am trying the following: create_flow_run A with parameters, wait_for_flow_run A, create_flow_run B with parameters, wait_for_flow_run B. When I create a parent flow in that order, it runs A and B at the same time. It looks like I am missing the way to specify that A is upstream of B, so that B waits until A is done before running. I am also having trouble with get_task_run_result for A, using the id returned from the call to create_flow_run A. Any help appreciated.
k
Hey @Steve Pamer, I recommend using the new create_flow_run task over the StartFlowRun task. First, for your immediate question, the syntax is either of these two:
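from prefect import Flow, task

@task()
def abc(a):
    return a

# Option 1: declare the dependency with the upstream_tasks keyword.
with Flow("ecs_testing") as flow:
    x = abc(1)
    y = abc(1, upstream_tasks=[x])

# Option 2: declare the dependency afterwards with set_upstream.
with Flow("ecs_tes") as flow2:
    x = abc(1)
    y = abc(1)
    y.set_upstream(x)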
Second, the StartFlowRun task does not return the result to be passed from A to B. I recommend you use get_task_run_result and pass it the id returned by create_flow_run; then you can get the result and pass it to B.
z
Something like this should wait to start B until A is done since there's a data dependency
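from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run, get_task_run_result, wait_for_flow_run

@task
def do_something(x):
    return x

with Flow("child_a") as child_flow_a:
    a = Parameter("a")
    result = do_something(a)

with Flow("child_b") as child_flow_b:
    b = Parameter("b")
    result = do_something(b)

with Flow("parent") as parent_flow:
    a_run_id = create_flow_run(
        flow_name=child_flow_a.name, parameters=dict(a=10)
    )
    # "do_something-1" is the slug of the task in child A whose result we want.
    a_result = get_task_run_result(a_run_id, "do_something-1")
    # B takes a_result as a parameter, so it cannot be created before A's result exists.
    b_run_id = create_flow_run(flow_name=child_flow_b.name, parameters=dict(b=a_result))
    wait_for_flow_run(b_run_id)
    wait_for_flow_run(a_run_id)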
Something like this would be a bit more explicit and use set_upstream as described by Kevin
# Reuses the imports and child flows from the previous snippet.
with Flow("parent") as parent_flow:
    a_run_id = create_flow_run(
        flow_name=child_flow_a.name, parameters=dict(a=10)
    )
    a_result = get_task_run_result(a_run_id, "do_something-1")

    a_wait = wait_for_flow_run(a_run_id)

    b_run = create_flow_run(flow_name=child_flow_b.name, parameters=dict(b=a_result))
    # Explicitly hold back B's creation until the wait on A has finished.
    b_run.set_upstream(a_wait)

    wait_for_flow_run(b_run)
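To see why the original ordering ran A and B together, here is a rough plain-Python sketch. It is not Prefect code: it only models the parent flow as a dependency graph using the standard library's graphlib, and the node names (create_a, wait_a, create_b, wait_b) and the ready_batches helper are made up for illustration. The order in which tasks are written inside the flow block does not create an edge; only a data dependency or an explicit upstream does.

```python
from graphlib import TopologicalSorter


def ready_batches(graph):
    """Group nodes into batches that become runnable at the same time.

    `graph` maps each node to the set of nodes it depends on.
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose upstreams are done
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Hypothetical model of the parent as first written: no edge between A and B.
no_edge = {
    "create_a": set(),
    "wait_a": {"create_a"},
    "create_b": set(),
    "wait_b": {"create_b"},
}

# Same tasks, but create_b now lists wait_a as an upstream.
with_edge = {
    "create_a": set(),
    "wait_a": {"create_a"},
    "create_b": {"wait_a"},
    "wait_b": {"create_b"},
}

print(ready_batches(no_edge))    # create_a and create_b share the first batch
print(ready_batches(with_edge))  # one task per batch, A fully before B
```

In the first graph both create tasks are runnable immediately, which matches the "runs A and B at the same time" behaviour; adding the single upstream edge is enough to serialize them.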
s
Thanks for the quick reply. Will report back.
b_run.set_upstream(a_wait) and b_run.set_upstream(a_run_id) both worked to get B to wait for A. Now to get the results from A into B. Thanks!
hmm. getting an error getting the result from A. "ValueError: The task result has no location so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly"
z
Are you setting a result type on your tasks? https://docs.prefect.io/core/concepts/results.html#result-objects
We can't pass the result in memory, since the child flow run is orchestrated separately, so it has to be persisted to disk or a remote location
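A rough plain-Python sketch of that idea, in case it helps. It is not Prefect's implementation: StoredResult, run_child_task, and the JSON file name are hypothetical stand-ins. The point is only that the parent can read the child's value when the child wrote it somewhere and recorded a location, and gets a "no location" error otherwise.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class StoredResult:
    """Hypothetical stand-in for a task result record (not a Prefect class)."""

    def __init__(self, location: Optional[str] = None):
        self.location = location

    def load(self):
        # Without a location there is nothing on disk to read back.
        if self.location is None:
            raise ValueError(
                "The task result has no location so the result cannot be loaded."
            )
        return json.loads(Path(self.location).read_text())


def run_child_task(value, result_dir: Optional[str] = None) -> StoredResult:
    """Simulate a task in child flow A; persist only if a result dir is configured."""
    if result_dir is None:
        return StoredResult()  # the value existed only in the child's memory
    path = Path(result_dir) / "do_something-1.json"
    path.write_text(json.dumps(value))
    return StoredResult(location=str(path))


def demo():
    # Configured case: the "parent" loads the value from the recorded location.
    with tempfile.TemporaryDirectory() as tmp:
        loaded = run_child_task(10, result_dir=tmp).load()
    # Unconfigured case: loading fails because no location was recorded.
    try:
        run_child_task(10).load()
        error = None
    except ValueError as exc:
        error = str(exc)
    return loaded, error
```

In Prefect itself the corresponding step is attaching a result to the task in child flow A, as the linked docs describe; which result type fits depends on what storage both the parent and the child run can reach.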
s
No, I wasn't doing that. Thanks.