Hello ! Is there a way to filter Failed/Skipped/No...
# prefect-server
s
Hello ! Is there a way to filter Failed/Skipped/None mapped tasks in order to limit the number of useless downstream mapped tasks created. Say I have a flow that looks like this :
Copy code
a = A()
b = B.map(a)
c = C.map(b)
and I would like to prevent C mapped tasks from being created if the upstream B mapped task is Failed. I have tried using a FilterTask which works great but serves as a kind of reducing task which stops depth first execution from working as I would like it to. Is there another way to do this while still making use of DFE ?
a
@Sylvain Hazard you could influence that by attaching one of trigger rules (
prefect.triggers
). If one of mapped B child tasks fails, then by default, this would cause that the downstream C child tasks would immediately end with a TriggerFailed state, because the default trigger is
all_successful
. Here is an example flow that shows this:
Copy code
from prefect import Flow, task
from random import sample
from prefect.engine.signals import FAIL
from prefect.tasks.prefect import wait_for_flow_run, get_task_run_result

@task
def get_nrs_to_iterate_over():
    return sample(range(100), 10)


@task
def plus_one(x):
    return x + 1


@task
def fail_if_too_large_nr_or_add_two(x):
    if x >= 50:
        raise FAIL(message="Too large number")
    return x + 2


with Flow("independent-pipelines") as flow:
    nrs = get_nrs_to_iterate_over()
    new_nrs = fail_if_too_large_nr_or_add_two.map(nrs)
    final_nrs = plus_one.map(new_nrs)
Here an example independent flow:
s
Thanks for the answer ! The issue is I would like to be able to "cut" some branches without the task failing. In some cases, B tasks return
None
which is completely expected and normal and having failed tasks in my flow if this happens would feel bad.
k
Hey @Sylvain Hazard, there is a FilterTask that you can use.
s
Yeah, tried that. The issue is FilterTask acts as a reducing task and prevents depth first execution from running as I would like it to do. It's not that much of an issue but I'd prefer if there was another way.
k
Gotcha, I re-read the thread. I think Filtering out would need a reduce so that options are raising
Signals
, but it looks like you are already familiar with raising SKIPPED also. I think in general, the only way to do this is to put the logic in the task itself, but that’s not cutting branches. It would be coercing the branches to SUCCESS
s
Yeah, that's what I was doing before finding out about the FilterTask. It does work but you miss out on information when looking at the flow Schematic which is something pretty cool. I think I'll keep it simple and abandon DFE. It's pretty cool but not entirely necessary for our use case.
upvote 1