Ken Nguyen
06/13/2022, 11:15 PM
a -> b -> c
I want it so that if a and b fail, it sends a Slack notification saying “Flow failed”. But if c fails, I want it to alter the Slack message by adding b’s output.
Kevin Kho
b as an input and c as an upstream with trigger any_failed
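A minimal sketch of the pattern described above, assuming Prefect 1.x (the `Flow`/`task` API used elsewhere in this thread). The tasks `a`, `b`, `c` mirror the question; `build_message`, `build_flow` and `notify_c_failed` are hypothetical names, and the Slack call is replaced by a log line.

```python
def build_message(b_output):
    """Hypothetical helper: Slack text to send when c fails."""
    return f"Flow failed. Output of b: {b_output}"


def build_flow():
    # Prefect 1.x imports, kept local so build_message works without Prefect.
    import prefect
    from prefect import Flow, task
    from prefect.triggers import any_failed

    @task
    def a():
        return 1

    @task
    def b(x):
        return x + 1

    @task
    def c(y):
        raise ValueError("c failed")

    # Runs only if at least one upstream task ended in a failed state.
    @task(trigger=any_failed)
    def notify_c_failed(b_output):
        # Stand-in for the real Slack call.
        prefect.context.logger.info(build_message(b_output))

    with Flow("a-b-c") as flow:
        a_res = a()
        b_res = b(a_res)
        c_res = c(b_res)
        # b is passed as data; c is only an upstream dependency.
        notify_c_failed(b_res, upstream_tasks=[c_res])
    return flow
```

Calling `build_flow().run()` should log the message once c fails. Because b is also an upstream of the notifier, `any_failed` may fire when a or b fails as well, so the task might need to check what it received.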
Ken Nguyen
06/13/2022, 11:28 PM
Kevin Kho
@task(state_handlers=[...])
Ken Nguyen
06/13/2022, 11:43 PM
Kevin Kho
Ken Nguyen
06/14/2022, 12:56 AM
Kevin Kho
Ken Nguyen
06/14/2022, 12:57 AM
Kevin Kho
from prefect import Flow, task
import prefect

class SomeError(ValueError):
    def __init__(self, *args, x):
        super().__init__(*args)
        self.x = x

def st(task, old_state, new_state):
    if new_state.is_failed():
        prefect.context.logger.info(type(new_state.result))
        prefect.context.logger.info(new_state.result.x)
    return new_state

@task(state_handlers=[st])
def abc(x):
    # we want to keep x in state handler
    if x == 2:
        raise SomeError("test error", x=x)
    return x + 1

with Flow("..") as flow:
    abc(2)

flow.run()
Ken Nguyen
06/14/2022, 1:07 AM
Kevin Kho
Ken Nguyen
06/14/2022, 1:38 AM
if len(str) > 0:
    raise FAIL
return str
But it seems like when I raise FAIL, the task returns None instead
Kevin Kho
SKIP propagates to downstream (and FAIL does too so the downstream shouldn’t run)
Ken Nguyen
06/14/2022, 1:43 AM
raise SKIP instead?
Kevin Kho
Ken Nguyen
06/14/2022, 1:53 AM
Kevin Kho
Ken Nguyen
06/14/2022, 2:21 AM
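For the FAIL question earlier in the thread, one possible sketch, assuming Prefect 1.x and assuming that keyword arguments given to a signal are forwarded to the resulting state (so `result=` can carry the string). This is an illustration, not necessarily the approach suggested in the thread; `should_fail`, `build_flow` and `validate` are hypothetical names.

```python
def should_fail(text):
    """Hypothetical rule mirroring the snippet: any non-empty string fails."""
    return len(text) > 0


def build_flow(text):
    # Prefect 1.x imports, kept local so should_fail works without Prefect.
    from prefect import Flow, task
    from prefect.engine.signals import FAIL

    @task
    def validate(s):
        if should_fail(s):
            # Assumption: result= is stored on the Failed state, so the
            # string stays available to state handlers via new_state.result.
            raise FAIL("validation failed", result=s)
        return s

    with Flow("fail-with-result") as flow:
        validate(text)
    return flow
```

Under that assumption, a state handler like `st` above could read the string from `new_state.result`. Downstream tasks with the default trigger still would not run, since FAIL propagates.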