Sylvain Hazard 10/19/2021, 8:58 AM
and I would like to prevent the mapped C tasks from being created if the upstream mapped B task is Failed. I have tried using a FilterTask, which works great but acts as a kind of reducing task, and that stops depth-first execution (DFE) from working as I would like it to. Is there another way to do this while still making use of DFE?
    a = A()
    b = B.map(a)
    c = C.map(b)
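To illustrate why a filter step gets in the way of depth-first execution, here is a minimal plain-Python sketch. It is not Prefect code: `run_b`, `run_c`, `depth_first`, and `with_filter` are hypothetical stand-ins for the mapped B and C tasks, and the "odd inputs fail" rule is an arbitrary assumption for the demo. The only point is the order of events: a filter has to see every B result before any C can start, whereas the behavior asked about here lets each C follow its own B.

```python
def run_b(x, log):
    """Stand-in for one mapped B child; odd inputs fail (arbitrary demo rule)."""
    log.append(f"B({x})")
    if x % 2:
        raise ValueError(f"B failed on {x}")
    return x * 10


def run_c(y, log):
    """Stand-in for one mapped C child."""
    log.append(f"C({y})")
    return y + 1


def depth_first(items):
    """Each item runs B then C on its own; a failed B simply ends that branch."""
    log = []
    for x in items:
        try:
            y = run_b(x, log)
        except ValueError:
            continue  # no C child is created for this branch
        run_c(y, log)
    return log


def with_filter(items):
    """A filter is a reduce step: all B children finish before any C starts."""
    log = []
    b_results = []
    for x in items:
        try:
            b_results.append(run_b(x, log))
        except ValueError:
            pass  # failed results are dropped by the filter
    for y in b_results:
        run_c(y, log)
    return log


print(depth_first([1, 2, 4]))  # B and C interleave per item
print(with_filter([1, 2, 4]))  # every B first, then every C
```

In the first log each `C(...)` entry directly follows its own `B(...)` entry, while in the second all `B(...)` entries come first, which is the barrier that the question wants to avoid.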
Anna Geller 10/19/2021, 1:29 PM
). If one of the mapped B child tasks fails, then by default the downstream C child tasks would immediately end with a TriggerFailed state, because the default trigger is
. Here is an example flow that shows this:
Here is an example independent flow:
    from random import sample

    from prefect import Flow, task
    from prefect.engine.signals import FAIL


    @task
    def get_nrs_to_iterate_over():
        # Ten distinct random numbers between 0 and 99
        return sample(range(100), 10)


    @task
    def plus_one(x):
        return x + 1


    @task
    def fail_if_too_large_nr_or_add_two(x):
        # Fail this mapped child only; the other children are unaffected
        if x >= 50:
            raise FAIL(message="Too large number")
        return x + 2


    with Flow("independent-pipelines") as flow:
        nrs = get_nrs_to_iterate_over()
        new_nrs = fail_if_too_large_nr_or_add_two.map(nrs)
        final_nrs = plus_one.map(new_nrs)
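As a rough illustration of the per-branch outcome described above (a failed B child leaves its C child in TriggerFailed), the following plain-Python sketch mimics the example flow with fixed inputs instead of a random sample. It does not use Prefect: the `branch_states` helper and the state strings are assumptions made for this demonstration, and `ValueError` stands in for the `FAIL` signal.

```python
def plus_one(x):
    return x + 1


def fail_if_too_large_nr_or_add_two(x):
    if x >= 50:
        raise ValueError("Too large number")  # stand-in for the FAIL signal
    return x + 2


def branch_states(nrs):
    """Return (input, upstream state, downstream state, final result) per branch."""
    rows = []
    for x in nrs:
        try:
            new_nr = fail_if_too_large_nr_or_add_two(x)
        except ValueError:
            # Upstream failed, so the downstream child never runs its body.
            rows.append((x, "Failed", "TriggerFailed", None))
            continue
        rows.append((x, "Success", "Success", plus_one(new_nr)))
    return rows


for row in branch_states([3, 50, 97, 12]):
    print(row)
```

Branches with inputs below 50 run both steps, while the others stop after the first step and only record a state for the second.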
Sylvain Hazard 10/19/2021, 1:31 PM
which is completely expected and normal, and having failed tasks in my flow whenever this happens would feel bad.
Sylvain Hazard 10/19/2021, 1:34 PM
Kevin Kho 10/19/2021, 2:01 PM
, but it looks like you are already familiar with raising SKIPPED also. I think in general, the only way to do this is to put the logic in the task itself, but that’s not cutting branches. It would be coercing the branches to SUCCESS.
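A minimal sketch of what "putting the logic in the task itself" could look like, written in plain Python rather than against the Prefect API. The `NO_RESULT` sentinel and the `passes_through_no_result` decorator are hypothetical names introduced for illustration; in a real Prefect 1.x flow the early exit would more likely be a raised skip signal than a returned sentinel. Note that, as said above, nothing is cut here: every branch still runs to the end and simply carries a "nothing to do" marker.

```python
from functools import wraps

NO_RESULT = object()  # hypothetical sentinel meaning "this branch was cut short"


def passes_through_no_result(func):
    """Skip the wrapped function's body when its input is the sentinel."""
    @wraps(func)
    def wrapper(x):
        if x is NO_RESULT:
            return NO_RESULT
        return func(x)
    return wrapper


def fail_if_too_large_nr_or_add_two(x):
    # Instead of raising, report "nothing to do" so the branch is not marked failed.
    if x >= 50:
        return NO_RESULT
    return x + 2


@passes_through_no_result
def plus_one(x):
    return x + 1


def run_branches(nrs):
    """Run both steps for every input; short-circuited branches yield NO_RESULT."""
    return [plus_one(fail_if_too_large_nr_or_add_two(x)) for x in nrs]


results = run_branches([3, 50, 12])
print([r for r in results if r is not NO_RESULT])
```

The trade-off is the one raised in the thread: the downstream step has to know about the marker, so the check lives inside every task instead of in the flow structure.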
Sylvain Hazard 10/19/2021, 2:07 PM