Sylvain Hazard
10/19/2021, 8:58 AM
a = A()
b = B.map(a)
c = C.map(b)
and I would like to prevent the mapped C tasks from being created if the upstream mapped B task is Failed.
I have tried using a FilterTask, which works great but acts as a kind of reduce step, which stops depth-first execution (DFE) from working as I would like. Is there another way to do this while still making use of DFE?
Anna Geller
10/19/2021, 1:29 PM
prefect.triggers). If one of the mapped B child tasks fails, then by default the corresponding downstream C child task immediately ends in a TriggerFailed state, because the default trigger is all_successful.
Here is an example flow that shows this:
from prefect import Flow, task
from random import sample
from prefect.engine.signals import FAIL
from prefect.tasks.prefect import wait_for_flow_run, get_task_run_result


@task
def get_nrs_to_iterate_over():
    return sample(range(100), 10)


@task
def plus_one(x):
    return x + 1


@task
def fail_if_too_large_nr_or_add_two(x):
    if x >= 50:
        raise FAIL(message="Too large number")
    return x + 2


with Flow("independent-pipelines") as flow:
    nrs = get_nrs_to_iterate_over()
    new_nrs = fail_if_too_large_nr_or_add_two.map(nrs)
    final_nrs = plus_one.map(new_nrs)
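For anyone reading along without a Prefect environment, here is a rough plain-Python sketch of the per-branch behavior described above. It is only a toy model and not how Prefect's engine is implemented: the `State` enum and `run_branch` helper are hypothetical names made up for illustration, and the only thing modeled is that a downstream child with an all_successful-style trigger does not run when its own upstream child failed.

```python
from enum import Enum


class State(Enum):
    # Hypothetical stand-ins for the state names mentioned in the thread
    SUCCESS = "Success"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


def run_branch(x):
    """Simulate one mapped branch: the B child, then the C child."""
    # B child, mirroring fail_if_too_large_nr_or_add_two
    if x >= 50:
        b_state, b_result = State.FAILED, None
    else:
        b_state, b_result = State.SUCCESS, x + 2

    # C child, mirroring plus_one with an all_successful-style trigger:
    # it only runs when its own upstream child succeeded
    if b_state is not State.SUCCESS:
        return b_state, State.TRIGGER_FAILED, None
    return b_state, State.SUCCESS, b_result + 1


# Each input is an independent branch; one failure does not affect the others
branches = {x: run_branch(x) for x in [3, 49, 50, 97]}
for x, (b_state, c_state, result) in branches.items():
    print(x, b_state.value, c_state.value, result)
```

In this sketch the branches for 50 and 97 end as Failed followed by TriggerFailed, while the branches for 3 and 49 run to completion independently.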
Here is an example independent flow:
Sylvain Hazard
10/19/2021, 1:31 PM
None which is completely expected and normal, and having failed tasks in my flow if this happens would feel bad.
Kevin Kho
10/19/2021, 1:33 PM
Sylvain Hazard
10/19/2021, 1:34 PM
Kevin Kho
10/19/2021, 2:01 PM
Signals, but it looks like you are already familiar with raising SKIPPED as well. I think, in general, the only way to do this is to put the logic in the task itself, but that’s not cutting branches. It would be coercing the branches to SUCCESS.
Sylvain Hazard
10/19/2021, 2:07 PM