# ask-community
c
Hi, I am debugging a mapped flow where I have two independent mapped pipelines and want the second to start only once the first is finished. It seems that a (mapped) task that depends on an upstream (mapped) task does not get started if an empty array is passed into the upstream mapped task. Is this intended behaviour? I guess it can be debated whether a task should be considered "finished" if it is never triggered, but on the other hand I expect the downstream task with an `always_run` trigger to run, well, always. Example code in thread:
```python
import prefect
from prefect import task, Flow, Parameter
from prefect.triggers import always_run


@task
def n_array(n):
    return [1] * n


@task
def n_plus_one_array(n):
    return [1] * (n + 1)


@task(trigger=always_run)
def log_number(number):
    logger = prefect.context.get("logger")
    logger.info(number)
    return number


with Flow("Test Flow") as flow:
    n = Parameter("n", default=5)
    array1 = n_array(n)
    one_result = log_number.map(array1)
    array2 = n_plus_one_array(n)
    two_result = log_number.map(
        array2,
        upstream_tasks=[one_result],
    )
```
So here nothing gets logged for `n=1`.
And another thing I just noticed: the second time the `log_number` task is called it only gets executed `len(array1)` times, while it should run `len(array2)` times.
k
Hey @Clemens, I think I'm not understanding completely. I still get logs for `n=1` (when using flow.run). Did you mean `n=0`?
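For reference, overriding the parameter on a local run looks like this (just a sketch, assuming the flow defined above):
```python
flow.run(parameters={"n": 0})
```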
c
oh sorry, yes, `n=0` of course
🙈 1
k
Ah ok. So I think we're on the same page that nothing is logged. For me it's:
```
[2021-07-20 12:35:31-0400] INFO - prefect.TaskRunner | Task 'log_number': Starting task run...
[2021-07-20 12:35:31-0400] INFO - prefect.TaskRunner | Task 'log_number': Finished task run for task with final state: 'Mapped'
[2021-07-20 12:35:31-0400] INFO - prefect.TaskRunner | Task 'log_number': Starting task run...
[2021-07-20 12:35:31-0400] INFO - prefect.TaskRunner | Task 'log_number': Finished task run for task with final state: 'Mapped'
```
But I'm curious what you're expecting to be logged if the list is empty and there is nothing to map over?
c
sure, but the second time it's called the list is not empty but of length one
```python
def n_plus_one_array(n):
    return [1] * (n + 1)
```
k
Ah gotcha! Completely understand. My suspicion is that setting the upstream is connecting the `i`-th element of each upstream map to the downstream map, and the uneven lengths mean that an element gets left out. I think this is by design, though, to chain consecutive mapped operations together, so I suspect you will need a reduce step since these are independent. Let me confirm with the team.
Question though, if they are independent, is there a need to set the upstream dependency?
c
A reduce step does solve the problem and kind of makes sense in the map/reduce philosophy here.
I just want to have them run sequentially, so one pipeline should start only when the other is done. I was under the impression that setting an upstream task would be the canonical Prefect way here, as there is no parametric dependency between the pipelines.
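A reduce step here could look something like the following (a sketch only; the `collect` task is a hypothetical helper, not from the thread, and it assumes the task definitions and imports above, Prefect 1.x):
```python
@task
def collect(results):
    # Hypothetical reduce step: receiving the full list of mapped results
    # forces a reduce, so anything downstream depends on the whole pipeline.
    return results


with Flow("Test Flow") as flow:
    n = Parameter("n", default=0)
    array1 = n_array(n)
    one_result = log_number.map(array1)
    reduced = collect(one_result)      # reduce the first mapped pipeline
    array2 = n_plus_one_array(n)
    two_result = log_number.map(
        array2,
        upstream_tasks=[reduced],      # non-mapped upstream, no element pairing
    )
```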
k
I’m still getting more answers from the team, but this will fix your use case:
```python
from prefect import unmapped

with Flow("Test Flow") as flow:
    one_result = log_number.map([0, 1, 2, 3])
    two_result = log_number.map(
        [0, 1, 2, 3, 4],
        # unmapped() passes the whole upstream result as a single dependency
        # instead of pairing it element-wise with this map
        upstream_tasks=[unmapped(one_result)],
    )
```
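And for completeness, a sketch of the same fix applied to the original parameterized flow, run locally at `n=0` (assumes Prefect 1.x and the task definitions above):
```python
with Flow("Test Flow") as flow:
    n = Parameter("n", default=0)
    array1 = n_array(n)                         # [] for n=0
    one_result = log_number.map(array1)         # nothing to map over
    array2 = n_plus_one_array(n)                # [1] for n=0
    two_result = log_number.map(
        array2,
        upstream_tasks=[unmapped(one_result)],  # whole first map as one upstream
    )

flow.run(parameters={"n": 0})
```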
c
ohhh, you need to set the upstream task as `unmapped` as well. All right, that makes sense
thanks!
👍 1