João Moreira
07/31/2024, 9:07 AMMarvin
07/31/2024, 9:08 AMIf a run of any flow with daily-etl tag enters a failed state, send a notification to my-slack-webhook
When triggered, you will receive a message like:
The fuzzy-leopard run of the daily-etl flow entered a failed state at 22-06-27 16:21:37 EST
You can find more details on the configuration here:
https://docs.prefect.io/latest/guides/host/
If you need further assistance or examples of tasks and flows, you can check this code example:
https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.pyJoão Moreira
07/31/2024, 9:12 AMMarvin
07/31/2024, 9:15 AMpython
import prefect
from prefect.client.orion import get_client
from prefect.orion.schemas.actions import NotificationPolicyCreate
async def create_slack_notification():
async with get_client() as client:
policy = NotificationPolicyCreate(
name="my-slack-notification",
is_active=True,
state_names=["FAILED"],
tags=["daily-etl"],
message_template="The {name} run of the {flow_name} flow entered a {state_name} state at {timestamp}",
recipients=[{"slack_webhook_url": "<https://hooks.slack.com/services/T0000/B0000/XXXXXXXX>"}]
)
await client.create_notification_policy(policy)
if __name__ == "__main__":
prefect.context.run(create_slack_notification)
3. Using Prefect YAML Configuration:
Notifications might not be directly supported in YAML configurations for deployment yet. You can create deployment files (YAML) for your flows, but notifications are generally configured after deploying the flows.
Given the mentioned issues on GitHub, it seems like direct CLI support is limited, and you might need to use the API for setting up complex notifications.
For specific issues related to notifications after database reset or others, you might want to check or report on GitHub issues:
- Issue with database reset affecting notifications: https://github.com/PrefectHQ/prefect/issues/8536
More general info:
- Documentation for hosted Prefect servers: https://docs.prefect.io/latest/guides/host/
If you have further questions, feel free to ask!João Moreira
07/31/2024, 5:30 PMMarvin
07/31/2024, 5:30 PMNotificationPolicyCreate
, but you can still manage notifications programmatically using the Prefect API. Below is more relevant information for creating notification policies and examples.
### Programmatic Configuration via Prefect API
1. Using Python with Prefect API
You can use the Prefect client and directly interact with API endpoints to create notification policies. Here’s how you can do it:
python
import httpx
# Define your Prefect API URL and token
api_url = "<http://127.0.0.1:4200/api>"
api_token = "your_prefect_api_token"
headers = {
"Authorization": f"Bearer {api_token}"
}
# Define the notification policy
notification_policy = {
"name": "Slack Notification for Failed Runs",
"is_active": True,
"state_names": ["FAILED"],
"tags": ["daily-etl"],
"message_template": "The {name} run of the {flow_name} flow entered a {state_name} state at {timestamp}",
"slack_webhook_url": "<https://hooks.slack.com/services/T0000/B0000/XXXXXXXX>"
}
response = <http://httpx.post|httpx.post>(f"{api_url}/notification_policies/", headers=headers, json=notification_policy)
if response.status_code == 200:
print("Notification policy created successfully")
else:
print("Failed to create notification policy", response.text)
2. Using Prefect CLI:
While there isn’t direct CLI support to create notification policies, you can use the Prefect Cloud Automations interface for such setups.
### Important Links
- For notifications setup, you can refer to the Hosted Prefect Server Guide:
https://docs.prefect.io/latest/guides/host/
For more detailed API usage, the best approach might be to explore API docs directly or create new issues/queries on the Prefect GitHub issues page if functionality appears to be undocumented or unsupported.
If you encounter problems or need more specific examples, feel free to ask!João Moreira
08/01/2024, 11:08 AMprefect.blocks.notifications
) and then using it in prefect.client.schemas.actions
creating a FlowRunNotificationPolicyCreate
. Does this makes sense?Marvin
08/01/2024, 11:08 AMFlowRunNotificationPolicyCreate
makes sense. Here are steps to guide you:
1. Create a Slack Notification Block:
Utilize the Prefect API to create a Slack notification block.
2. Create a FlowRunNotificationPolicy:
Use the FlowRunNotificationPolicyCreate
to apply this notification.
### Step-by-Step
1. Create the Slack Notification Block
python
from prefect.blocks.notifications import SlackWebhook
# Provide your Slack Webhook URL
slack_webhook_block = SlackWebhook(url="<https://hooks.slack.com/services/T0000/B0000/XXXXXXXX>")
# Save Block with a name for reusability
slack_webhook_block.save(name="my-slack-notification-block")
2. Create FlowRunNotificationPolicy
python
from prefect.client.orion import get_client
from prefect.orion.schemas.actions import FlowRunNotificationPolicyCreate
async def create_notification_policy():
async with get_client() as client:
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"],
is_active=True,
state_names=["FAILED"],
block_document_id=slack_webhook_block.id,
message_template="The {name} run of the {flow_name} flow entered a {state_name} state at {timestamp}"
)
await client.create_flow_run_notification_policy(policy)
import asyncio
asyncio.run(create_notification_policy())
### Example:
For additional guidance and examples, you can refer to the Prefect code examples repository:
https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.py
### Documentation Links:
- Prefect Blocks: https://docs.prefect.io/latest/concepts/blocks/
- Automations Guide: https://docs.prefect.io/latest/guides/automations/
- Hosted Prefect Server Guide: https://docs.prefect.io/latest/guides/host/
This setup should help you configure notifications programmatically. If you need more help, feel free to ask!João Moreira
08/01/2024, 4:11 PMpydantic.v1.error_wrappers.ValidationError: 1 validation error for FlowRunNotificationPolicyCreate
message_template
Invalid template variable provided: 'name' (type=value_error)
Marvin
08/01/2024, 4:11 PMwith this code:
import asyncio
from prefect.client import get_client
from prefect.client.schemas.actions import FlowRunNotificationPolicyCreate
async def get_block_document_and_create_policy():
async with get_client() as client:
block_document = await client.read_block_document_by_name(
name="notification-block",
block_type_slug="slack-webhook"
)
# Check if block_document is retrieved successfully
if block_document is None:
print("Error: Could not retrieve block document. Aborting notification policy creation.")
return
# Access the ID of the block document
block_document_id = block_document.id
# Create the notification policy object
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"], # Add relevant tags
is_active=True,
state_names=["FAILED"],
block_document_id=block_document_id,
message_template="The {name} run of the {flow_name} flow entered a {state_name} state at {timestamp}"
)
# Create the notification policy
await client.create_flow_run_notification_policy(policy)
print("Flow run notification policy created successfully!")
if __name__ == "__main__":
asyncio.run(get_block_document_and_create_policy())
And have this error:
```pydantic.v1.error_wrappers.ValidationError: 1 validation error for FlowRunNotificationPolicyCreate
message_template
Invalid template variable provided: 'name' (type=valueMarvin
08/01/2024, 4:14 PMmessage_template
contains an invalid template variable 'name'
. Prefect requires specific variables to be used in the message template. Let's correct this and ensure we use valid variables.
Here is the corrected code:
1. Create the Slack Notification Block
python
from prefect.blocks.notifications import SlackWebhook
# Provide your Slack Webhook URL
slack_webhook_block = SlackWebhook(url="<https://hooks.slack.com/services/T0000/B0000/XXXXXXXX>")
# Save Block with a name for reusability
slack_webhook_block.save(name="my-slack-notification-block")
2. Create FlowRunNotificationPolicy
Note: You should verify the exact template variables allowed by Prefect. Common options include flow_run_id
, flow_id
, state_name
, timestamp
, etc. Here’s an adjusted example:
python
from prefect.client.orion import get_client
from prefect.orion.schemas.actions import FlowRunNotificationPolicyCreate
async def create_notification_policy():
async with get_client() as client:
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"],
is_active=True,
state_names=["FAILED"],
block_document_id=slack_webhook_block.id,
message_template="Run of {flow_name} entered state {state_name} at {timestamp}"
)
await client.create_flow_run_notification_policy(policy)
import asyncio
asyncio.run(create_notification_policy())
### Notes:
1. Ensure the variables used in message_template
are valid. If flow_name
or timestamp
is invalid, you might need to replace them with correct variables.
2. Save and run the script with these corrections, and it should create the notification policy without validation errors.
If you encounter further issues related to template variables, please refer to the Prefect documentation for the latest valid template variables or share more details for assistance!João Moreira
08/02/2024, 10:23 AMMarvin
08/02/2024, 10:23 AMflow_run_id
- flow_id
- flow_name
- state_name
- timestamp
- tags
- url
(e.g., link to the flow run in the UI)
### Example Refinement
Here’s a refined example using likely valid variables:
1. Create the Slack Notification Block
python
from prefect.blocks.notifications import SlackWebhook
# Provide your Slack Webhook URL
slack_webhook_block = SlackWebhook(url="<https://hooks.slack.com/services/T0000/B0000/XXXXXXXX>")
# Save Block with a name for reusability
slack_webhook_block.save(name="my-slack-notification-block")
2. Create FlowRunNotificationPolicy
python
from prefect.client.orion import get_client
from prefect.orion.schemas.actions import FlowRunNotificationPolicyCreate
async def create_notification_policy():
async with get_client() as client:
policy = FlowRunNotificationPolicyCreate(
tags=["daily-etl"],
is_active=True,
state_names=["FAILED"],
block_document_id=slack_webhook_block.id,
message_template="Run of {flow_name} entered state {state_name} at {timestamp}"
)
await client.create_flow_run_notification_policy(policy)
import asyncio
asyncio.run(create_notification_policy())
### Seeking More Information:
- Prefect Discussions or GitHub Issues:
If this doesn’t fully meet your needs or if more details are needed, consider checking or opening discussions or issues on Prefect's GitHub: https://github.com/PrefectHQ/prefect/issues
- Prefect Community:
Join the Prefect Discourse or Slack community to directly engage with other users and Prefect developers.
Feel free to share any specific requirements or errors you encounter, and I can help refine the approach further!