John Faucett

03/31/2020, 1:31 PM
Hi, does anyone know how I can pass the result of one task to another task further downstream with the imperative API?
flow = Flow('foo', tasks=[t1, t2, t3, t4])

flow.add_edge(t1, t2, key='x')
flow.add_edge(t2, t3)
flow.add_edge(t3, t4)

# now t4 needs the result of t1, not of t2 or t3

Jeremiah

03/31/2020, 1:55 PM
Hi @John Faucett, you should be able to use the same style you used for the `t1` → `t2` edge, using a key. For example,
flow.add_edge(t1, t4, key='y')

John Faucett

03/31/2020, 1:57 PM
Hi @Jeremiah, thanks. And that shouldn’t mess up the run order of t1 > t2 > t3 > t4?

Jeremiah

03/31/2020, 1:58 PM
However, I’d caution you that `add_edge` is the lowest-level API for setting dependencies and may skip some checks that higher-level APIs perform (for example, handling constant values without wrapping them as tasks). You might opt for `flow.set_dependencies(t4, keyword_tasks={'y': t1})` or even the task’s own method, `t4.set_upstream(t1, flow=flow, key='y')`. To be clear, your way will work as expected for your example, but using the slightly higher-level APIs might future-proof you a little better if more robust checks are introduced.
That’s correct: a task will only run when all of its upstream dependencies have finished. So if `t4` depends on `t1` and `t3`, whether passing data or just a state edge, it will wait for them all to finish.
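For anyone reading the archive later, here is a toy sketch of the rule described above. It is not Prefect's implementation and does not use Prefect at all: `ToyFlow`, `add_task`, and `run` are hypothetical names made up for illustration, and it assumes Python 3.9+ for `graphlib`. An edge with a `key` passes the upstream result as a keyword argument, an edge without a key only enforces ordering, and a task runs only after every upstream task has finished.

```python
from graphlib import TopologicalSorter


class ToyFlow:
    """Toy stand-in for a flow (hypothetical, NOT Prefect's implementation)."""

    def __init__(self):
        self.tasks = {}   # task name -> callable
        self.edges = []   # (upstream, downstream, key or None)

    def add_task(self, name, fn):
        self.tasks[name] = fn

    def add_edge(self, upstream, downstream, key=None):
        # key=None models a state-only edge; a key models a data edge.
        self.edges.append((upstream, downstream, key))

    def run(self):
        # Map each task to the set of tasks it must wait for.
        graph = {name: set() for name in self.tasks}
        for up, down, _ in self.edges:
            graph[down].add(up)

        results, order = {}, []
        for name in TopologicalSorter(graph).static_order():
            # Only keyed edges contribute arguments to the downstream task.
            kwargs = {
                key: results[up]
                for up, down, key in self.edges
                if down == name and key is not None
            }
            results[name] = self.tasks[name](**kwargs)
            order.append(name)
        return order, results


flow = ToyFlow()
flow.add_task("t1", lambda: 10)
flow.add_task("t2", lambda x: x + 1)
flow.add_task("t3", lambda: "side effect")
flow.add_task("t4", lambda y: y * 2)

flow.add_edge("t1", "t2", key="x")  # data edge
flow.add_edge("t2", "t3")           # state-only edge
flow.add_edge("t3", "t4")           # state-only edge
flow.add_edge("t1", "t4", key="y")  # extra data edge, skipping t2 and t3

order, results = flow.run()
```

In this toy model the extra `t1` → `t4` edge leaves the run order unchanged, because `t4` still has to wait for `t3`; it only adds `t1`'s result as the `y` argument.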

John Faucett

03/31/2020, 2:03 PM
ah ok perfect. Thanks for the clarification 🙂

Jeremiah

03/31/2020, 2:03 PM
You got it!

josh

04/02/2020, 1:23 PM
@Marvin archive “How to pass task results downstream using imperative API”