# ask-community
g
Hello, I was trying to set up a flow like this:
from prefect import Flow, task, apply_map

@task
def task1(arg1):
    return arg1

@task
def task2(arg2):
    return sum(arg2)

def jobMap(arg):
    print("branch" + str(arg))
    # task1.map() here runs inside apply_map's mapped context,
    # which is what raises the ValueError below
    arg2 = task1.map(range(2))
    task2(arg2)

with Flow("workflow") as flow:
    apply_map(jobMap, range(3))
        
flow.run()
but I receive the following error: ValueError: Cannot set mapped=True when running from inside a mapped context
Is there any possibility of having a pipeline of mapping over mapping? Thanks in advance, Gian Piero
k
I think your issue is that task1.map() should just be task1. You can’t use a map inside a map.
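For reference, here is roughly what that change looks like applied to the flow above, a minimal sketch assuming Prefect 1.x. Inside the function passed to apply_map, tasks are called directly (apply_map handles the mapping), and task2 is adjusted to take a single element, since each branch now passes one value rather than a list:

from prefect import Flow, task, apply_map

@task
def task1(arg1):
    return arg1

@task
def task2(arg2):
    # receives one element per branch, not a list
    return arg2 * 2

def jobMap(arg):
    # plain task calls; apply_map converts them into mapped calls
    arg2 = task1(arg)
    task2(arg2)

with Flow("workflow") as flow:
    apply_map(jobMap, range(3))

flow.run()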
g
Thanks, but that was exactly what I needed. In Airflow I have a dynamic DAG like this:
k
I don’t think this is a nested mapping. It looks like this structure, where you can create batches:
from prefect import Flow, task

@task
def create_input():
    return [1, 2, 3, 4, 5]

@task
def batch_input(list_x):
    batch1 = list_x[0:2]
    batch2 = list_x[2:5]
    return [batch1, batch2]

@task
def get_sum(list_x):
    return sum(list_x)

with Flow("...") as flow:
    start = create_input()
    batches = batch_input(start)
    get_sum.map(batches)

flow.run()
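And if you do need to map over the individual elements inside each batch, Prefect 1.x also has flatten, which unrolls one level of nesting before mapping. A minimal sketch with made-up task names:

from prefect import Flow, task, flatten

@task
def make_batches():
    # returns a list of lists
    return [[1, 2], [3, 4, 5]]

@task
def double(x):
    return x * 2

with Flow("flatten-example") as flow:
    batches = make_batches()
    # flatten unrolls the nesting, so double maps over five elements
    double.map(flatten(batches))

flow.run()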
g
So in this case I understand that I have to wait for the download of all source products, and only after that can I start the merge processes. The problem is that in some cases a product can take days to become available, but in the meantime I would like to deliver the others as soon as possible.
k
I see what you mean. The tricky thing is that the mapping operation is tied to parallel processing, so the first layer of mapping already occupies the available threads. When you call the second mapping, it tries to do the same thing, and Dask doesn’t allow starting a multiprocess inside a multiprocess. It will give errors.
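A common workaround, sketched here with hypothetical task names, is to pre-compute the cross product of both levels in a single task and run one flat map over it. Each pair is then an independent mapped child, so items that are already available don’t wait on the slow ones:

from itertools import product
from prefect import Flow, task

@task
def build_work_items():
    # cross product of the two would-be mapping levels:
    # 3 branches x 2 items per branch = 6 independent work items
    return list(product(range(3), range(2)))

@task
def process(pair):
    branch, item = pair
    return branch * 10 + item

with Flow("flat-cross-product") as flow:
    items = build_work_items()
    process.map(items)

flow.run()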
g
ok, understood, thank you