<@ULVA73B9P> I am using Prefect 2.16, I want to cr...
# ask-marvin
p
@Marvin I am using Prefect 2.16, I want to create a notification using Python code to send a slack message when a flow run stays in failed or crashed for 10 min. Can you create me the Python code plz?
m
To send a Slack message from Python when a flow run stays in a failed or crashed state for 10 minutes, you can use Prefect's state change hooks and Slack integration. Here's sample code. Before running it, make sure Prefect and the Slack integration are set up in your environment:
python
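import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, task, get_client
from prefect.blocks.core import Block
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.settings import PREFECT_UI_URL

# State change hook that sends a Slack notification.
# Assumes a Slack webhook block named "my-slack-webhook" already exists.
def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
    message = (
        f"Your job {flow_run.name} entered {state.name}.\n"
        # PREFECT_UI_URL already includes the scheme (http:// or https://)
        f"See: {PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
    )
    slack_webhook_block.notify(message)

@task
def some_task():
    # Simulate task workload
    pass

# Prefect 2 registers hooks with on_failure / on_crashed
# (state_handlers is the Prefect 1 API)
@flow(on_failure=[notify_slack], on_crashed=[notify_slack])
def my_flow():
    some_task()

@flow
async def monitor_task():
    stuck_after = timedelta(minutes=10)
    notified = set()  # flow run IDs already reported, to avoid repeats
    failed_or_crashed = FlowRunFilter(
        state=FlowRunFilterState(
            name=FlowRunFilterStateName(any_=["Failed", "Crashed"])
        )
    )
    async with get_client() as client:
        while True:
            await asyncio.sleep(600)  # Check every 10 minutes
            flow_runs = await client.read_flow_runs(
                flow_run_filter=failed_or_crashed
            )
            cutoff = datetime.now(timezone.utc) - stuck_after
            for flow_run in flow_runs:
                # Only report runs that entered the state at least 10 minutes ago
                stuck = flow_run.state.timestamp <= cutoff
                if stuck and flow_run.id not in notified:
                    notified.add(flow_run.id)
                    # notify_slack is synchronous; run it in a worker thread
                    # (asyncio.to_thread needs Python 3.9+)
                    await asyncio.to_thread(
                        notify_slack, None, flow_run, flow_run.state
                    )

if __name__ == "__main__":
    my_flow()
    # monitor_task is an async flow that loops forever, so start it last
    asyncio.run(monitor_task())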
A summary of the steps:
1. Create a custom notification function (`notify_slack`).
2. Define your tasks and flows (`some_task` and `my_flow`).
3. Use state hooks and scheduled checks (`monitor_task`) to track state changes and send notifications.
Links to helpful resources:
- Slack integration: https://docs.prefect.io/latest/integrations/prefect-slack/messages/
- State Change Hooks: https://docs.prefect.io/latest/guides/state-change-hooks/
You can adjust the above sample code to suit your specific use case and deployment setup.
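For reference, a Slack incoming webhook accepts a JSON body with a `text` field. The following is a minimal standard-library sketch of such a request, not Prefect's own implementation; the function name `build_slack_request` and the placeholder URL are hypothetical, and the request is only built, not sent.

```python
import json
import urllib.request


def build_slack_request(
    webhook_url: str, flow_run_name: str, state_name: str
) -> urllib.request.Request:
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = {"text": f"Your job {flow_run_name} entered {state_name}."}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL for illustration only; a real one is issued by Slack.
request = build_slack_request("https://example.invalid/slack-webhook", "my-run", "Failed")
# urllib.request.urlopen(request) would actually deliver the message.
```

Keeping the request construction separate from sending makes the message format easy to check without a live webhook.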
p
@Marvin Can I do it using Prefect automations?
m
Yes, you can use Prefect Automations to create this notification in Prefect Cloud, without custom Python code. Here's how to set up an automation that sends a Slack message when a flow run has been in a failed or crashed state for 10 minutes:
1. Go to the Automations page: navigate to the Automations page in the Prefect Cloud UI.
2. Create a new automation: click the "Create Automation" button.
3. Define the trigger:
   - Choose a flow run state trigger and select the failed and crashed states.
   - Set it to fire when a flow run stays in one of those states for 10 minutes, rather than as soon as it enters the state.
4. Add an action:
   - Choose the action "Send Notification".
   - Configure the Slack notification with the relevant details, such as the Slack webhook URL and the message content.
For more information and detailed steps, refer to the Prefect documentation on automations: https://docs.prefect.io/latest/concepts/automations/
Automations in Prefect Cloud provide a straightforward mechanism for such notifications and let you use built-in triggers and actions without writing custom code.
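As a rough illustration of what the trigger in step 3 might look like when expressed as a custom trigger definition, here is a sketch written as a plain Python dict. The field names and event names are assumptions based on the custom trigger format described in the automations documentation and should be checked there before use. The constant name `STUCK_FLOW_RUN_TRIGGER` is hypothetical.

```python
import json

# Assumed shape of a custom automation trigger (verify against the docs).
# "Proactive" is meant to fire when the expected event does NOT arrive in time:
# after a run enters Failed or Crashed, no further state event within 600 s.
STUCK_FLOW_RUN_TRIGGER = {
    "posture": "Proactive",
    "match": {"prefect.resource.id": "prefect.flow-run.*"},
    "after": ["prefect.flow-run.Failed", "prefect.flow-run.Crashed"],
    "expect": ["prefect.flow-run.*"],
    "for_each": ["prefect.resource.id"],
    "threshold": 1,
    "within": 600,  # seconds, i.e. 10 minutes
}

# Serialize the definition, for example to paste into a JSON editor.
trigger_json = json.dumps(STUCK_FLOW_RUN_TRIGGER, indent=2)
print(trigger_json)
```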
p
@Marvin I am using Prefect 2.20, I want to create a notification using Python code to send a slack message when a flow run stays in failed or crashed for 10 min. Can you create me the Python code using prefect automation plz?
m
To send a Slack message when a flow run stays in a failed or crashed state for 10 minutes using Prefect 2.20, you can use state change hooks in combination with asyncio for the delay. Here's sample Python code:
1. Install the necessary packages:
sh
pip install prefect prefect-slack
2. Define the flow and notification function:
python
import asyncio
from datetime import timedelta

from prefect import flow, task, get_client
from prefect.blocks.core import Block
from prefect.runtime import flow_run as current_flow_run
from prefect.settings import PREFECT_UI_URL

# Define a function to send Slack notification.
# Assumes a Slack webhook block named "my-slack-webhook" already exists.
def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
    message = (
        f"Your job {flow_run.name} entered {state.name}.\n"
        # PREFECT_UI_URL already includes the scheme (http:// or https://)
        f"See: {PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
    )
    slack_webhook_block.notify(message)

@task
async def monitor_flow_run(flow_run_id):
    # Wait 10 minutes, then re-read the flow run's current state
    await asyncio.sleep(timedelta(minutes=10).total_seconds())
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
    if flow_run.state.is_failed() or flow_run.state.is_crashed():
        # notify_slack is synchronous; run it in a worker thread
        # (asyncio.to_thread needs Python 3.9+)
        await asyncio.to_thread(notify_slack, None, flow_run, flow_run.state)

@task
def some_task():
    # Simulate task workload
    pass

@flow(log_prints=True)
def my_flow():
    some_task.submit()
    # submit() returns a future, not a flow run, so read the current
    # flow run ID from the runtime context instead
    monitor_flow_run.submit(current_flow_run.id)

if __name__ == "__main__":
    my_flow()
Explanation:
1. Install Prefect and Slack Integration: Ensure you have installed `prefect` and `prefect-slack` for Slack integrations.
2. Define the Notification Function (`notify_slack`): This function sends a message to Slack when called. It uses the `Block.load` method to load the Slack webhook configuration.
3. Define Monitoring Task (`monitor_flow_run`): This task waits for 10 minutes and then checks the state of the given flow run. If the flow run is still in a failed or crashed state, it calls the `notify_slack` function.
4. Define Your Flow and Main Function: This comprises the main flow `my_flow`, which includes a task (`some_task`) and schedules the monitoring task (`monitor_flow_run`).
5. Running the Flow: Finally, make sure the flow is executed if the script is run directly.
Resources:
- State Change Hooks: https://docs.prefect.io/latest/guides/state-change-hooks/
- Slack Integration: https://docs.prefect.io/latest/integrations/prefect-slack/messages/
This setup should help you get notifications when a flow run stays in a failed or crashed state for more than 10 minutes. Adjust the details accordingly to fit your exact needs.
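The "still failed or crashed after 10 minutes" condition comes down to comparing the time a run entered its state with the current time. Below is a minimal, library-free sketch of that check. The names `is_stuck` and `STUCK_STATES` are hypothetical and stand in for whatever state type and timestamp your client returns.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# State types that count as "stuck" for this check (illustrative).
STUCK_STATES = {"FAILED", "CRASHED"}


def is_stuck(
    state_type: str,
    entered_at: datetime,
    now: Optional[datetime] = None,
    threshold: timedelta = timedelta(minutes=10),
) -> bool:
    """Return True if the run has been failed or crashed for at least `threshold`."""
    if now is None:
        now = datetime.now(timezone.utc)
    return state_type.upper() in STUCK_STATES and now - entered_at >= threshold


# Example with fixed, timezone-aware timestamps.
entered = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
checked = datetime(2024, 1, 1, 12, 15, tzinfo=timezone.utc)
print(is_stuck("Failed", entered, now=checked))
```

Passing `now` explicitly keeps the check deterministic, which makes it easy to test without waiting.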
p
@Marvin I am using Prefect 2.20, I want to create a notification using Python code to send a slack message when a flow run stays in failed or crashed for 10 min. Can you create me the Python code using prefect automation plz?