haf
10/30/2021, 4:07 PM
A -> B[0..N] -> C[0..N]
Sometimes some index in 0..N fails and then C never runs, because at least one of the upstream mapped tasks failed.
However, I'd always like to run C for the indices that worked, and I want the flow itself to be marked successful despite a few mappings failing.
How could I configure this?
Kevin Kho
[Add a final task with the] `always_run` [trigger] and then just return anything; when this succeeds, the flow will succeed. I think this is the easiest way. Or you can set a different reference task, like A maybe.
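One way to picture the reference-task idea: in Prefect 1.x the final flow state follows the states of the flow's reference tasks, which default to the terminal tasks. The sketch below is a plain-Python toy model of that rule, not Prefect code; `flow_state` and the state strings are hypothetical names made up for illustration. In a real flow the corresponding call would be `flow.set_reference_tasks([...])`.

```python
from typing import Dict, Iterable


def flow_state(task_states: Dict[str, str], reference_tasks: Iterable[str]) -> str:
    """Toy rule: the flow succeeds only if every reference task succeeded."""
    ok = all(task_states[name] == "Success" for name in reference_tasks)
    return "Success" if ok else "Failed"


# States after a run in which some mapped B (and therefore C) children failed.
states = {"A": "Success", "B": "Failed", "C": "Failed"}

# Default: the terminal task C is the reference task, so the flow fails.
default_result = flow_state(states, ["C"])

# Option 1: a final task that always runs and succeeds becomes the terminal task.
with_final_task = flow_state({**states, "final": "Success"}, ["final"])

# Option 2: make A the reference task instead.
with_a_as_reference = flow_state(states, ["A"])

print(default_result, with_final_task, with_a_as_reference)
```

Both options change which task the flow state is read from; neither changes whether the individual B children failed.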
Kevin Kho
from prefect import Flow, task
import prefect

@task
def abc(x):
    prefect.context.get("logger").info("A" + str(x))
    return str(x)

@task
def bcd(x):
    # Simulate a failure for two of the mapped inputs.
    if str(x) in ("2", "3"):
        raise ValueError("Failed B" + str(x))
    prefect.context.get("logger").info("B" + str(x))
    return str(x)

@task
def cde(x):
    prefect.context.get("logger").info("C" + str(x))
    return str(x)

with Flow("example") as flow:
    items = [1, 2, 3, 4, 5]
    a = abc.map(items)
    b = bcd.map(a)
    c = cde.map(b)

flow.run()
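For intuition, the behaviour haf is asking for can be sketched in plain Python without Prefect: each index runs through A, B and C on its own, and a failure in one index is recorded rather than blocking the others. This is only an illustration of the desired semantics, with `run_chain_per_index` as a hypothetical helper; it says nothing about how Prefect schedules mapped tasks.

```python
from typing import Any, Dict, List


def abc(x: Any) -> str:
    return str(x)


def bcd(x: str) -> str:
    # Same simulated failure as in the flow above.
    if x in ("2", "3"):
        raise ValueError("Failed B" + x)
    return x


def cde(x: str) -> str:
    return x


def run_chain_per_index(items: List[Any]) -> Dict[int, Any]:
    """Run A -> B -> C per item; store the exception for any index that fails."""
    results: Dict[int, Any] = {}
    for index, item in enumerate(items):
        try:
            results[index] = cde(bcd(abc(item)))
        except ValueError as exc:
            results[index] = exc
    return results


results = run_chain_per_index([1, 2, 3, 4, 5])
for index, outcome in results.items():
    print(index, repr(outcome))
```

Indices 0, 3 and 4 reach C, while indices 1 and 2 carry the `ValueError` raised in B.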
Kevin Kho
[Add a trigger to] `cde` like this:
from prefect.triggers import always_run

@task(trigger=always_run)
def cde(x):
    prefect.context.get("logger").info("C" + str(x))
    return str(x)
and now `cde` will always run even if `bcd` fails.
Kevin Kho
from prefect import Flow, task
import prefect
from prefect.triggers import always_run

@task
def abc(x):
    prefect.context.get("logger").info("A" + str(x))
    return str(x)

@task
def bcd(x):
    # Simulate a failure for two of the mapped inputs.
    if str(x) in ("2", "3"):
        raise ValueError("Failed B" + str(x))
    prefect.context.get("logger").info("B" + str(x))
    return str(x)

@task(trigger=always_run)
def get_success(list_x):
    # Failed upstream runs arrive as exception objects; keep everything else.
    successes = [x for x in list_x if not isinstance(x, BaseException)]
    prefect.context.get("logger").info("LOGGING SUCCESSFUL RUNS")
    prefect.context.get("logger").info(successes)
    return successes

@task(trigger=always_run)
def get_failures(list_x):
    # Keep only the exception objects from the failed upstream runs.
    failures = [x for x in list_x if isinstance(x, BaseException)]
    prefect.context.get("logger").info("LOGGING FAILURE RUNS")
    prefect.context.get("logger").info(failures)
    return failures

with Flow("example") as flow:
    items = [1, 2, 3, 4, 5]
    a = abc.map(items)
    b = bcd.map(a)
    b_success = get_success(b)
    b_errors = get_failures(b)

flow.run()
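Since the two tasks above drop the positions of the items, a variant that keeps them can be handy for seeing which indices failed. The sketch below is plain Python that could serve as the body of a single `always_run` task; `partition_results` is a hypothetical name, and it assumes, as in the example above, that failed upstream runs show up in the list as exception objects.

```python
from typing import Any, Dict, List, Tuple


def partition_results(
    results: List[Any],
) -> Tuple[Dict[int, Any], Dict[int, BaseException]]:
    """Split results into successes and failures in one pass, keyed by index."""
    successes: Dict[int, Any] = {}
    failures: Dict[int, BaseException] = {}
    for index, value in enumerate(results):
        if isinstance(value, BaseException):
            failures[index] = value
        else:
            successes[index] = value
    return successes, failures


# Example input shaped like the mapped output of bcd in the flow above.
mapped_b = ["1", ValueError("Failed B2"), ValueError("Failed B3"), "4", "5"]
successes, failures = partition_results(mapped_b)
print(sorted(successes))  # indices that worked
print(sorted(failures))   # indices that failed
```

Keying by index makes it possible to report or retry exactly the inputs that failed, at the cost of returning dictionaries instead of flat lists.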
Kevin Kho
`C` can have some failure logic if `B` failed and it received an `Exception`. We also have a `FilterTask` in the task library that will give you a faster interface to filter a list of task results. You can still use the same `isinstance(x, BaseException)` condition that I used.
Kevin Kho