Luke Orland

02/25/2020, 9:56 PM
I'm a little confused about the `upstream_tasks` arg on `Task.map`: "a list of upstream dependencies to map over". What would mapping over the upstream dependencies do? Assign different upstream dependencies to each input being mapped over?

nicholas

02/25/2020, 10:08 PM
Hi @Luke Orland! The `upstream_tasks` arg on `Task.map` will be resolved as the dependencies for all the mapped tasks. An example:
root = Root()()
node1_1 = Node(name="Node 1_1").map(upstream_tasks=[root])
node1_2 = Node(name="Node 1_2").map(upstream_tasks=[root])
This would mean that the `root` task would be the upstream dependency of all the tasks generated from `node1_1` and `node1_2`, to generate a schematic like this: (schematic image not preserved)
And to further clarify, using the `upstream_tasks` argument creates state dependencies between tasks but does NOT create data dependencies. For mapping, we DO generate multiple task instances based on the length of the upstream result, but we don't pass the data. In this example, `hi` would print 3 times:
from prefect import task, Flow

@task
def print_me():
    print('hi')

@task
def return_list():
    return [1, 2, 3]

with Flow("demo") as flow:
    print_me.map(upstream_tasks=[return_list])

flow.run()
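To make the state-versus-data distinction concrete, here is a toy model in plain Python, not Prefect code. It assumes, as described above, that one child run is created per element of the upstream result and that the element is handed to the child only when it is a data dependency. `run_children`, `say_hi`, and `double` are hypothetical names made up for this sketch.

```python
from typing import Any, Callable, List, Sequence


def run_children(
    fn: Callable[..., Any], upstream_result: Sequence[Any], pass_data: bool
) -> List[Any]:
    """Run fn once per upstream element (toy model, not Prefect's runner).

    The element is passed to fn only when pass_data is True, which stands in
    for a data dependency; otherwise only the number of runs depends on it.
    """
    results = []
    for element in upstream_result:
        results.append(fn(element) if pass_data else fn())
    return results


def say_hi() -> str:
    # Takes no input, like print_me in the flow above.
    return "hi"


def double(x: int) -> int:
    # Consumes the upstream element, like a task mapped over data.
    return x * 2


# State-only dependency: three runs, no data passed.
state_only = run_children(say_hi, [1, 2, 3], pass_data=False)
# Data dependency: three runs, each receiving one element.
with_data = run_children(double, [1, 2, 3], pass_data=True)

print(state_only)
print(with_data)
```

In both cases the upstream list decides how many runs happen; only the second call lets each run see its element.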

Luke Orland

02/26/2020, 5:02 PM
I think what I experienced was:
downstream_task.map(
  twelve_data_objects,
  upstream_tasks=[task_outputting_a_list_with_six_items]
)
generating 6 mapped tasks instead of 12. I fixed it by using `upstream_tasks=[unmapped(task_outputting_a_list_with_six_items)]`.
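The 6-versus-12 behavior can be sketched with a toy model in plain Python, not Prefect code. It assumes the number of mapped runs is bounded by the shortest input that is mapped over, zip-style, which matches what was observed here. It also assumes that wrapping an input in `unmapped` takes it out of that calculation. `Unmapped` and `mapped_run_count` are hypothetical stand-ins, not Prefect's API.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Unmapped:
    """Hypothetical stand-in for Prefect's `unmapped` wrapper."""

    value: Any


def mapped_run_count(*inputs: Any) -> int:
    """Toy model: the run count is the length of the shortest mapped input.

    Inputs wrapped in Unmapped are shared by every run and do not
    affect the count.
    """
    lengths = [len(x) for x in inputs if not isinstance(x, Unmapped)]
    if not lengths:
        raise ValueError("at least one input must be mapped over")
    return min(lengths)


twelve_data_objects = list(range(12))
six_item_upstream = list(range(6))

# Upstream list is mapped over too, so the shorter list caps the run count.
count_mapped = mapped_run_count(twelve_data_objects, six_item_upstream)
# Upstream list is wrapped, so only the twelve data objects are mapped.
count_unmapped = mapped_run_count(twelve_data_objects, Unmapped(six_item_upstream))

print(count_mapped, count_unmapped)
```

Under this model, the wrapped upstream task stays a dependency of every mapped run without limiting how many runs are created.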