I'm a little confused about the `upstream_tasks` a...
# prefect-community
l
I'm a little confused about the `upstream_tasks` arg on `Task.map`. "a list of upstream dependencies to map over". What would mapping over the upstream dependencies do?
Would it assign different upstream dependencies to each input being mapped over?
n
Hi @Luke Orland! The `upstream_tasks` arg on `Task.map` will be resolved as the dependencies for all the mapped tasks. An example:
root = Root()()
node1_1 = Node(name="Node 1_1").map(upstream_tasks=[root])
node1_2 = Node(name="Node 1_2").map(upstream_tasks=[root])
This would mean that the `root` task would be the upstream dependency of all the tasks generated from `node1_1` and `node1_2`, to generate a schematic like this:
And to further clarify, using the `upstream_tasks` argument creates state dependencies between tasks but does NOT create data dependencies. For mapping, we DO generate multiple task instances based on the length of the upstream result, but we don't pass the data. In this example, `hi` would print 3 times:
from prefect import task, Flow

@task
def print_me():
    print('hi')

@task
def return_list():
    return [1, 2, 3]

with Flow("demo") as flow:
    print_me.map(upstream_tasks=[return_list])

flow.run()
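To make the "state dependency, no data" point concrete, here is a rough plain-Python model of that behaviour. It does not use Prefect at all: `run_mapped` and the returning version of `print_me` are hypothetical names made up for illustration, and the real scheduling logic is certainly more involved.

```python
def run_mapped(fn, upstream_result):
    """Toy model (not Prefect's code): call fn once per upstream element,
    but never pass the element itself to fn."""
    outputs = []
    for _ in upstream_result:  # only the length of the upstream result matters
        outputs.append(fn())   # no data dependency: fn receives no arguments
    return outputs


def print_me():
    # Hypothetical variant of the task above that also returns what it prints.
    print("hi")
    return "hi"


# The upstream "task" produced [1, 2, 3], so fn runs three times.
outputs = run_mapped(print_me, [1, 2, 3])
```

The values 1, 2 and 3 never reach `print_me`; they only decide how many times it runs.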
l
I think what I experienced was:
downstream_task.map(
  twelve_data_objects,
  upstream_tasks=[task_outputting_a_list_with_six_items]
)
generating 6 mapped tasks instead of 12. I fixed it by using `upstream_tasks=[unmapped(task_outputting_a_list_with_six_items)]`.
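One way to picture why 6 tasks showed up instead of 12 is the small plain-Python model below. It assumes the number of mapped children is set by the shortest mapped input, which matches the behaviour reported here but is only a sketch, not Prefect's actual implementation. `Unmapped` and `mapped_child_count` are hypothetical stand-ins invented for this example.

```python
class Unmapped:
    """Hypothetical stand-in for a wrapper that marks an input as not mapped."""

    def __init__(self, value):
        self.value = value


def mapped_child_count(*inputs):
    """Toy model: the child count is the shortest length among mapped inputs.
    Inputs wrapped in Unmapped are ignored when counting."""
    lengths = [len(x) for x in inputs if not isinstance(x, Unmapped)]
    return min(lengths, default=0)


twelve_data_objects = list(range(12))
six_item_upstream = list(range(6))

# Both inputs are mapped over, so the shorter one (6 items) limits the count.
count_plain = mapped_child_count(twelve_data_objects, six_item_upstream)

# Wrapping the upstream removes it from the count, leaving 12 children.
count_unmapped = mapped_child_count(twelve_data_objects, Unmapped(six_item_upstream))
```

Under this model, the unwrapped upstream behaves like a second mapped input, while the wrapped one stays a dependency of every child without affecting how many children exist.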