Chris Martin
08/11/2020, 2:59 PMtask.map
, but I can't get it to work and wonder if this is possible
Here's some example code:
from prefect import task, Flow, Task
@task
def task1():
pass
@task
def task2():
return ["a", "b", "c"]
@task
def task3(x):
pass
with Flow("test") as flow:
t1 = task1()
params = task2()
task3.map(params, upstream_tasks=[t1])
I can register this flow fine and the dag looks coorrect (see attachment), but when I run, task3
fails with "No upstream states can be mapped over".Am I doing something wrong here?Jim Crist-Harif
08/11/2020, 3:03 PMmap
are assumed to be iterables (to be mapped over) unless they're wrapped with prefect.unmapped
. Switching to
task3.map(params, upstream_tasks=[unmapped(t1)])
should work for you.Chris Martin
08/11/2020, 3:07 PM