Marley
10/26/2020, 4:33 PM
(trigger=all_successful), and one that only notifies if something failed (trigger=any_failed). I’ve added a custom state handler to trigger failed (see code block in thread) that should SKIP if TriggerFailed. Raising that SKIP is causing the Task to fail in Prefect Cloud. I previously tried to leverage the Flow on_failure but for some reason it wasn’t sending my notifications. Am I missing something re: raising a SKIP signal on the last Task? Is there something special happening in a Flow’s on_failure preventing it from sending a Slack notification?
from prefect import Task
from prefect.engine import signals, state
from prefect.triggers import any_failed

def skip_on_trigger_failed(task, old_state, new_state):
    """
    A Prefect State Handler that listens for a TriggerFailed state, then prevents the Task from failing, instead
    setting the state to Skipped. Used to skip Flow failure notifications if the Flow didn't fail.
    """
    if isinstance(new_state, state.TriggerFailed):
        raise signals.SKIP()
    return new_state

class NotifyFailureTask(Task):
    """ A Prefect Task that notifies Slack if any tasks fail. Has a state handler to prevent it from failing if
    it is not triggered–it will not be triggered on successful runs.
    """
    def __init__(self):
        super().__init__(trigger=any_failed, state_handlers=[skip_on_trigger_failed])

    def run(self):
        # notify_slack is the poster's own helper; its definition is not shown in the thread.
        notify_slack(title="failed :x:")
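One possible reading of the failure described above, as a minimal sketch. It uses stand-in classes rather than Prefect's own, so it runs without Prefect installed. It assumes that a state handler is expected to return the state the task should take, and that the runner treats any exception raised inside a handler (a SKIP signal included) as a handler error and converts it to a failure. `State`, `TriggerFailed`, `Skipped`, `Failed`, `SkipSignal` and `run_handlers` are all hypothetical names made up for illustration.

```python
# Stand-in state classes (hypothetical; they only mimic names used in the thread).
class State:
    def __init__(self, message=""):
        self.message = message

class TriggerFailed(State):
    pass

class Skipped(State):
    pass

class Failed(State):
    pass

class SkipSignal(Exception):
    """Stand-in for a SKIP signal."""

def raising_handler(task, old_state, new_state):
    # Mirrors the handler in the thread: raise when the trigger was not met.
    if isinstance(new_state, TriggerFailed):
        raise SkipSignal()
    return new_state

def returning_handler(task, old_state, new_state):
    # Alternative: hand back a Skipped state instead of raising.
    if isinstance(new_state, TriggerFailed):
        return Skipped("Trigger not met; nothing to notify.")
    return new_state

def run_handlers(handlers, task, old_state, new_state):
    """Assumed runner behaviour: any exception from a handler becomes Failed."""
    try:
        for handler in handlers:
            new_state = handler(task, old_state, new_state)
    except Exception as exc:
        return Failed(f"Exception raised while calling state handlers: {exc!r}")
    return new_state

raised = run_handlers([raising_handler], None, State(), TriggerFailed())
returned = run_handlers([returning_handler], None, State(), TriggerFailed())
print(type(raised).__name__, type(returned).__name__)
```

If the assumption holds for the Prefect version in use, returning a Skipped state from the handler, rather than raising the signal, may be enough to keep the task from being marked as failed.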
Kyle Moon-Wright
10/26/2020, 5:08 PM
Marley
10/26/2020, 5:11 PM
with Flow(…, on_failure=_notify_failure) as flow: but it wasn’t sending Slack notifications with no explanations/errors from Prefect Cloud. Looked good when testing from local. I used the function signature from https://docs.prefect.io/api/latest/core/flow.html#flow-2
Kyle Moon-Wright
10/26/2020, 5:15 PM
state_handlers kwarg to attach the function and look out for changing state.
Marley
10/26/2020, 5:19 PM
self.state_handlers = state_handlers or []
if on_failure is not None:
    self.state_handlers.append(
        callback_factory(on_failure, check=lambda s: s.is_failed())
    )
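The snippet above suggests that on_failure is a thin wrapper over state_handlers. A rough sketch of what such a wrapper might look like, written with stand-in names so it runs without Prefect: `make_callback_handler` and `FakeState` are hypothetical, and the assumption that the callback receives `(flow, state)` rather than `(flow, old_state, new_state)` should be checked against the Flow API docs.

```python
class FakeState:
    """Hypothetical stand-in exposing only is_failed(), as used in the snippet."""

    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed

def make_callback_handler(fn, check):
    """Wrap a two-argument callback as a three-argument state handler."""
    def state_handler(obj, old_state, new_state):
        if check(new_state):
            fn(obj, new_state)
        # Always pass the state through unchanged.
        return new_state
    return state_handler

notified = []

def _notify_failure(flow, state):
    # A real callback would post to Slack; here we only record the call.
    notified.append(flow)

handler = make_callback_handler(_notify_failure, check=lambda s: s.is_failed())
handler("my-flow", FakeState(False), FakeState(False))  # not failed: callback skipped
handler("my-flow", FakeState(False), FakeState(True))   # failed: callback fires
print(notified)
```

If this reading is right, a function written for on_failure takes two arguments, while the same function passed through state_handlers would need the three-argument signature and must return a state.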
Kyle Moon-Wright
10/26/2020, 5:22 PM
on_failure kwarg didn't pick that up - personally I usually opt for the state_handlers kwarg.
Marley
10/26/2020, 5:25 PM
Kyle Moon-Wright
10/26/2020, 5:34 PM
new_state like this:
import requests

def state_handler_for_flow(flow, old_state, new_state):
    # Post a message to the webhook depending on the flow's final state.
    if new_state.is_failed():
        msg = "My failed message"
        requests.post("https://XXXXX", json={"text": msg})
    if new_state.is_successful():
        msg = "My success message"
        requests.post("https://XXXXX", json={"text": msg})
    # A state handler must return the state the flow should end up in.
    return new_state
Marley
11/04/2020, 1:56 AM