Luke Orland
02/25/2020, 9:56 PM
`upstream_tasks` arg on `Task.map`: "a list of upstream dependencies to map over". What would mapping over the upstream dependencies do?
nicholas
02/25/2020, 10:08 PM
The `upstream_tasks` arg on `Task.map` will be resolved as the dependencies for all the mapped tasks. An example:
```python
root = Root()()
node1_1 = Node(name="Node 1_1").map(upstream_tasks=[root])
node1_2 = Node(name="Node 1_2").map(upstream_tasks=[root])
```
This would mean that the `root` task would be the upstream dependency of all the tasks generated from `node1_1` and `node1_2`, producing a schematic like this:
[image: schematic of the resulting flow]
The `upstream_tasks` argument creates state dependencies between tasks but does NOT create data dependencies. For mapping, we DO generate multiple task instances based on the length of the upstream result, but we don't pass the data. In this example, `hi` would print 3 times:
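```python
from prefect import task, Flow

@task
def print_me():
    print('hi')

@task
def return_list():
    return [1, 2, 3]

with Flow("demo") as flow:
    # return_list is only a state dependency: its length (3) sets the number
    # of mapped copies, but its items are never passed to print_me
    print_me.map(upstream_tasks=[return_list])

flow.run()
```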
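To make the "state dependency, not data dependency" point concrete without needing Prefect installed, here is a toy pure-Python model. It is not Prefect's implementation; `run_mapped_with_state_only_upstream` is a hypothetical helper that simply creates one child call per upstream item while never handing the item to the task function, which is the behavior described above.

```python
from typing import Any, Callable, List


def run_mapped_with_state_only_upstream(
    fn: Callable[[], Any], upstream_result: List[Any]
) -> List[Any]:
    """Toy model (hypothetical, not Prefect code) of map + upstream_tasks.

    One child run is created per upstream item, but the item itself is
    never passed to fn: only the length of the upstream result matters.
    """
    results = []
    for _item in upstream_result:
        # _item is deliberately ignored: there is no data dependency
        results.append(fn())
    return results


def print_me() -> str:
    print("hi")
    return "hi"


outputs = run_mapped_with_state_only_upstream(print_me, [1, 2, 3])
print(len(outputs))
```

Running it prints `hi` three times and then `3`, mirroring the Prefect flow: three children, none of which ever sees the values 1, 2 or 3.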
Luke Orland
02/26/2020, 5:02 PM
```python
downstream_task.map(
    twelve_data_objects,
    upstream_tasks=[task_outputting_a_list_with_six_items]
)
```
This was generating 6 mapped tasks instead of 12.
I fixed it by using `upstream_tasks=[unmapped(task_outputting_a_list_with_six_items)]`.
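The 6-instead-of-12 result is consistent with the map being sized by the shortest mapped input, zip-style, with the upstream list counted as a mapped input unless it is wrapped in `unmapped`. The toy model below assumes exactly that rule; it is not Prefect's source, and `Unmapped` and `count_mapped_children` are hypothetical names standing in for the real `unmapped` wrapper and the engine's internal logic.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Unmapped:
    """Hypothetical stand-in for prefect's `unmapped` wrapper (toy model only)."""
    value: Any


def count_mapped_children(
    mapped_args: List[List[Any]], upstream_results: List[Any]
) -> int:
    """Toy model of how many mapped children get created.

    Assumption: all mapped inputs (data args and upstream_tasks) are
    iterated in lockstep, so the shortest one wins, like zip().
    Upstream results wrapped in Unmapped do not affect the count.
    """
    lengths = [len(arg) for arg in mapped_args]
    for result in upstream_results:
        if not isinstance(result, Unmapped):
            lengths.append(len(result))
    if not lengths:
        raise ValueError("nothing to map over")
    return min(lengths)


twelve_data_objects = list(range(12))
six_items = ["a", "b", "c", "d", "e", "f"]

# Upstream list is mapped over too, so it caps the count at 6.
print(count_mapped_children([twelve_data_objects], [six_items]))
# Wrapped as unmapped, it is only a state dependency: 12 children.
print(count_mapped_children([twelve_data_objects], [Unmapped(six_items)]))
```

This prints `6` and then `12`, matching the before/after behavior reported in the thread under the stated zip-style assumption.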