# prefect-community
I’m trying to add two custom tasks to the end of my flow: one that notifies that the flow finished successfully, and one that only notifies if something failed. I’ve added a custom state handler for the TriggerFailed state (see code block in thread) that should set the task to Skipped by raising a SKIP signal. Raising that signal is causing the Task to fail in Prefect Cloud. I previously tried to leverage the Flow’s on_failure kwarg, but for some reason it wasn’t sending my notifications. Am I missing something re: raising a SKIP signal on the last Task? Is there something special happening in a Flow’s on_failure preventing it from sending a Slack notification?
from prefect.engine import signals, state


def skip_on_trigger_failed(task, old_state, new_state):
    """
    A Prefect State Handler that listens for a TriggerFailed state, then prevents the Task from failing, instead
    setting the state to Skipped. Used to skip Flow failure notifications if the Flow didn't fail.
    """
    if isinstance(new_state, state.TriggerFailed):
        raise signals.SKIP()
    return new_state
from prefect import Task
from prefect.triggers import any_failed


class NotifyFailureTask(Task):
    """A Prefect Task that notifies Slack if any tasks fail. Has a state handler to prevent it from failing if
    it is not triggered; it will not be triggered on successful runs.
    """

    def __init__(self):
        super().__init__(trigger=any_failed, state_handlers=[skip_on_trigger_failed])

    def run(self):
        # notify_slack is the poster's own helper (not shown in the thread)
        notify_slack(title="failed :x:")
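One possible explanation for the failure, assuming Prefect Core 0.x/1.x behaviour: an exception raised inside a state handler (a SKIP signal included) is treated as a handler error, so the task ends up Failed, whereas a handler is allowed to return a different state to replace the incoming one. A minimal sketch of that alternative follows. The factory name `make_skip_on_trigger_failed` and its parameters are hypothetical; the state classes are passed in so the snippet runs without Prefect installed.

```python
def make_skip_on_trigger_failed(trigger_failed_cls, skipped_cls):
    """Build a state handler that swaps a TriggerFailed state for a Skipped one.

    Hypothetical helper: with Prefect 0.x/1.x the arguments would be
    prefect.engine.state.TriggerFailed and prefect.engine.state.Skipped.
    """

    def skip_on_trigger_failed(task, old_state, new_state):
        if isinstance(new_state, trigger_failed_cls):
            # Return the replacement state instead of raising a signal.
            return skipped_cls("Trigger not met; skipping failure notification.")
        # Any other state passes through unchanged.
        return new_state

    return skip_on_trigger_failed
```

With Prefect installed this might be wired up as state_handlers=[make_skip_on_trigger_failed(state.TriggerFailed, state.Skipped)], though that has not been verified against Prefect Cloud here.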
Hey @Marley, do you need this notification at the task level? You can also set the state handler at the flow level to look for the final flow states, which bubble up from your task states.
Hello! I don’t. I tried attaching it at the flow level:
with Flow(…, on_failure=_notify_failure) as flow:
but it wasn’t sending Slack notifications, and Prefect Cloud showed no explanation or errors. It looked good when testing locally. I used the function signature from https://docs.prefect.io/api/latest/core/flow.html#flow-2
Hmm, so if we want to attach the state handler at the flow level, we should definitely use the flow's state_handlers kwarg to attach the function and look out for changing state.
In case you haven't yet come across it, here's an example of responding to state at a task level, which we can augment for a flow level notification.
from source code:
self.state_handlers = state_handlers or []
if on_failure is not None:
    self.state_handlers.append(
        callback_factory(on_failure, check=lambda s: s.is_failed())
    )
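The excerpt above shows that on_failure is wrapped into an ordinary state handler. As a rough illustration of what such a wrapper does, here is a simplified stand-in (not the library source): it calls fn(obj, new_state) only when check(new_state) is true, and always returns the new state. The two-argument callback signature and the `FakeState` class are assumptions made for this sketch; the real signature should be checked against the docs page linked earlier in the thread.

```python
def callback_factory_sketch(fn, check):
    """Simplified stand-in for a callback-to-state-handler wrapper."""

    def state_handler(obj, old_state, new_state):
        # Fire the callback only for states that pass the check.
        if check(new_state):
            fn(obj, new_state)
        # A state handler always hands a state back to the runner.
        return new_state

    return state_handler


class FakeState:
    """Hypothetical minimal state exposing only is_failed()."""

    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed


calls = []
handler = callback_factory_sketch(
    lambda obj, state: calls.append((obj, state.is_failed())),
    check=lambda s: s.is_failed(),
)
handler("my-flow", None, FakeState(failed=False))  # callback not invoked
handler("my-flow", None, FakeState(failed=True))   # callback invoked once
print(calls)  # [('my-flow', True)]
```

If the wrapper really does behave this way, a callback written with the three-argument state-handler signature would raise a TypeError when called with two arguments, which is one thing worth ruling out.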
Yeah, strange that the on_failure kwarg didn't pick that up. Personally I usually opt for the state_handlers kwarg.
So even on a failed flow run the state handler won't trigger?
It seemed like it might be triggered, but it certainly didn’t run the notification function that is working in a number of other places. I’ll test it out with a traditional state_handler
Sure thing, it may also be worth trying out the built-in state methods (is_failed(), is_successful()), like this:
import requests


def state_handler_for_flow(flow, old_state, new_state):
    if new_state.is_failed():
        msg = "My failed message"
        requests.post("https://XXXXX", json={"text": msg})

    if new_state.is_successful():
        msg = "My success message"
        requests.post("https://XXXXX", json={"text": msg})

    return new_state
As an update: the state handler is triggered correctly, but something seems to be preventing the Slack client from sending notifications?
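When a handler fires but no message arrives, one way to narrow it down is to log whatever the notification call raises or returns instead of letting it pass silently. The sketch below is one way to do that, assuming the notifier is any callable that takes a message string. `make_logged_handler`, `notify`, and the logger name are hypothetical, and the state is only expected to expose is_failed().

```python
import logging


def make_logged_handler(notify, logger=None):
    """Wrap a notifier in a state handler that logs every attempt and error."""
    log = logger or logging.getLogger("flow_notifications")

    def handler(obj, old_state, new_state):
        if new_state.is_failed():
            try:
                result = notify("My failed message")
                log.info("Notification sent, notifier returned %r", result)
            except Exception:
                # Record the full traceback so the cause shows up in the run logs.
                log.exception("Notification failed")
        # Always return the state so the run continues normally.
        return new_state

    return handler
```

A traceback in the run logs (or a non-success response returned by the notifier) would at least show whether the call is failing in the execution environment or never being reached.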