Hi, I have added state_handlers to my flow, but fo...
# prefect-community
m
Hi, I have added state_handlers to my flow, but for some reason they are not getting triggered. Currently it looks like this:
from prefect.utilities.notifications import slack_notifier
.
.
flow.state_handlers = [slack_notifier()]
I have added the Prefect app and also added the webhook URL to the SLACK_WEBHOOK_URL secret. Any pointers on why this is happening / how to debug?
a
For Slack specifically, we now have this step-by-step walkthrough: https://discourse.prefect.io/t/how-to-send-slack-notifications-using-the-slacktask/497
In general, there are two types of state handlers:
• task level
• flow level
Currently, you attached a flow-level state handler, which fires only if the reference tasks fail. By default, the last task of the flow is the reference task, but you can modify that using:
flow.set_reference_tasks([task1, task2])
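As a rough mental model of what reference tasks do, the sketch below derives a flow's final state from the states of its reference tasks. It is a plain-Python stand-in, not Prefect's engine: `flow_final_state`, the task names, and the string states are all made up for illustration.

```python
def flow_final_state(task_states, reference_tasks):
    """Simplified model: the flow fails if any reference task failed."""
    ref_states = [task_states[name] for name in reference_tasks]
    return "Failed" if "Failed" in ref_states else "Success"


# Hypothetical run: an early task failed, but a final cleanup task
# (e.g. one that always runs) still succeeded.
task_states = {"extract": "Failed", "cleanup": "Success"}

# With only the last task as reference, the failure is not reflected.
default_result = flow_final_state(task_states, ["cleanup"])

# With both tasks as references, the flow is reported as failed.
explicit_result = flow_final_state(task_states, ["extract", "cleanup"])
```

Under this model, a flow-level handler that only reacts to a failed flow would stay silent in the first case and fire in the second.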
If you want the Slack alert to fire on a specific task's failure, then attach it to a task rather than the flow. Lastly, as described in the link, it's more flexible to use SlackTask than prefect.utilities.notifications.slack_notifier.
If you want to use slack_notifier (or any other state handler function), you don't call it, but rather pass it as a callable, so instead of
from prefect.utilities.notifications import slack_notifier

flow.state_handlers = [slack_notifier()]
Try [slack_notifier] (no round brackets):
from prefect.utilities.notifications import slack_notifier

flow.state_handlers = [slack_notifier]
m
Hey @Anna Geller, thanks for the explanation. It makes sense to not call the slack_notifier and pass it as a callable, but then how would I add parameters like ignore_states=[state.Running]?
@Anna Geller Alongside slack_notifier, I also wanted to use jira_notifier to create tickets for failed flows, and I need to provide only_states and options arguments to it.
a
you can do:
from prefect import task
from prefect.engine.state import Running
from prefect.utilities.notifications import slack_notifier

handler = slack_notifier(only_states=[Running]) 

@task(state_handlers=[handler])
def add(x, y):
    return x + y
m
Hey @Anna Geller, isn't this very similar to what I was doing earlier? I was doing this:
flow.state_handlers = [slack_notifier()]
What you are suggesting, when used imperatively for a flow, becomes:
flow.state_handlers = [slack_notifier(only_states=[Running])]
Wouldn't this also result in the slack_notifier function being called, instead of being passed as a callable?
Hey @Anna Geller, just tried
flow.state_handlers = [slack_notifier]
Still didn't work. Any ideas why?
a
Can you share a simple hello world flow that failed for you? I can then try to reproduce and debug
and if this doesn't work for you, try using SlackTask instead, which may be much easier and more customizable, e.g.:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast


def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(message=msg).run()
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    return 1 / (b - a)


with Flow(name="flow_failing_with_bad_parameters") as flow:
    result = divide_numbers(1, 1)


if __name__ == "__main__":
    flow.run()
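To check the handler logic on its own, without a webhook or a flow run, one option is to inject the function that sends the message. The sketch below is a dry run under that assumption; `FakeState`, `make_failure_handler`, and `outbox` are hypothetical names, and `FakeState` only mimics the `is_failed()` and `message` parts of a state object.

```python
from dataclasses import dataclass


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state object."""
    failed: bool
    message: str = ""

    def is_failed(self):
        return self.failed


def make_failure_handler(send):
    """Build a state handler that passes a message to `send` on failure."""
    def handler(obj, old_state, new_state):
        if new_state.is_failed():
            send(f"{obj} failed: {new_state.message}")
        # A state handler hands the state back to the caller.
        return new_state
    return handler


# Dry run: collect messages in a list instead of posting to Slack.
outbox = []
handler = make_failure_handler(outbox.append)
handler("divide_numbers", FakeState(False), FakeState(True, "division by zero"))
handler("divide_numbers", FakeState(False), FakeState(False))
```

In the real flow, `send` could wrap the `SlackTask(message=msg).run()` call from the example above.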
m
I'd rather not add another task to the DAG just for logging. Also, is there a Jira equivalent of SlackTask?
Oh, I see what you mean - adding Slack task as a state handler and not as a task. Makes sense.
👍 1
k
I wouldn’t call them equivalent, but there is! JiraTask docs. If you use this then you really have to make your own state handler. The slack_notifier is not recommended because you can’t customize the message
upvote 1
m
I'm okay with non-customizable messages if it means writing less code. Any idea how I can get slack_notifier and jira_notifier to work?
k
There is no jira_notifier. Can you share an example of what you tried? We can’t help if you don’t give an example of what you are doing.
m
I was trying exactly this : https://docs.prefect.io/api/latest/utilities/notifications.html#functions (The example at the bottom of the page)
a
also, are you on Prefect Cloud? if so, you could check our Automations which can handle failure notifications for you and can be configured from the UI
k
Ohh news to me there is a Jira notifier! My bad. So you tried the Jira notifier and not the slack one?
m
Automations is a good idea, but we might move to the self hosted version soon if it fits the bill.
I tried both, this is exactly the code I was using:
flow.state_handlers = [
    slack_notifier(ignore_states=[state.Running]),
    jira_notifier(
        only_states=[state.Failed],
        options={
            'project': 'XXXXXXX',
            'issuetype': {'name': 'Task'},
            'description': f'Flow Failed : {project_name}.{flow_name}',
        },
    ),
]
k
Will look more in a bit but it doesn’t get called because it’s curried
🙏 1
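To illustrate what "curried" means here, the sketch below uses a standard-library stand-in rather than Prefect's actual notifier. `demo_notifier` and `sent` are made-up names, and states are plain strings. In this stand-in, calling the notifier with only keyword options sends nothing; it returns a handler that runs later, once the three positional arguments (object, old state, new state) arrive.

```python
import functools

sent = []  # collects messages instead of posting to Slack


def demo_notifier(*args, only_states=None):
    """Stand-in for a curried notifier; not Prefect's implementation."""
    if len(args) < 3:
        # Not all of (obj, old_state, new_state) supplied yet:
        # return a callable that remembers the options and waits.
        return functools.partial(demo_notifier, *args, only_states=only_states)
    obj, old_state, new_state = args
    if only_states is None or new_state in only_states:
        sent.append(f"{obj}: {old_state} -> {new_state}")
    return new_state


configured = demo_notifier(only_states=["Failed"])  # nothing is sent here
configured("flow", "Running", "Success")            # filtered out by only_states
configured("flow", "Running", "Failed")             # recorded
demo_notifier()("flow", "Running", "Failed")        # empty call, then the real call
```

Under this assumption, passing the bare function, calling it with no arguments, and calling it with keyword options all produce something that can be used as a handler.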
m
I have already created a SLACK_WEBHOOK_URL secret containing the Slack webhook URL (I know this is okay because this works with Automations)
I also have a JIRASECRETS JSON secret with the 3 required keys.
k
Let me try real quick with the notifier and then paste my working code snippet here
m
Thanks! That would be awesome!
Not sure if this is relevant, but I'm trying to set the state_handlers for Flows and not tasks. Although I think it works the same way for both.
k
This works for me:
from prefect import Flow, task
import prefect
from prefect.utilities.notifications import slack_notifier

@task(state_handlers=[slack_notifier])
def test(x):
    prefect.context.logger.info(f"test-{x}")
    return x+1

with Flow("processes") as flow:
    test(1)

flow.run()
What error do you get?
m
I'm not getting any errors, it just doesn't send any messages.
k
Why don’t you use my webhook and change the message so we can know if it’s your webhook or the way you set it
[context.secrets]
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/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"
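To separate "is the webhook itself working" from "is Prefect picking up the secret", the webhook can also be exercised directly, outside any flow. The sketch below only builds the request, using the standard library and the JSON `text` payload that Slack incoming webhooks accept. `build_slack_request` is a hypothetical helper and the URL is a placeholder.

```python
import json
import urllib.request


def build_slack_request(webhook_url, text):
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL for illustration only; substitute your own webhook.
request = build_slack_request(
    "https://hooks.slack.com/services/XXX/YYY/ZZZ", "webhook test"
)
# To actually send it: urllib.request.urlopen(request)
```

If a direct post like this shows up in the channel but the flow's notification does not, the webhook is fine and the problem is more likely in how the flow or the secret is set up.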
m
Alright, let me try.
Hey @Kevin Kho, it's working perfectly when I do a local flow.run(), but quick runs from the Cloud UI are not sending any messages.
k
Wait, did you use my key, or did it work with your own? Any logs from the Cloud UI? Did you put the secret on Prefect Cloud?
m
I used my own key, and the secret is there on Cloud.
k
^ this is solved. idempotency was preventing re-registration
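One way an idempotency key can hide a change is sketched below with a made-up in-memory registry. It assumes the key is a hash of serialized flow metadata and that state handlers are not part of what gets hashed. Both are assumptions for illustration; `register` and `registry` are hypothetical and do not mirror Prefect's registration code.

```python
import hashlib
import json

registry = {}  # hypothetical backend: flow name -> (idempotency key, version)


def register(name, metadata):
    """Bump the version only when the idempotency key changes."""
    key = hashlib.sha256(json.dumps(metadata, sort_keys=True).encode()).hexdigest()
    old_key, version = registry.get(name, (None, 0))
    if key != old_key:
        version += 1
        registry[name] = (key, version)
    return version


# Assumption: state handlers are not part of the hashed metadata.
metadata = {"name": "processes", "tasks": ["test"]}
v1 = register("processes", metadata)  # first registration
v2 = register("processes", metadata)  # handlers changed, metadata did not
v3 = register("processes", {**metadata, "tasks": ["test", "notify"]})
```

In this model the second call is skipped, so the backend keeps serving the old version even though the local code (and a local `flow.run()`) already uses the new handlers.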