# ask-community
a
Hello everyone, I want to map a task inside a task, but whenever I try, it gives me the following error: `ValueError: Could not infer an active Flow context while creating edge to <Task: map_fn>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call map_fn.run(...)`
Below is the replication of what I want to do:
from prefect import Flow, task

numbers = [1, 2, 3]

@task
def map_fn(x):
    return x + 1

@task
def reduce_fn():
    res = map_fn.map(numbers)
    print(res)

    return res + [1]

with Flow('flow1') as flow1:
    print(reduce_fn())

flow1.run()
Any suggestions or workaround will be appreciated.
s
You're getting that error because `reduce_fn` isn't being called until you run the flow, which is after you've exited the `with` context. You need to take the task result of `reduce_fn` and link that to the input of `map_fn` in the flow context. Try something like:
with Flow('flow1') as flow1:
  res = reduce_fn()
  map_fn.map(res)

flow1.run()
k
Hey @Abuzar Shaikh, Sam is right here. You can’t use `map` inside a task like this. If you want some form of multi-threaded execution inside a task, you’d have to code it yourself. In general though, I think you want the Prefect map at the flow level, because that is where there is functionality for things like consecutive mapped operations.
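For the "code it yourself" route, here is a minimal sketch of what fanning out inside a single task body could look like, using only the standard library's `ThreadPoolExecutor`. It reuses the `map_fn`, `reduce_fn` and `numbers` names from the question. The `values` parameter and `max_workers=4` are assumptions made for illustration, and the functions are left as plain Python here; in a real flow only `reduce_fn` would carry the `@task` decorator, with `map_fn` staying an ordinary helper.

```python
from concurrent.futures import ThreadPoolExecutor

numbers = [1, 2, 3]


def map_fn(x):
    # Plain helper function, not a Prefect task
    return x + 1


def reduce_fn(values):
    # Fan out inside the function body with a thread pool instead of
    # calling a Prefect task's .map() from within another task.
    # max_workers=4 is an arbitrary choice for this sketch.
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Executor.map yields results in the same order as the inputs
        res = list(pool.map(map_fn, values))
    return res + [1]


if __name__ == "__main__":
    print(reduce_fn(numbers))
```

The trade-off is that Prefect only sees one task here, so the per-item work gets no individual retries or state tracking the way a flow-level map would.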
a
Yeah, I see, the approach does make sense. I ended up using `multiprocessing.Pool` in the actual implementation. Thanks @Sam Cook @Kevin Kho.