hello! I'm using Prefect 2 and I'm trying to re-us...
# ask-community
d
hello! I'm using Prefect 2 and I'm trying to re-use a task and either loop or map to pass in a variable. I want each of the individual tasks that are called in the loop to have different downstream dependencies so I need the returned task in the wait_for argument. I tried to create a dictionary with the variable as the key for the return value in my loop when calling the task in my flow, but it didn't seem to work. Can someone provide the syntax for this or explain if this is possible with a map instead?
j
Hey! You likely don't need to use wait_for if you're passing data between your tasks. wait_for is only needed to indicate that taskB should wait for taskA to finish when data from taskA is not being passed to taskB. If you're passing data, the dependency is tracked automatically.
Here's an example of just using a for loop:
from prefect import flow, task

@task
def upstream_task(n):
    return n + 1


@task
def downstream_task(name, n):
    print(f"Downstream {name}: {n}")


@flow
def my_flow():
    results = []
    for i in range(3):
        results.append(upstream_task(i))

    downstream_task("1", results[0])
    downstream_task("2", results[1])
    downstream_task("3", results[2])


if __name__ == '__main__':
    my_flow()
If you want the tasks in the loop to execute concurrently, you can use .map or .submit
with map:
@flow
def my_flow():
    futures = upstream_task.map([i for i in range(3)])
    downstream_task("1", futures[0])
    ...
with submit:
@flow
def my_flow():
    futures = []
    for i in range(3):
        futures.append(upstream_task.submit(i))

    downstream_task("1", futures[0])
    ...
d
Any way to tie the mapped variable to the task or do you have to depend on the order of the list?
j
Maybe something like?
@flow
def my_flow():
    values = [1, 2, 3]
    futures = {v: upstream_task.submit(v) for v in values}
    print(futures)
You only need map/submit if you want to execute concurrently:
@flow
def my_flow():
    values = [1, 2, 3]
    results = {v: upstream_task(v) for v in values}
    print(results)
A plain task call like the one above is also a valid thing to do.
d
I want to run the tasks in parallel, will either of these accomplish that?
j
submit will. You can see the difference in the UI between a regular task call and submit.
d
awesome, thanks! Will let you know how it goes in a bit
Looks to be working well, thanks!