Steve Pamer
07/30/2021, 7:43 PM
Kevin Kho
create_flow_run task over the StartFlowRun task. First, for your immediate question, the syntax is either of these two:
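from prefect import Flow, task

@task()
def abc(a):
    return a

# Option 1: pass the dependency with the upstream_tasks keyword
with Flow("ecs_testing") as flow:
    x = abc(1)
    y = abc(1, upstream_tasks=[x])

# Option 2: declare the dependency afterwards with set_upstream
with Flow("ecs_tes") as flow2:
    x = abc(1)
    y = abc(1)
    y.set_upstream(x)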
Kevin Kho
StartFlowRun task does not return the result to be passed from A to B. I recommend you use get_task_run_result and pass in the id created by create_flow_run into this task, and then you can get the result and pass it to B
Zanie
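from prefect import Flow, Parameter, task
from prefect.tasks.prefect import (
    create_flow_run,
    get_task_run_result,
    wait_for_flow_run,
)

@task
def do_something(x):
    return x

with Flow("child_a") as child_flow_a:
    a = Parameter("a")
    result = do_something(a)

with Flow("child_b") as child_flow_b:
    b = Parameter("b")
    result = do_something(b)

with Flow("parent") as parent_flow:
    a_run_id = create_flow_run(
        flow_name=child_flow_a.name, parameters=dict(a=10)
    )
    # "do_something-1" is the slug of the task inside child_a
    a_result = get_task_run_result(a_run_id, "do_something-1")
    # Passing a_result as a parameter makes B depend on A's result
    b_run_id = create_flow_run(flow_name=child_flow_b.name, parameters=dict(b=a_result))
    wait_for_flow_run(b_run_id)
    wait_for_flow_run(a_run_id)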
Zanie
set_upstream as described by Kevin
with Flow("parent") as parent_flow:
    a_run_id = create_flow_run(
        flow_name=child_flow_a.name, parameters=dict(a=10)
    )
    a_result = get_task_run_result(a_run_id, "do_something-1")
    a_wait = wait_for_flow_run(a_run_id)
    b_run_id = create_flow_run(flow_name=child_flow_b.name, parameters=dict(b=a_result))
    # Start child B only after child A's flow run has finished
    b_run_id.set_upstream(a_wait)
    wait_for_flow_run(b_run_id)
Steve Pamer
07/30/2021, 7:54 PM
Steve Pamer
07/30/2021, 8:05 PM
Steve Pamer
07/30/2021, 8:29 PM
location so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly"
Zanie
Zanie
Steve Pamer
07/30/2021, 8:48 PM