Thread
#prefect-community
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    Hi, I have added state_handlers to my flow, but for some reason they are not getting triggered. Currently it looks like this:
    from prefect.utilities.notifications import slack_notifier
    .
    .
    flow.state_handlers = [slack_notifier()]
    I have added the prefect app and also added the webhook URL to the
    SLACK_WEBHOOK_URL
    secret. Any pointers on why this is happening / how to debug?
    Anna Geller

    Anna Geller

    6 months ago
    For Slack specifically, we now have this step-by-step walkthrough https://discourse.prefect.io/t/how-to-send-slack-notifications-using-the-slacktask/497 In general, there are two types of state handlers: • task level • flow level Currently, you attached a flow-level state handler, which fires only if the reference tasks fail. By default, the last task of the flow is the reference task but you can modify that using:
    flow.set_reference_tasks([task1, task2])
    If you want the Slack alert to fire on a specific task's failure, then attach it to a task rather than flow. Lastly as described in the link, it's more flexible to use
    SlackTask
    than
    prefect.utilities.notifications.slack_notifier
    If you want to use
    slack_notifier
    (or any other state handler function), you don't call it, but rather pass it as callable, so instead of
    from prefect.utilities.notifications import slack_notifier
    
    flow.state_handlers = [slack_notifier()]
    Try
    [slack_notifier]
    (no round brackets):
    from prefect.utilities.notifications import slack_notifier
    
    flow.state_handlers = [slack_notifier]
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    Hey @Anna Geller, thanks for the explanation. It makes sense to not call the
    slack_notifier
    and pass it as a callable, but then how would I add parameters like
    ignore_states=[state.Running]
    ?
    @Anna Geller Alongside
    slack_notifier
    , I also wanted to use
    jira_notifier
    to create tickets for failed flows, and I need to provide
    only_states
    and
    options
    arguments to it.
    Anna Geller

    Anna Geller

    6 months ago
    you can do:
    from prefect import task
    from prefect.engine.state import Running
    from prefect.utilities.notifications import slack_notifier
    
    handler = slack_notifier(only_states=[Running]) 
    
    @task(state_handlers=[handler])
    def add(x, y):
        return x + y
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    Hey @Anna Geller, isn't this very similar to what I was doing earlier? I was doing this:
    flow.state_handlers = [slack_notifier()]
    What you are suggesting, when used imperatively for a flow, becomes:
    flow.state_handlers = [slack_notifier(only_states=[Running])]
    Wouldn't this also result in the
    slack_notifier
    function being called, instead of being passed as a callable?
    Hey @Anna Geller, just tried
    flow.state_handlers = [slack_notifier]
    Still didn't work. Any ideas why?
    Anna Geller

    Anna Geller

    6 months ago
    Can you share a simple hello world flow that failed for you? I can then try to reproduce and debug
    and if this doesn't work for you, try using
    SlackTask
    instead which may be much easier and customizable e.g.:
    import prefect
    from prefect import task, Flow
    from prefect.tasks.notifications import SlackTask
    from typing import cast
    
    
    def post_to_slack_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            if isinstance(new_state.result, Exception):
                value = "```{}```".format(repr(new_state.result))
            else:
                value = cast(str, new_state.message)
            msg = (
                f"The task `{prefect.context.task_name}` failed "
                f"in a flow run {prefect.context.flow_run_id} "
                f"with an exception {value}"
            )
            SlackTask(message=msg).run()
        return new_state
    
    
    @task(state_handlers=[post_to_slack_on_failure])
    def divide_numbers(a, b):
        return 1 / (b - a)
    
    
    with Flow(name="flow_failing_with_bad_parameters") as flow:
        result = divide_numbers(1, 1)
    
    
    if __name__ == "__main__":
        flow.run()
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    I'd rather not add another task to the DAG just for logging. Also, Is there a Jira equivalent of SlackTask?
    Oh, I see what you mean - adding Slack task as a state handler and not as a task. Makes sense.
    Kevin Kho

    Kevin Kho

    6 months ago
    I wouldn’t call them equivalent, but there is! JiraTask docs. If you use this then you really have to make your own state handler. The slack_notifier is not recommended because you can’t customize the message
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    I'm okay with non-customizable messages if it means writing less code. Any idea how I can get
    slack_notifier
    and
    jira_notifier
    to work?
    Kevin Kho

    Kevin Kho

    6 months ago
    There is no jira_notifier. Can you an example of what you tried? We can’t help if you don’t give an example of what you are doing.
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    I was trying exactly this : https://docs.prefect.io/api/latest/utilities/notifications.html#functions (The example at the bottom of the page)
    Anna Geller

    Anna Geller

    6 months ago
    also, are you on Prefect Cloud? if so, you could check our Automations which can handle failure notifications for you and can be configured from the UI
    Kevin Kho

    Kevin Kho

    6 months ago
    Ohh news to me there is a Jira notifier! My bad. So you tried the Jira notifier and not the slack one?
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    Automations is a good idea, but we might move to the self hosted version soon if it fits the bill.
    I tried both, this is exactly the code I was using:
    flow.state_handlers = [
                slack_notifier(ignore_states=[state.Running]),
                jira_notifier(
                    only_states=[state.Failed],
                    options={
                        'project': 'XXXXXXX', 
                        'issuetype': {'name': 'Task'},
                        'description': f'Flow Failed : {project_name}.{flow_name}'},
                )
            ]
    Kevin Kho

    Kevin Kho

    6 months ago
    Will look more in a bit but it doesn’t get called because it’s curried
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    I have already created a
    SLACK_WEBHOOK_URL
    secret containing the slack webhook URL (I know this is okay because this works with Automations)
    I also have a
    JIRASECRETS
    JSON secret with the 3 required keys.
    Kevin Kho

    Kevin Kho

    6 months ago
    Let make try real quick with the notifier and then paste my working code snippet here
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    Thanks! That would be awesome!
    Not sure if this is relevant, but I'm trying to set the state_handlers for Flows and not tasks. Although I think it works the same way for both.
    Kevin Kho

    Kevin Kho

    6 months ago
    This works for me:
    from prefect import Flow, task
    import prefect
    from prefect.utilities.notifications import slack_notifier
    
    @task(state_handlers=[slack_notifier])
    def test(x):
        <http://prefect.context.logger.info|prefect.context.logger.info>(f"test-{x}")
        return x+1
    
    with Flow("processes") as flow:
        test(1)
    
    flow.run()
    What error do you get?
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    I'm not getting any errors, it just doesn't send any messages.
    Kevin Kho

    Kevin Kho

    6 months ago
    Why don’t you use my webhook and change the message so we can know if it’s your webhook or the way you set it
    [context.secrets]
    SLACK_WEBHOOK_URL = "<https://hooks.slack.com/services/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>"
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    Alright, let me try.
    Hey @Kevin Kho, it's working perfectly when I do a local flow.run() But quick runs from the cloud UI are not sending any messages.
    Kevin Kho

    Kevin Kho

    6 months ago
    Wait, did you use my key or it worked for you own? Any log from the cloud UI? Did you put a secret on Prefect Cloud?
    Madhup Sukoon

    Madhup Sukoon

    6 months ago
    I used my own key, and the secret is there on Cloud.
    Kevin Kho

    Kevin Kho

    6 months ago
    ^ this is solved. idempotency was preventing re-registration