Thread
#prefect-community
    Ken Nguyen

    Ken Nguyen

    1 year ago
    Hi! I'm currently setting up Slack notifications and would love to get some advice. For context, I have set up 2 slack channels, 1 for notifying flow runs, and 1 for notifying flow fails (if any). I've set up my state handler this way:
    def post_to_slack(obj, old_state, new_state):
        # Notify failures
        if new_state.is_failed():
            msg = "Flow/Task {0} finished in state {1}".format(obj, new_state)
            secret_slack = fail_channel_webhook
    
            <http://requests.post|requests.post>(secret_slack, json={"text": msg})
    
        # Notify runs
        if new_state.is_running():
            msg = "Flow/Task {0} is running".format(obj)
            secret_slack = running_channel_webhook
    
            <http://requests.post|requests.post>(secret_slack, json={"text": msg})
    
        return new_state
    When I tested it on tasks, I was able to receive notifications for task running, and then task failing. However, when I tested it on flows, it only notified me of flow running, but not failing despite the flow's end result being
    Flow run FAILED: some reference tasks failed.
    What differs between the flow and the task for this to not have worked? Thanks in advance for your suggestions!
    Kevin Kho

    Kevin Kho

    1 year ago
    This sounds like it should work. Will give it a test in a bit
    Ken Nguyen

    Ken Nguyen

    1 year ago
    For some more context, the log for the task failure that was reported is:
    Additionally, I attached the same state_handler for both flow and tasks in the same flow file.
    Kevin Kho

    Kevin Kho

    1 year ago
    Can you try this? I am seeing a log for both my task failure and flow failure:
    from prefect import Flow, task
    import prefect
    
    def post_to_slack(obj, old_state, new_state):
        # Notify failures
        logger = prefect.context.get("logger")
        if new_state.is_failed():
            <http://logger.info|logger.info>("This failed")
    
        # Notify runs
        if new_state.is_running():
            <http://logger.info|logger.info>("This is running")
    
        return new_state
    
    
    @task(state_handlers = [post_to_slack])
    def abc():
        raise ValueError()
        return 1
    
    with Flow("test", state_handlers=[post_to_slack]) as flow:
        abc()
    
    flow.run()
    Was your error something that quite literally killed the underlying hardware? (Out of memory)?
    What type of agent are you using also?
    Ken Nguyen

    Ken Nguyen

    1 year ago
    My apologies, I have just tested it again and flow fail alerts now work. I believe I pushed the changes to github but didn't re-register the flow between development versions!! So sorry!!
    But I do have a follow up question for flow failures, is it possible for me to get the run id, or even better, the URL of the run? I'd love to be able to alert the fail and then attach the link to the run in the failure slack message.
    Kevin Kho

    Kevin Kho

    1 year ago
    Oh no worries, it was a quick test cuz of the code you gave! Use the context in the state handler.
    prefect.context.get("flow_run_id")
    You can construct the URL with the stuff from context: https://docs.prefect.io/api/latest/utilities/context.html
    Ken Nguyen

    Ken Nguyen

    1 year ago
    Prefect is so amazing
    Kevin Kho

    Kevin Kho

    1 year ago
    Thank you!
    btw you can use the Prefect SlackTask inside the state handler too by calling the
    run
    method.