
Andrew Vaccaro

04/17/2020, 10:06 PM
Question about setting upstream_tasks to the same task being called twice with different arguments. I have something like
A(abcd)
B.set_dependencies(upstream_tasks=[A], keyword_tasks=...)
A(defg)
B.set_dependencies(upstream_tasks=[A], keyword_tasks=...)
where B must run after A, and each gets called with two separate sets of Parameters. If I call B.set_dependencies(upstream_tasks=[A]) twice, will the second B wait on the second A? And is this the idiomatic way to do this?

nicholas

04/17/2020, 10:14 PM
Hi @Andrew Vaccaro! I think an easy way to handle that would be to assign task A to a variable each time, and then reference that variable like such:
a1 = A(abcd)
B().set_dependencies(upstream_tasks=[a1], keyword_tasks=...)
a2 = A(defg)
B().set_dependencies(upstream_tasks=[a2], keyword_tasks=...)

Andrew Vaccaro

04/17/2020, 10:15 PM
Oh duh I forgot you could use dummy/empty result variables to track the results of specific tasks.
I just watched @Laura Lorenz (she/her) demo this, too 🙂
Thanks for the fast response

nicholas

04/17/2020, 10:16 PM
Of course! 😄

Andrew Vaccaro

04/17/2020, 10:34 PM
Hmm actually I think only our last call to each task is actually being executed.
I'm kinda thinking this should be parameterized at the Flow level, which seems more idiomatic. We do have a dictionary as keyword tasks, with hard-coded params, instead of using actual results from a previous task.
B.set_dependencies(upstream_tasks=[A], keyword_tasks={'a': 123})
roughly
No real need for an answer at 4:30 on Friday don't worry about it for now; we're gonna experiment on Monday anyways.

nicholas

04/17/2020, 10:59 PM
Oh so sorry! I wasn't looking closely enough at the code. You'll need to add another () to those B tasks, otherwise it's referencing the same object! I've amended the code above, sorry again!
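
The "same object" point can be illustrated without Prefect at all. The sketch below is a toy model only: ToyFlow, ToyTask, and count_named are hypothetical names invented for this illustration, not Prefect's API, and they assume nothing more than a graph that stores task objects in a set. It shows why registering dependencies twice on one B object yields a single B node, while creating a fresh B object per call yields two.

```python
class ToyTask:
    """Hypothetical stand-in for a task; hashed by object identity."""

    def __init__(self, name):
        self.name = name


class ToyFlow:
    """Hypothetical stand-in for a flow: a set of tasks plus upstream edges."""

    def __init__(self):
        self.tasks = set()
        self.edges = set()  # (upstream, downstream) pairs

    def set_dependencies(self, task, upstream_tasks):
        # Adding an object that is already in the set does not create a new node.
        self.tasks.add(task)
        for upstream in upstream_tasks:
            self.tasks.add(upstream)
            self.edges.add((upstream, task))


def count_named(flow, name):
    """Count how many distinct task objects in the flow carry this name."""
    return sum(1 for task in flow.tasks if task.name == name)


a1, a2 = ToyTask("A"), ToyTask("A")

# Case 1: one B object reused for both calls.
shared = ToyFlow()
b = ToyTask("B")
shared.set_dependencies(b, [a1])
shared.set_dependencies(b, [a2])

# Case 2: a fresh B object per call, analogous to writing B() each time.
separate = ToyFlow()
separate.set_dependencies(ToyTask("B"), [a1])
separate.set_dependencies(ToyTask("B"), [a2])

print(count_named(shared, "B"))    # 1
print(count_named(separate, "B"))  # 2
```

In the shared case both A objects end up upstream of the single B node, so there is no second B to wait on the second A. How Prefect itself treats repeated keyword arguments on one task is not modeled here.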