Tsang Yong
11/05/2019, 5:48 PMimport IPython
from prefect import task, Flow, Parameter
my_list = [1, 3, 4]
@task
def map1_fn(item):
new_item = [item+1, item+1]
print("map1_fn: {}".format(item))
return new_item
@task
def map2_fn(item):
# expecting item = 2 but got [2, 2] etc
print("map2_fn: {}".format(item))
return item
with Flow("workflow") as flow:
my_other_list = map1_fn.map(item=my_list)
my_another_list = map2_fn.map(item=my_other_list)
flow.run()
Chris White
11/05/2019, 6:12 PMmy_other_list
is:
[[2, 2], [4, 4], [5, 5]]
and so when you map over this list with map2_fn
you are mapping over a list whose individual elements are each listsTsang Yong
11/05/2019, 6:13 PMChris White
11/05/2019, 6:13 PMitem
kwarg of map2_fn
, and each element is itself a listTsang Yong
11/05/2019, 6:15 PMChris White
11/05/2019, 6:16 PMdef my_reduce_task(my_list):
... # flatten list
return flattened_list
with
flat_list = my_reduce_task(my_other_list)
prefect will provide the full list as the argument to the reduce task