Patricio Navarro
08/19/2024, 9:34 AM
Marvin
08/19/2024, 9:35 AM
```python
import asyncio

from prefect import flow, task, get_client
from prefect.blocks.core import Block
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.settings import PREFECT_UI_URL


# Define a function to send a Slack notification
def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
    # PREFECT_UI_URL is the UI base address; PREFECT_API_URL points at the API
    # and already includes the scheme, so it should not be prefixed with https://
    message = (
        f"Your job {flow_run.name} entered {state.name}.\n"
        f"See: {PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
    )
    slack_webhook_block.notify(message)


@task
def some_task():
    # Simulate task workload
    pass


# Prefect 2.x uses state change hooks (on_failure, on_crashed), not state_handlers
@flow(on_failure=[notify_slack], on_crashed=[notify_slack])
def my_flow():
    some_task()


@flow
async def monitor_task():
    async with get_client() as client:
        while True:
            await asyncio.sleep(600)  # Check every 10 minutes
            filter_params = FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Failed", "Crashed"])
                )
            )
            flow_runs = await client.read_flow_runs(flow_run_filter=filter_params)
            # Note: this reports every matching run again on each pass
            for flow_run in flow_runs:
                notify_slack(None, flow_run, flow_run.state)


if __name__ == "__main__":
    my_flow()
    # monitor_task() loops forever, so start it last (or in a separate process)
    asyncio.run(monitor_task())
```
A summary of the steps:
1. Create a custom notification function (`notify_slack`).
2. Define your tasks and flows (`some_task` and `my_flow`).
3. Use state hooks and scheduled checks (`monitor_task`) to track state changes and send notifications.
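One thing to watch with the polling loop in `monitor_task`: each pass returns every failed or crashed run, so the same run would be reported every 10 minutes. Below is a minimal sketch of one way to avoid that, assuming you keep a set of already-notified run IDs in memory. The helper name `select_unnotified` and the `FakeRun` stand-in are hypothetical and not part of Prefect.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set


@dataclass
class FakeRun:
    """Hypothetical stand-in for a flow run; only `id` and `name` are used here."""
    id: str
    name: str


def select_unnotified(flow_runs: Iterable[FakeRun], seen_ids: Set[str]) -> List[FakeRun]:
    """Return the runs whose id is not yet in seen_ids, and record them as seen."""
    fresh = []
    for run in flow_runs:
        if run.id not in seen_ids:
            seen_ids.add(run.id)
            fresh.append(run)
    return fresh


# Two polling passes: run "a" shows up in both, but is only selected once.
seen: Set[str] = set()
first_pass = select_unnotified([FakeRun("a", "run-a"), FakeRun("b", "run-b")], seen)
second_pass = select_unnotified([FakeRun("a", "run-a"), FakeRun("c", "run-c")], seen)
print([r.id for r in first_pass])   # ['a', 'b']
print([r.id for r in second_pass])  # ['c']
```

Inside the loop you would then call `notify_slack` only for the runs this helper returns. The set lives in memory, so it resets whenever the monitor restarts.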
Links to helpful resources:
- Slack integration: https://docs.prefect.io/latest/integrations/prefect-slack/messages/
- State Change Hooks: https://docs.prefect.io/latest/guides/state-change-hooks/
You can adjust the above sample code to suit your specific use case and deployment setup.
Patricio Navarro
08/19/2024, 9:36 AM
Marvin
08/19/2024, 9:36 AM
Patricio Navarro
08/19/2024, 9:40 AM
Marvin
08/19/2024, 9:40 AM
1. Install Prefect and the Slack integration:
```sh
pip install prefect prefect-slack
```
2. Define the flow and notification function:
```python
import asyncio
from datetime import timedelta

from prefect import flow, task, get_client
from prefect.blocks.core import Block
from prefect.settings import PREFECT_UI_URL


# Define a function to send a Slack notification
def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
    # PREFECT_UI_URL is the UI base address and already includes the scheme
    message = (
        f"Your job {flow_run.name} entered {state.name}.\n"
        f"See: {PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
    )
    slack_webhook_block.notify(message)


@task
async def monitor_flow_run(flow_run_id):
    # Wait 10 minutes, then re-read the run and notify if it is still failed or crashed
    await asyncio.sleep(timedelta(minutes=10).total_seconds())
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
    if flow_run.state.is_failed() or flow_run.state.is_crashed():
        notify_slack(None, flow_run, flow_run.state)


@task
def some_task():
    # Simulate task workload
    pass


@flow(log_prints=True)
def my_flow():
    some_task()


@flow
async def monitor_flow(flow_run_id):
    # Separate flow: my_flow cannot be failed or crashed while it is still running
    await monitor_flow_run(flow_run_id)


if __name__ == "__main__":
    # return_state=True returns the final state instead of raising on failure
    state = my_flow(return_state=True)
    asyncio.run(monitor_flow(state.state_details.flow_run_id))
```
Explanation:
1. Install Prefect and Slack Integration:
Ensure you have installed `prefect` and `prefect-slack` for Slack integrations.
2. Define the Notification Function (`notify_slack`):
This function sends a message to Slack when called. It uses the `Block.load` method to load the Slack webhook configuration.
3. Define Monitoring Task (`monitor_flow_run`):
This task waits for 10 minutes and then checks the state of the given flow run. If the flow run is still in a failed or crashed state, it calls the `notify_slack` function.
4. Define Your Flows and Main Function:
This comprises the main flow `my_flow`, which runs a task (`some_task`), and a separate flow `monitor_flow`, which runs the monitoring task (`monitor_flow_run`). The monitoring lives in its own flow because a flow run cannot be in a failed or crashed state while one of its own tasks is still executing.
5. Running the Flow:
When the script is run directly, `my_flow` runs first with `return_state=True` so that a failure does not raise, and its flow run ID is then passed to `monitor_flow`.
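The "still failed after 10 minutes" decision can also be written as a plain function of the state name and the time the state was entered, which makes it easy to test without a running Prefect API. This is a minimal sketch, not Prefect's own API: `is_stale_failure`, `STALE_AFTER`, and `BAD_STATES` are hypothetical names, and it assumes you pass in the state name and timestamp yourself.

```python
from datetime import datetime, timedelta, timezone

STALE_AFTER = timedelta(minutes=10)  # threshold taken from the 10-minute requirement
BAD_STATES = {"Failed", "Crashed"}   # state names treated as failures in this sketch


def is_stale_failure(
    state_name: str,
    entered_at: datetime,
    now: datetime,
    threshold: timedelta = STALE_AFTER,
) -> bool:
    """True if the run is failed/crashed and entered that state at least `threshold` ago."""
    return state_name in BAD_STATES and (now - entered_at) >= threshold


now = datetime(2024, 8, 19, 10, 0, tzinfo=timezone.utc)
print(is_stale_failure("Failed", now - timedelta(minutes=15), now))     # True
print(is_stale_failure("Crashed", now - timedelta(minutes=3), now))     # False
print(is_stale_failure("Completed", now - timedelta(minutes=30), now))  # False
```

A check like this could replace the fixed sleep: a periodic monitor would read each run's current state and notify only when the function returns `True`.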
Resources:
- State Change Hooks: https://docs.prefect.io/latest/guides/state-change-hooks/
- Slack Integration: https://docs.prefect.io/latest/integrations/prefect-slack/messages/
This setup should help you get notifications when a flow run stays in a failed or crashed state for more than 10 minutes. Adjust the details to fit your exact needs.
Patricio Navarro
08/19/2024, 9:41 AM