Côme Arvis
11/23/2021, 5:50 PM
I have a task B which depends on a list of other tasks [A1, A2, A3, ...] of undefined size (Prefect therefore creates an implicit List task under the hood).
The thing is, some tasks in the list [A1, A2, A3, ...] can be skipped at runtime, but I still want B to be executed.
I currently can't achieve this, even if skip_on_upstream_skip=False is specified for B, since the implicit List task is skipped and there is nothing I can do about it (I receive None, not a list of optional elements).
Is there a way to do it? Thanks!
Anna Geller
Côme Arvis
11/24/2021, 10:43 AM
from typing import List, Optional

from prefect import Flow, task
from prefect.engine import signals
from prefect.triggers import all_finished


@task(name="Task A")
def a():
    raise signals.SKIP("This task is an example of feature disabled at runtime")


@task(name="Task B")
def b():
    return "B"


@task(name="Task C")
def c():
    return "C"


@task(name="Aggregation", skip_on_upstream_skip=False, trigger=all_finished)
def aggregation(results: List[Optional[str]]) -> str:
    filtered_results = [result for result in results]
    return ", ".join(filtered_results)


with Flow("Minimal example") as flow:
    outputs = aggregation([a(), b(), c()])

flow.run()  # raises TypeError: 'NoneType' object is not iterable
Côme Arvis
11/24/2021, 10:48 AM
The problem comes from the List task that is created and executed before the Aggregation task, and which is skipped and therefore returns None.
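The skip propagation described here can be mimicked in plain Python. This is only a toy model under stated assumptions, not Prefect's actual implementation: SKIPPED, run, and collect are hypothetical names, and the only rule modelled is "a task is skipped when any upstream is skipped, unless it opts out".

```python
from typing import Any, Callable, List

# Hypothetical sentinel standing in for a skipped task state.
SKIPPED = object()


def run(fn: Callable[..., Any], upstream: List[Any],
        skip_on_upstream_skip: bool = True) -> Any:
    """Toy runner: propagate a skip unless the task opts out."""
    if skip_on_upstream_skip and any(state is SKIPPED for state in upstream):
        return SKIPPED
    # A skipped upstream carries no data, so the task sees None in its place.
    inputs = [None if state is SKIPPED else state for state in upstream]
    return fn(*inputs)


def collect(*items: Any) -> List[Any]:
    """Stand-in for the implicit List task: gathers upstream results."""
    return list(items)


def aggregation(results):
    return ", ".join([result for result in results])


a = SKIPPED      # Task A raised SKIP
b, c = "B", "C"  # Tasks B and C succeeded

# The collector keeps the default, so one skipped input skips the whole list.
collected = run(collect, [a, b, c])

# Aggregation opts out, but its only upstream is the skipped collector,
# so it is called with None instead of a list.
try:
    run(aggregation, [collected], skip_on_upstream_skip=False)
    error = None
except TypeError as exc:
    error = str(exc)

# What the question hopes to receive: the collector itself opting out.
wanted = run(collect, [a, b, c], skip_on_upstream_skip=False)
```

In this model the opt-out on Aggregation comes one step too late: the list has already been replaced by a skip before Aggregation is reached.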
Anna Geller
[A1, A2, A3, ...]?
Côme Arvis
11/24/2021, 11:53 AM
[A1, A2, A3, ...] is recovered dynamically using flow.get_tasks(name=parameter), which explains why the list is of undefined size at registration time.
Anna Geller
from typing import List, Optional

from prefect import Flow, task
from prefect.engine import signals
from prefect.triggers import all_finished
from prefect.backend.artifacts import create_markdown_artifact


@task(name="Task A")
def a():
    # Return None instead of raising SKIP, so downstream tasks are not skipped.
    return None
    # raise signals.SKIP("This task is an example of feature disabled at runtime")


@task(name="Task B")
def b():
    return "B"


@task(name="Task C")
def c():
    return "C"


@task(name="Aggregation", log_stdout=True)
def aggregation(results: List[Optional[str]]) -> str:
    # Drop the None placeholders left by disabled tasks before joining.
    filtered_results = [result for result in results if result is not None]
    res = ", ".join(filtered_results)
    print(res)
    return res


with Flow("Minimal example") as flow:
    outputs = aggregation([a(), b(), c()])

if __name__ == "__main__":
    flow.run()
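The filtering inside aggregation can also be tried on its own, without Prefect. A minimal sketch, assuming a disabled task yields None as in the example above; join_present is a hypothetical helper name, not a Prefect function.

```python
from typing import List, Optional


def join_present(results: List[Optional[str]]) -> str:
    """Join the results that are not None, preserving their order."""
    return ", ".join(r for r in results if r is not None)


joined = join_present([None, "B", "C"])   # "B, C"
all_missing = join_present([None, None])  # "" (nothing left to join)
```

If every upstream task is disabled, the result is an empty string instead of an error, which may or may not be what the flow should report.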
You get the same flow structure, but the List task is no longer a problem.
Côme Arvis
11/25/2021, 3:17 PM