Hi All,
I have a question about setting upstream tasks. Here’s some pseudo code:
@task()
def task1():
    # do stuff

@task()
def task2():
    # do other stuff

ids = ['aaa', 'bbb', 'ccc']
for id in ids:
    x = task1()
    y = task2(upstream_tasks=[x])

flow.run(executor=LocalDaskExecutor)
My problem is that task2() fires when the first instance of task1() completes successfully - I need task2() to fire only once all of the task1() instances have completed - what am I doing wrong here?
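The ordering being asked for (every task1() run finishes, then task2() runs once) is a fan-out followed by a barrier. Here is a minimal plain-Python sketch of that shape using the standard library's concurrent.futures. It is not Prefect's API: the run() helper, the events list, and the task bodies are hypothetical stand-ins used only to make the ordering visible.

```python
from concurrent.futures import ThreadPoolExecutor, wait

events = []  # records the order in which the stand-in tasks ran


def task1(item_id):
    # Stand-in for "do stuff" on a single id.
    events.append(("task1", item_id))
    return item_id.upper()


def task2(results):
    # Stand-in for "do other stuff"; receives every task1 result at once.
    events.append(("task2", tuple(results)))
    return len(results)


def run(ids):
    with ThreadPoolExecutor() as pool:
        # Fan out: one task1 call per id.
        futures = [pool.submit(task1, i) for i in ids]
        # Barrier: block until every task1 future has completed.
        wait(futures)
        results = [f.result() for f in futures]
    # task2 is called exactly once, after the barrier.
    return task2(results)


ids = ["aaa", "bbb", "ccc"]
count = run(ids)
```

The point of the sketch is that task2 sits outside the per-id loop and depends on the whole collection of task1 results, rather than being created once per id with a single upstream.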
Barada Sahu
09/06/2022, 5:08 PM
use wait_for=[x]
Nate
09/06/2022, 6:38 PM
hi @Igor Morgunov, it depends on whether you're using prefect 1 or 2.
if prefect 1, you should use task1.map(ids) instead of a for loop, and you need to define your flow as a `with Flow() as flow:` context manager
if prefect 2, there is no longer a concept of an