m

    Marley

    1 year ago
    I’m trying to add two custom tasks to the end of my flow– one that notifies that the flow finished successfully (
    trigger=all_successful
    ), and one that only notifies if something failed (
    trigger=any_failed
    ). I’ve added a custom state handler to trigger failed (see code block in thread) that should
    SKIP
    if
    TriggerFailed
    . Raising that
    SKIP
    is causing the Task to fail in Prefect Cloud. I previously tried to leverage the Flow
    on_failure
    but for some reason it wasn’t sending my notifications. Am I missing something re: raising a
    SKIP
    signal on the last Task? Is there something special happening in a Flow’s
    on_failure
    preventing it from sending a Slack notification?
    def skip_on_trigger_failed(task, old_state, new_state):
        """
        A Prefect State Handler that listens for a TriggerFailed state, then prevents the Task from failing, instead
        setting the state to Skipped. Used to skip Flow failure notifications if the Flow didn't fail.
        """
        if isinstance(new_state, state.TriggerFailed):
            raise signals.SKIP()
        return new_state
    class NotifyFailureTask(Task):
        """ A Prefect Task that notifies Slack if any tasks fail. Has a state handler to prevent it from failing if
        it is not triggered–it will not be triggered on successful runs.
        """
    
        def __init__(self):
            super().__init__(trigger=any_failed, state_handlers=[skip_on_trigger_failed])
    
        def run(self):
            notify_slack(title="failed :x:")
    Kyle Moon-Wright

    Kyle Moon-Wright

    1 year ago
    Hey @Marley, Do you need this notification at the task level? You can also set the state handler at the flow level to look for these final flow states - which will bubble from your task states.
    m

    Marley

    1 year ago
    hello! I don’t, I tried having it attached to
    with Flow(…, on_failure=_notify_failure) as flow:
    but it wasn’t sending Slack notifications with no explanations/errors from Prefect cloud. Looked good when testing from local. I used the function signature from https://docs.prefect.io/api/latest/core/flow.html#flow-2
    Kyle Moon-Wright

    Kyle Moon-Wright

    1 year ago
    Hmm, so if we want to attach the state handler at the flow level - we should definitely use the flow's
    state_handlers
    kwarg to attach the function and look out for changing state.
    In case you haven't yet come across it, here's an example of responding to state at a task level, which we can augment for a flow level notification.
    m

    Marley

    1 year ago
    from source code:
    self.state_handlers = state_handlers or []
            if on_failure is not None:
                self.state_handlers.append(
                    callback_factory(on_failure, check=lambda s: s.is_failed())
                )
    Kyle Moon-Wright

    Kyle Moon-Wright

    1 year ago
    Yeah, strange that the
    on_failure
    kwarg didn't pick that up - personally I usually opt for the
    state_handlers
    kwarg.
    So even on a failed flow run the state handler won't trigger?
    m

    Marley

    1 year ago
    It seemed like it might be triggered, but it certainly didn’t run the notification function that is working in a number of other places. I’ll test it out with a traditional state_handler
    Kyle Moon-Wright

    Kyle Moon-Wright

    1 year ago
    Sure thing, it may also be worth trying out the built-in methods for
    new_state
    like this:
    def state_handler_for_flow(flow, old_state, new_state):
        if new_state.is_failed():
            msg = "My failed message"
            <http://requests.post|requests.post>("<https://XXXXX>", json={"text": msg})
    
        if new_state.is_successful():
            msg = "My success message"
            <http://requests.post|requests.post>("<https://XXXXX>", json={"text": msg})
    
        return new_state
    m

    Marley

    1 year ago
    as an update, the state handler is triggered correctly but something is suppressing slack client from being able to send notifications?