# ask-community
c
Hello! 👋 Just a quick question: I have a task B which depends on a list of other tasks [A1, A2, A3, ...] of undefined size (Prefect therefore creates an implicit List task under the hood). The thing is, some tasks in the list [A1, A2, A3, ...] can be skipped at runtime, but I still want B to be executed. I currently can’t achieve this, even if skip_on_upstream_skip=False is specified for B, since the implicit List task is skipped and there is nothing I can do about it (I receive None, and not a list of optional elements). Is there a way to do it? Thanks!
a
@Côme Arvis I’m sure there is a way to do it. In general, when you attach the trigger all_finished and add skip_on_upstream_skip=False, the task always runs, regardless of the states of the upstream tasks. Do you happen to have a minimal flow I could use to test it out? You could also share your entire flow definition via DM.
c
@Anna Geller Hi, thanks for your answer. Here is a minimal flow that shows the problem I would like to solve:
from typing import List, Optional

from prefect import Flow, task
from prefect.engine import signals
from prefect.triggers import all_finished


@task(name="Task A")
def a():
    raise signals.SKIP("This task is an example of feature disabled at runtime")


@task(name="Task B")
def b():
    return "B"


@task(name="Task C")
def c():
    return "C"


@task(name="Aggregation", skip_on_upstream_skip=False, trigger=all_finished)
def aggregation(results: List[Optional[str]]) -> str:
    # Drop the None entries expected from skipped upstream tasks
    filtered_results = [result for result in results if result is not None]

    return ", ".join(filtered_results)


with Flow("Minimal example") as flow:
    outputs = aggregation([a(), b(), c()])

flow.run()  # raises TypeError: 'NoneType' object is not iterable
As you can see, a List task is created and executed before the Aggregation task. That List task is skipped and therefore returns None.
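To make the behaviour described above concrete, here is a toy model in plain Python. It is not Prefect's implementation: ToyTask, ToyState, Skip and run_task are hypothetical names invented for this sketch, and it only assumes what the thread describes, namely that the implicit List task keeps the default of skipping when an upstream task is skipped.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


class Skip(Exception):
    """Stand-in for a SKIP signal (hypothetical, not prefect.engine.signals.SKIP)."""


@dataclass
class ToyState:
    skipped: bool
    result: Any = None


@dataclass
class ToyTask:
    name: str
    fn: Callable[..., Any]
    upstream: List["ToyTask"] = field(default_factory=list)
    # Assumed default, mirroring the behaviour described in the thread
    skip_on_upstream_skip: bool = True


def run_task(task: ToyTask) -> ToyState:
    """Run the upstream tasks first, then the task itself, propagating skips."""
    upstream_states = [run_task(t) for t in task.upstream]
    if task.skip_on_upstream_skip and any(s.skipped for s in upstream_states):
        # The task body never runs, so there is no result to pass on
        return ToyState(skipped=True, result=None)
    try:
        return ToyState(False, task.fn(*[s.result for s in upstream_states]))
    except Skip:
        return ToyState(skipped=True, result=None)


def skip_a() -> str:
    raise Skip("feature disabled at runtime")


a = ToyTask("Task A", skip_a)
b = ToyTask("Task B", lambda: "B")
c = ToyTask("Task C", lambda: "C")

# The implicit collection task keeps the default skip_on_upstream_skip=True
implicit_list = ToyTask("List", lambda *items: list(items), upstream=[a, b, c])

# Only the aggregation task opts out of skip propagation
aggregation = ToyTask(
    "Aggregation",
    lambda results: results,
    upstream=[implicit_list],
    skip_on_upstream_skip=False,
)

list_state = run_task(implicit_list)
agg_state = run_task(aggregation)
```

In this model the Aggregation task does run, but its only input is the skipped List task's empty result, so it sees None instead of a list. That matches the TypeError in the flow above.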
a
I will look into it in more detail soon, thanks for sharing an example flow 🙏
@Côme Arvis do you use mapping to get this undefined size list of other tasks [A1, A2, A3, ...]?
c
@Anna Geller Actually no, the list of those tasks [A1, A2, A3, ...] is retrieved dynamically using flow.get_tasks(name=parameter), which explains why the list has an undefined size at registration time.
a
Right now I think the best approach would be to redesign this flow a little to use iterated mapping: each mapped task would process one input from the list (based on task results rather than on the tasks themselves). In that case, a skipped task would mean a skipped mapped child pipeline, rather than a SKIP signal that the aggregation task has to deal with. Does this explanation make sense to you? Can you DM me your flow, or show how exactly you retrieve the tasks into this dynamic list? I may be wrong, but at the moment I think you’re right that the flow won’t work as currently designed, because this List task contains an exception from a task that gets skipped without returning anything. Therefore, the Aggregation task gets triggered (as the trigger says), but it cannot process your list of inputs because one input (from the skipped task) is invalid. This wouldn’t happen with iterated mapping: https://docs.prefect.io/core/concepts/mapping.html#iterated-mapping
@Côme Arvis the cleanest and easiest solution would be to simply return None rather than raise a SKIP signal in the task that has nothing to return. In that case, you could simply add a filter in your aggregation task, “if result is not None”:
from typing import List, Optional

from prefect import Flow, task
from prefect.engine import signals  # only needed if the SKIP line below is re-enabled


@task(name="Task A")
def a():
    return None
    # raise signals.SKIP("This task is an example of feature disabled at runtime")


@task(name="Task B")
def b():
    return "B"


@task(name="Task C")
def c():
    return "C"


@task(name="Aggregation", log_stdout=True)
def aggregation(results: List[Optional[str]]) -> str:
    filtered_results = [result for result in results if result is not None]
    res = ", ".join(filtered_results)
    print(res)
    return res


with Flow("Minimal example") as flow:
    outputs = aggregation([a(), b(), c()])

if __name__ == "__main__":
    flow.run()
You get the same flow structure, but the List task is no longer a problem.
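The filtering step can also be checked on its own, without Prefect. The sketch below uses a hypothetical helper name, join_non_none, and additionally treats a missing list (None) as empty. That extra guard is an assumption of this sketch and is not part of the flow above.

```python
from typing import List, Optional


def join_non_none(results: Optional[List[Optional[str]]]) -> str:
    """Join the non-None entries with ', '; treat a missing list as empty."""
    if results is None:
        # Assumption of this sketch: no list at all means nothing to aggregate
        return ""
    return ", ".join(result for result in results if result is not None)


# Same inputs as the flow above: Task A returns None, B and C return strings
summary = join_non_none([None, "B", "C"])
print(summary)
```

Keeping the filter in a plain function like this makes it easy to unit test before wrapping it in a task.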
c
Thanks @Anna Geller! I will look at this in more detail.
@Jean-David Fiquet @Aleksandr Liadov