Tsang Yong

11/05/2019, 5:48 PM
from prefect import task, Flow


my_list = [1, 3, 4]


@task
def map1_fn(item):
    new_item = [item+1, item+1]
    print("map1_fn: {}".format(item))
    return new_item


@task
def map2_fn(item):
    # expecting item = 2 but got [2, 2] etc
    print("map2_fn: {}".format(item))
    return item


with Flow("workflow") as flow:
    my_other_list = map1_fn.map(item=my_list)
    my_another_list = map2_fn.map(item=my_other_list)

flow.run()

Chris White

11/05/2019, 6:12 PM
Hi @Tsang Yong! Good question; from Prefect’s perspective, the result of my_other_list is:
[[2, 2], [4, 4], [5, 5]]
and so when you map over this list with map2_fn you are mapping over a list whose individual elements are each lists

Tsang Yong

11/05/2019, 6:13 PM
oh so if I flatten it, I should get what I expect?

Chris White

11/05/2019, 6:13 PM
consequently, Prefect iterates through the list and provides each element to the item kwarg of map2_fn, and each element is itself a list
yea
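
The behaviour described above can be reproduced without Prefect. This is only a rough sketch: the list comprehensions stand in for .map (an assumption made for illustration, not how Prefect runs mapped tasks internally), and the functions are plain functions rather than tasks.

```python
# Plain-Python stand-in for the two mapped tasks in the thread.
# A list comprehension approximates what .map does: call the function
# once per element of the input list.
def map1_fn(item):
    # Each call returns a two-element list, not a single value.
    return [item + 1, item + 1]


def map2_fn(item):
    return item


my_list = [1, 3, 4]

# One result per input element, and each result is itself a list.
my_other_list = [map1_fn(x) for x in my_list]

# Mapping again hands map2_fn one inner list at a time.
my_another_list = [map2_fn(x) for x in my_other_list]

print(my_other_list)
print(my_another_list)
```

Both lists come out nested, which matches the [2, 2] that map2_fn reported instead of 2.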

Tsang Yong

11/05/2019, 6:15 PM
oh nice! so conceptually I basically need a reduce task.
thanks!

Chris White

11/05/2019, 6:16 PM
yup yup, and if you define:
@task
def my_reduce_task(my_list):
    ... # flatten list
    return flattened_list
and call it inside the flow without .map:
flat_list = my_reduce_task(my_other_list)
Prefect will provide the full list as the argument to the reduce task
anytime!
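
One possible body for the flatten step left elided above, shown as a plain function so it runs on its own. The name flatten and the use of itertools.chain are illustrative choices, not something given in the thread; inside a flow the same logic would sit in the reduce task.

```python
from itertools import chain


def flatten(nested):
    """Flatten exactly one level of nesting (hypothetical helper)."""
    return list(chain.from_iterable(nested))


# The nested result of the first mapped task, as quoted in the thread.
my_other_list = [[2, 2], [4, 4], [5, 5]]

flat_list = flatten(my_other_list)
print(flat_list)  # [2, 2, 4, 4, 5, 5]
```

Mapping map2_fn over flat_list instead of my_other_list would then hand it one integer per call, which is what the original question expected.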