# ask-community
h
I had a flow with DAG like
A -> B[0..N] -> C[0..N]
Sometimes some index N fails and then C never runs, because at least one of the upstream Mapped tasks failed. However, I'd always like to run C for the indices that worked; and the flow itself I want to be marked successful despite a few mappings failing. How could I configure this?
k
If B is upstream of C, the mapped children should chain one-to-one: a success in B1 and a failure in B2 means C1 runs and C2 does not. Is B an input to C? Did you define it as upstream? For the second part, with the flow always succeeding, you can add a quick reduce step with the trigger `always_run` that just returns anything; when it succeeds, the flow will succeed. I think this is the easiest way. Or you can set a different reference task, like A maybe.
h
I defined upstreams explicitly: this must be the reason then!
k
A bit later I think I can make an example for this if you still need help
h
What pattern do you use normally when you want to catch both errors and successes with this last task? I use it to track the model runs
Like try-catch-finally on a per-mapped-task basis
I also never want the flow as such to fail just because there's a "vertical mapping" that fails...
I would actually really appreciate a link to some docs or an example 🙂
k
Ok so a couple of code snippets
First, this one shows that a failure only affects its own branch. Here B fails for items 2 and 3, so C2 and C3 don't run (they end up as TriggerFailed), but the branches for 1, 4 and 5 still succeed.
from prefect import Flow, task
import prefect

@task
def abc(x):
    prefect.context.get("logger").info("A"+str(x))
    return str(x)

@task
def bcd(x):
    if (str(x) == "2") or (str(x) == "3"):
        raise ValueError("Failed B" + str(x))
    prefect.context.get("logger").info("B"+str(x))
    return str(x)

@task
def cde(x):
    prefect.context.get("logger").info("C"+str(x))
    return str(x)

with Flow("example") as flow:
    items = [1,2,3,4,5]
    a = abc.map(items)
    b = bcd.map(a)
    c = cde.map(b)

flow.run()
I can then attach a trigger to `cde` like this:
from prefect.triggers import always_run

@task(trigger=always_run)
def cde(x):
    prefect.context.get("logger").info("C"+str(x))
    return str(x)
and now `cde` will always run even if `bcd` fails.
This one is a similar example where a mapped task has some failures and some successes. We can pipe it to another task to separate the errors from the successes:
from prefect import Flow, task
import prefect
from prefect.triggers import always_run

@task
def abc(x):
    prefect.context.get("logger").info("A"+str(x))
    return str(x)

@task
def bcd(x):
    if (str(x) == "2") or (str(x) == "3"):
        raise ValueError("Failed B" + str(x))
    prefect.context.get("logger").info("B"+str(x))
    return str(x)

@task(trigger=always_run)
def get_success(list_x):
    # A failed mapped child arrives in the list as the exception it raised
    successes = [x for x in list_x if not isinstance(x, BaseException)]
    prefect.context.get("logger").info("LOGGING SUCCESSFUL RUNS")
    prefect.context.get("logger").info(successes)
    return successes

@task(trigger=always_run)
def get_failures(list_x):
    # Keep only the exceptions, i.e. the mapped children that failed
    failures = [x for x in list_x if isinstance(x, BaseException)]
    prefect.context.get("logger").info("LOGGING FAILURE RUNS")
    prefect.context.get("logger").info(failures)
    return failures

with Flow("example") as flow:
    items = [1,2,3,4,5]
    a = abc.map(items)
    b = bcd.map(a)
    b_success = get_success(b)
    b_errors = get_failures(b)

flow.run()
This last snippet breaks the vertical mapping, though, because of the intermediate reduce steps, so it's just a demo. For something real, I think you can combine the concepts here to achieve what you want. Task `C` can have some failure logic for the case where `B` failed and it received an `Exception`. We also have a FilterTask in the task library that gives you a quicker way to filter a list of task results. You can still use the same `isinstance(x, BaseException)` condition that I used.
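A minimal sketch of what that failure logic inside C could look like, assuming C is decorated with `@task(trigger=always_run)` as in the trigger snippet and so receives either B's return value or the exception B raised. `handle_item` and the record fields are hypothetical names, and it is written as a plain function so it can be tried without a flow:

```python
from typing import Any, Dict


def handle_item(x: Any) -> Dict[str, Any]:
    """Hypothetical body for a per-item task C.

    Assumption: x is either B's result or the exception that B raised.
    """
    record: Dict[str, Any] = {"input": None, "status": None, "error": None}
    if isinstance(x, BaseException):
        # "catch" branch: the upstream mapped child failed
        record["status"] = "upstream_failed"
        record["error"] = repr(x)
    else:
        # "try" branch: the upstream mapped child succeeded
        record["status"] = "ok"
        record["input"] = x
    # "finally" part: both branches return a record, so tracking always happens
    return record
```

Mapping a task with this body over `b` would keep the per-item branches intact, since there is no reduce step in between.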
I think these should be good starting points.