Gian Piero Izzo
12/16/2021, 12:06 AMfrom prefect import Flow, task, apply_map
@task
def task1(arg1):
return arg1
@task
def task2(arg2):
return sum(arg2)
def jobMap(arg):
print("branch"+str(arg))
arg2 = task1.map(range(2))
task2(arg2)
with Flow("workflow") as flow:
apply_map(jobMap, range(3))
flow.run()
but I receive the following error:
ValueError: Cannot set mapped=True
when running from inside a mapped context
Is there any possibility to have a pipeline of mapping over mapping ?
Thanks in advance
Gian PieroKevin Kho
task1.map()
should just be task1
. You can’t use a map in a map.Gian Piero Izzo
12/16/2021, 12:17 AMGian Piero Izzo
12/16/2021, 12:18 AMKevin Kho
from prefect import Flow, task
@task
def create_input():
return [1,2,3,4,5]
@task
def batch_input(list_x):
batch1 = list_x[0:2]
batch2 = list_x[2:6]
return [batch1, batch2]
@task
def get_sum(list_x):
return sum(x)
with Flow("...") as flow:
start = create_input()
batches = batch_input(start)
get_sum.map(batches)
Gian Piero Izzo
12/16/2021, 12:28 AMKevin Kho
Gian Piero Izzo
12/16/2021, 1:36 AM