Hi All, I have a question about setting upstream ...
# prefect-community
i
Hi All, I have a question about setting upstream tasks. Here’s some pseudo code:
Copy code
@task()
def task1():
	# do stuff 

@task()
def task2():
	# do other stuff 

ids = ['aaa', 'bbb', 'ccc'] 

for id in ids:
	x = task1()

y = task2(upstream_tasks=[x])  

flow.run(executor=LocalDaskExecutor)
My problem is that
task2()
fires when the first instance of
task1()
completes successfully - I need
task2()
to fire only once all of
task1()
instances have completed - what am I doing wrong here?
1
b
use
wait_for=[x]
n
hi @Igor Morgunov , it depends on whether you're using prefect 1 or 2. if prefect 1, you should use
task1.map(ids)
instead of a for loop and you need to define your flow as a `with Flow() as flow:`context manager if prefect 2, there is no longer a concept of an
Executor
, you'd want to use the
DaskTaskRunner
wait_for
is indeed what you want if prefect 2, and
upstream_tasks
is what you want for prefect 1.
i
Thanks @Nate, that worked, using prefect 1
🦜 1