# prefect-community
k
Hey, I have a nested map (three levels). It is using the output of the last result instead of the list of outputs. Do I need to do something to make the output of the first map iterable?
check_file_generated.map(
    api.map(
        task_scheduler(
            configs,
            single_date=single_date,
            target_dates=target_dates,
        )
    )
)
Using 15.3, but I have used this pattern in other flows, so I am not sure why this one is different.
k
Is task_scheduler supposed to be mapped there too?
I suspect the lists being fed into the mapped task are uneven. Maybe you need to use unmapped() for one of the items?
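For intuition only, here is a plain-Python sketch of the difference. It is an analogy, not Prefect's implementation: it assumes that mapping over several list inputs pairs them element-wise and stops at the shortest one, and that wrapping an input in unmapped() is comparable to reusing that one value for every element. The names map_like, Unmapped and build_request are hypothetical.

```python
from itertools import repeat


class Unmapped:
    """Hypothetical stand-in: marks a value to reuse for every element."""

    def __init__(self, value):
        self.value = value


def map_like(fn, *inputs):
    """Call fn once per element, pairing inputs element-wise (assumed zip-like)."""
    if all(isinstance(i, Unmapped) for i in inputs):
        raise ValueError("at least one input must be iterated over")
    # An Unmapped input is repeated forever; zip stops at the shortest real list.
    columns = [repeat(i.value) if isinstance(i, Unmapped) else i for i in inputs]
    return [fn(*args) for args in zip(*columns)]


def build_request(config, date):
    return f"{config}@{date}"


configs = ["a", "b", "c"]
dates = ["2021-01-01", "2021-01-02"]

# Uneven lists: only two calls happen, and "c" is silently dropped.
uneven = map_like(build_request, configs, dates)

# One constant shared by every element: three calls happen.
broadcast = map_like(build_request, configs, Unmapped("2021-01-01"))
```

Under these assumptions, comparing the lengths of every input to a mapped call is a quick way to spot where elements go missing.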
k
@Kevin Kho task_scheduler outputs a list, so it's nested two times.
k
Ah OK, it looks right if task_scheduler already returns a list.
k
Let's say we have 3 items in the list from task_scheduler and it goes to api, and one of the items doesn't return, so it returns 2 items to check_file_generated. Would that cause it to use the last output from api 3 times?
k
Oh crap, you are right, there is some weirdness there. If you have A -> B -> C, all of them mapped, and A is 2 items but B is 3 items and C is 3 items, only 2 will run. Let me show an example.
k
The first item in the list has a FAIL signal because the file isn't ready, but it ultimately has a Success signal. Would that be a 3 > 2 > 1 (repeated 3 times) output?
k
Not exactly. The list is still intact because the FAIL will just propagate downstream and, by default, not fire the child tasks. Actually, thinking about it, I don't know how your items would become uneven, because they just pass one thing to each other.
If your flow is this structure:
from prefect import Flow, task

@task
def A(x):
    return x

@task
def B(x):
    if x == 2:
        raise ValueError()
    return x

@task(log_stdout=True)
def C(x):
    print(x)
    return x

with Flow("..") as flow:
    a = A.map([1,2,3])
    b = B.map(a)
    c = C.map(b)

flow.run()
It should just show TriggerFailed. Do you think you can edit this example to clarify the behavior? Sorry, it's not 100% in my head yet.
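To make the expected behavior concrete, here is a plain-Python simulation of the A -> B -> C example above. It is only a sketch and not how Prefect works internally: Failed, TriggerFailed and map_stage are hypothetical names, and it assumes that a failed element keeps its slot in the list and that the downstream child for that slot is skipped rather than run.

```python
class Failed:
    """Hypothetical marker for an element whose task raised."""

    def __init__(self, error):
        self.error = error


class TriggerFailed(Failed):
    """Hypothetical marker for an element skipped because its upstream failed."""


def map_stage(fn, upstream):
    """Run fn once per upstream element, keeping one result slot per element."""
    results = []
    for item in upstream:
        if isinstance(item, Failed):
            # Upstream failed: do not call fn, but keep the slot.
            results.append(TriggerFailed(item.error))
            continue
        try:
            results.append(fn(item))
        except Exception as exc:
            results.append(Failed(exc))
    return results


def A(x):
    return x


def B(x):
    if x == 2:
        raise ValueError("x == 2")
    return x


def C(x):
    return x


a = map_stage(A, [1, 2, 3])
b = map_stage(B, a)
c = map_stage(C, b)

print([type(r).__name__ if isinstance(r, Failed) else r for r in c])
# prints [1, 'TriggerFailed', 3]
```

The list keeps three slots at every stage, so under these assumptions a failure in B would not shorten what C receives.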
Edited the above code. This syntax seems to work fine:
with Flow("..") as flow:
    C.map(B.map(A()))
I also don't see how the length of the list could change between those unless you have other inputs to B and C.
k
from prefect import Flow, task

@task
def A(arg1, arg2):
    return [arg1A, arg1B, arg2A, arg2B]  

@task
def B(x, y):
    if x == 2:
        raise ValueError()
    return x

@task(log_stdout=True)
def C(x, y):
    print(x)
    return x

with Flow("..") as flow:
    a = A(arg1, arg2)
    b = B.map(a)
    c = C.map(b)

flow.run()
A creates a list from multiple args.
I have this exact same pattern in another flow and it works. I guess the only difference is the A function, which creates the list of arguments for function B, but function B spits out the values correctly. There is just something happening in step C that isn't using the output generated.
k
That shouldn't matter if A is not a mapped call. It should still be fine. I had an argument y for B and C because I was trying something else, but you can remove those and run it, and I think it should behave as expected.
k
Okay, I'll look deeper into how many responses are being returned. Thanks for your help.