sbrabez
05/17/2024, 10:57 PMFAILURE
and CRASHED
state?Marvin
05/17/2024, 10:57 PMFAILURE
and CRASHED
states, you can use the following methods depending on whether you are using Prefect Cloud or Prefect Core:
1. Prefect Cloud:
- Prefect Cloud provides an Automations interface for setting up notifications.
- To configure a notification, navigate to the Notifications page within your Prefect Cloud UI, and select "Create Notification" or click the "+" button.
- You can specify which run states (e.g., FAILURE
, CRASHED
) should trigger a notification, add tags to filter flow runs, and choose the method of notification such as email, Slack, Microsoft Teams, etc.
- For details on setting this up in Prefect Cloud refer to the Hosted Prefect documentation.
2. Prefect Core (State Change Hooks):
- You can create state change hooks that execute code when a flow run enters a specific state, such as FAILURE
.
- For example, here's how to set up a Slack notification using a state change hook:
python
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL
def notify_slack(flow, flow_run, state):
slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
slack_webhook_block.notify(
(f"Your job {flow_run.name} entered {state.name} "
f"with message:\n\n"
f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
f"flow-run/{flow_run.id}|the flow>")
)
@flow(state_handlers=[notify_slack])
def my_flow():
# Flow definition goes here
pass
- The notify_slack
function will be called whenever the flow run state changes, and you can modify it to send notifications for FAILURE
and CRASHED
states.
- For more examples and information on how to use state change hooks in Prefect Core, check out the State Change Hooks documentation.
Remember, for email notifications in Prefect Cloud, you need to provide the email addresses for notifications, and for Slack notifications, you require webhook credentials for your workspace and the specific channel you want to notify.sbrabez
06/13/2024, 12:37 PMMarvin
06/13/2024, 12:39 PMpython
import requests
from prefect import flow
from prefect.blocks.core import Block
# Function to send notification to Opsgenie
def notify_opsgenie(flow, flow_run, state):
api_key = "your_opsgenie_api_key"
url = "<https://api.opsgenie.com/v2/alerts>"
headers = {
"Content-Type": "application/json",
"Authorization": f"GenieKey {api_key}"
}
payload = {
"message": f"Your flow {flow_run.name} entered {state.name} state",
"description": f"Flow {flow_run.name} failed. Check the flow run at: "
f"https://<your-prefect-instance>/flow-runs/{flow_run.id}",
"priority": "P1",
"source": "Prefect",
}
response = <http://requests.post|requests.post>(url, json=payload, headers=headers)
if response.status_code != 202:
print(f"Failed to send Opsgenie alert: {response.content}")
# Define your flow
@flow(state_handlers=[notify_opsgenie])
def my_flow():
# Flow definition goes here
pass
3. Use the flow with the state hook:
- The notify_opsgenie
function will be called whenever the flow run state changes. It sends an alert to Opsgenie when the state is FAILED
.
- Customize the payload
dictionary within the notification function to provide more details pertinent to your Opsgenie setup.
Links for reference:
- State Change Hooks in Prefect
- Opsgenie API documentation
This example demonstrates how to send alerts to Opsgenie for flow failures. Adjust the notify_opsgenie
function as needed to fit your specific requirements and deployment.sbrabez
06/13/2024, 1:01 PMMarvin
06/13/2024, 1:01 PMon_failure
state change handlers to send notifications to Opsgenie. Here is how you can set up notifications for flow failures via Opsgenie using the on_failure
handler:
1. Ensure you have your Opsgenie API Key: This key will be used to authenticate your alert requests.
2. Create State Change Hook for on_failure
:
- You can define a state change hook in your flow to send an alert to Opsgenie when the flow fails.
Here's an example:
python
import requests
from prefect import flow
from prefect.blocks.core import Block
# Function to send notification to Opsgenie
def notify_opsgenie(flow, flow_run, state):
api_key = "your_opsgenie_api_key"
url = "<https://api.opsgenie.com/v2/alerts>"
headers = {
"Content-Type": "application/json",
"Authorization": f"GenieKey {api_key}"
}
payload = {
"message": f"Flow {flow_run.name} entered {state.name} state",
"description": f"Flow {flow_run.name} failed. "
f"Check details at: https://<your-prefect-instance>/flow-runs/{flow_run.id}",
"priority": "P1", # You can set the priority as needed
"source": "Prefect",
}
response = <http://requests.post|requests.post>(url, json=payload, headers=headers)
if response.status_code != 202:
print(f"Failed to send Opsgenie alert: {response.content}")
# Define your flow with the on_failure state handler
@flow(on_failure=[notify_opsgenie])
def my_flow():
# Flow logic
# Simulate a failure
raise Exception("Simulated failure for testing Opsgenie notification")
# Run the flow
my_flow()
Explanation:
1. notify_opsgenie
Function:
- This method constructs a payload with the necessary details about the failure and sends a POST request to Opsgenie's API endpoint with the appropriate headers.
2. Decorating the Flow with on_failure
:
- The @flow(on_failure=[notify_opsgenie])
decorator ensures that the notify_opsgenie
function is triggered whenever the flow enters a failure state.
3. Running the Flow:
- When the flow is run and it fails, the notify_opsgenie
function is called, sending an alert to Opsgenie.
Make sure to test the setup and validate that alerts are being correctly sent to Opsgenie. You can customize the payload
dictionary as per your notification requirements.
Refs:
- State Change Hooks in Prefect
- Opsgenie API Documentation
This setup should help you get started with using Prefect's on_failure
state handler to send notifications to Opsgenie for flow failures.sbrabez
06/25/2024, 4:07 PMMarvin
06/25/2024, 4:08 PMflow_run_id
using the Prefect client in Python, you can use the PrefectClient
which provides various methods to access flow run details. Here’s a step-by-step example:
1. Install Prefect if you haven't already:
sh
pip install prefect
2. Use the Prefect client to retrieve flow run details:
- The PrefectClient
can be used to interact with the Prefect API to get details about a flow run using its flow_run_id
.
Here's a sample script to demonstrate this:
python
import asyncio
from prefect import get_client
async def get_flow_run_data(flow_run_id):
async with get_client() as client:
# Retrieve the flow run details
flow_run = await client.read_flow_run(flow_run_id)
return flow_run
# Replace with your actual flow_run_id
flow_run_id = "your-flow-run-id"
# Run the async function to get the flow run data
flow_run_data = asyncio.run(get_flow_run_data(flow_run_id))
print(flow_run_data.json())
Explanation:
- Async Context Manager: The PrefectClient
is used as an async context manager to ensure proper resource management.
- read_flow_run Method: This method retrieves the details of the specified flow run.
- asyncio.run: This is used to run the async function in a synchronous environment.
References:
- Using the Prefect Client Documentation
This example should help you get started with retrieving the JSON data of a flow run using the Prefect client. Adjust the flow_run_id
to match the ID of the flow run you want to query.sbrabez
06/25/2024, 4:09 PMstate
, url
, start_time
sbrabez
06/25/2024, 4:10 PMstate
, url
, start_time
for a flow_run_id in inputMarvin
06/25/2024, 4:10 PMstate
, url
, and start_time
from the JSON data returned by the PrefectClient
.
Here's the modified version that includes a function `get_flow_run_details`:
python
import asyncio
from prefect import get_client
async def get_flow_run_data(flow_run_id):
async with get_client() as client:
# Retrieve the flow run details
flow_run = await client.read_flow_run(flow_run_id)
return flow_run
def extract_flow_run_details(flow_run_data):
# Extract the desired details
state = flow_run_data.state.name
url = f"https://{flow_run_data.infrastructure_document_id}/flow-runs/flow-run/{flow_run_data.id}"
start_time = flow_run_data.start_time.isoformat() if flow_run_data.start_time else None
return {"state": state, "url": url, "start_time": start_time}
async def main(flow_run_id):
flow_run_data = await get_flow_run_data(flow_run_id)
flow_run_details = extract_flow_run_details(flow_run_data)
return flow_run_details
# Replace with your actual flow_run_id
flow_run_id = "your-flow-run-id"
# Run the async function to get the flow run details
flow_run_details = asyncio.run(main(flow_run_id))
print(flow_run_details)
Explanation:
1. Extract Function:
- The extract_flow_run_details
function takes the flow run data and extracts the required state
, url
, and start_time
.
- The state
is obtained directly from the state
attribute.
- The url
is constructed using the flow_run_data
information. Replace "https://{flow_run_data.infrastructure_document_id}/flow-runs/flow-run/"
with the correct base URL of your Prefect instance.
- The start_time
is converted to ISO format if it exists.
2. Main Function:
- The main
function calls the get_flow_run_data
to fetch the flow run data and then extracts the details using extract_flow_run_details
.
3. Running the Code:
- The main
function is run asynchronously to get and print the flow run details.
This script completes the required functionality to get the state
, url
, and start_time
from the flow run data based on the given flow_run_id
. Make sure to replace "your-flow-run-id"
with the actual flow run ID you're working with.