I’m trying to add two custom tasks to the end of m...
# prefect-community
m
I’m trying to add two custom tasks to the end of my flow– one that notifies that the flow finished successfully (
trigger=all_successful
), and one that only notifies if something failed (
trigger=any_failed
). I’ve added a custom state handler to trigger failed (see code block in thread) that should
SKIP
if
TriggerFailed
. Raising that
SKIP
is causing the Task to fail in Prefect Cloud. I previously tried to leverage the Flow
on_failure
but for some reason it wasn’t sending my notifications. Am I missing something re: raising a
SKIP
signal on the last Task? Is there something special happening in a Flow’s
on_failure
preventing it from sending a Slack notification?
Copy code
def skip_on_trigger_failed(task, old_state, new_state):
    """
    A Prefect State Handler that listens for a TriggerFailed state, then prevents the Task from failing, instead
    setting the state to Skipped. Used to skip Flow failure notifications if the Flow didn't fail.
    """
    if isinstance(new_state, state.TriggerFailed):
        raise signals.SKIP()
    return new_state
Copy code
class NotifyFailureTask(Task):
    """ A Prefect Task that notifies Slack if any tasks fail. Has a state handler to prevent it from failing if
    it is not triggered–it will not be triggered on successful runs.
    """

    def __init__(self):
        super().__init__(trigger=any_failed, state_handlers=[skip_on_trigger_failed])

    def run(self):
        notify_slack(title="failed :x:")
k
Hey @Marley, Do you need this notification at the task level? You can also set the state handler at the flow level to look for these final flow states - which will bubble from your task states.
m
hello! I don’t, I tried having it attached to
with Flow(…, on_failure=_notify_failure) as flow:
but it wasn’t sending Slack notifications with no explanations/errors from Prefect cloud. Looked good when testing from local. I used the function signature from https://docs.prefect.io/api/latest/core/flow.html#flow-2
k
Hmm, so if we want to attach the state handler at the flow level - we should definitely use the flow's
state_handlers
kwarg to attach the function and look out for changing state.
In case you haven't yet come across it, here's an example of responding to state at a task level, which we can augment for a flow level notification.
m
from source code:
Copy code
self.state_handlers = state_handlers or []
        if on_failure is not None:
            self.state_handlers.append(
                callback_factory(on_failure, check=lambda s: s.is_failed())
            )
k
Yeah, strange that the
on_failure
kwarg didn't pick that up - personally I usually opt for the
state_handlers
kwarg.
So even on a failed flow run the state handler won't trigger?
m
It seemed like it might be triggered, but it certainly didn’t run the notification function that is working in a number of other places. I’ll test it out with a traditional state_handler
k
Sure thing, it may also be worth trying out the built-in methods for
new_state
like this:
Copy code
def state_handler_for_flow(flow, old_state, new_state):
    if new_state.is_failed():
        msg = "My failed message"
        <http://requests.post|requests.post>("<https://XXXXX>", json={"text": msg})

    if new_state.is_successful():
        msg = "My success message"
        <http://requests.post|requests.post>("<https://XXXXX>", json={"text": msg})

    return new_state
m
as an update, the state handler is triggered correctly but something is suppressing slack client from being able to send notifications?