Jacques
07/01/2020, 1:23 PM
Jeremiah
07/01/2020, 1:25 PM
from prefect import task, Flow

@task
def two_lists():
    return {
        'list_one': [1, 2, 3],
        'list_two': [101, 102, 103]
    }

@task
def add_one(x):
    return x + 1

with Flow('example') as flow:
    lists = two_lists()
    x = add_one.map(lists['list_one'])
    y = add_one.map(lists['list_two'])

state = flow.run()
assert state.result[x].result == [2, 3, 4]
assert state.result[y].result == [102, 103, 104]
lists[0], lists[1]
it’ll also work
Avi A
07/01/2020, 1:29 PM
dict
(I use something similar in my flow)
both_items = task_that_generates_two_lists(..) # returns two lists
error_items, processed_items = both_items[0], both_items[1]
error_items_task.map(error_items)
processed_items_task.map(processed_items)
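For reference, the data flow in the snippet above can be sketched in plain Python, without Prefect: one function returns two lists, the single return value is indexed by position, and a per-item function is applied to each list. All names here (generate_two_lists, handle_error, handle_processed) are hypothetical stand-ins for the tasks in the thread, and the built-in map only approximates what .map on a Prefect task does.

```python
# Plain-Python sketch of the split-then-map pattern; no Prefect required.
# All function names are hypothetical stand-ins for the tasks in the thread.

def generate_two_lists(items):
    """Split items into (error_items, processed_items)."""
    error_items = [i for i in items if i < 0]
    processed_items = [i for i in items if i >= 0]
    return error_items, processed_items


def handle_error(item):
    """Stand-in for error_items_task: label one failed item."""
    return f"error:{item}"


def handle_processed(item):
    """Stand-in for processed_items_task: transform one good item."""
    return item + 1


both_items = generate_two_lists([1, -2, 3])
# Index the single return value by position, as in the snippet above.
error_items, processed_items = both_items[0], both_items[1]

# Apply each handler element-wise, one call per item.
error_results = list(map(handle_error, error_items))
processed_results = list(map(handle_processed, processed_items))
```

The rule used to split the items (negative numbers count as errors) is made up for this sketch; only the shape of the pattern, index first and then map each list separately, comes from the thread.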
Jacques
07/01/2020, 1:32 PM
error_items_task.set_upstream(two_lists['list_one'], key="data", mapped=True)
Jeremiah
07/01/2020, 1:32 PM
Jacques
07/01/2020, 1:33 PM