Abuzar Shaikh
08/11/2021, 8:01 AMwith Flow(...)
block. If you're trying to run this task outside of a Flow context, you need to call map_fn.run(...)
Below is the replication of what I want to do:
from prefect import Flow, task
numbers = [1, 2, 3]
@task
def map_fn(x):
return x + 1
@task
def reduce_fn():
res = map_fn.map(numbers)
print(res)
return res + [1]
with Flow('flow1') as flow1:
print(reduce_fn())
flow1.run()
Any suggestions or workaround will be appreciated.Sam Cook
08/11/2021, 1:21 PMreduce_fn
isn't being called until you run the flow, which is after you've exited the with context. You need to take the task result of reduce_fn
and link that to the input of map_fn
in the flow context. Try something like
with Flow('flow1') as flow1:
res = reduce_fn()
map_fn.map(res)
flow.run()
Kevin Kho
Abuzar Shaikh
08/11/2021, 2:32 PMmultiprocessor.Pool
in the actual implementation. Thanks @Sam Cook @Kevin Kho.