merlin
03/14/2025, 12:34 AMMarvin
03/14/2025, 12:34 AMwith tags("your_tag_name")
in your flow definition.
2. Triggers and Automations: Prefect 3.x allows you to set up automations with triggers. You can define these triggers in a YAML file or directly through the Prefect UI. These triggers can be based on flow run states, and you can configure them to match flows with specific tags.
3. Notification Configuration: You can set up notification blocks in the Prefect UI to determine where notifications should be sent (e.g., Slack, email). Then, create an automation that uses these notification blocks to alert you based on the flow's completion or failure state.
For more detailed guidance, you can refer to the Prefect 3.x Automations and Triggers documentation.
If you want to explore how tags are used in code, you can find an example here.merlin
03/14/2025, 12:37 AMMarvin
03/14/2025, 12:37 AMmerlin
03/14/2025, 12:42 AMMarvin
03/14/2025, 12:42 AMpython
from prefect import get_client
from prefect.automations import AutomationCore, EventTrigger, SendNotification
from prefect.blocks.notifications import SlackWebhook
# Initialize client
client = get_client()
# Define the Slack webhook block name
slack_block_name = "your-slack-webhook-block-name"
# Define the automation
automation = AutomationCore(
name="Notify on Alarm Tag Failure",
triggers=[
EventTrigger(
event="prefect.flow-run.Failed",
match_related=[{"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alarm"}]
)
],
actions=[
SendNotification(
block_name=slack_block_name,
message="A flow with the 'alarm' tag has failed!"
)
]
)
# Register the automation
client.create_automation(automation)
### Key Points:
- Slack Webhook Block: Ensure the slack_block_name
is set to the name of your existing Slack webhook block.
- EventTrigger: It listens for the prefect.flow-run.Failed
event and matches flows with the 'alarm' tag.
- SendNotification: This action sends a notification using the specified Slack webhook block.
This script uses the Prefect client to create and register the automation with the specified triggers and actions. Adjust the block name and message as needed for your setup.merlin
03/14/2025, 12:45 AMMarvin
03/14/2025, 12:45 AMpython
from prefect import get_client
from prefect.automations import AutomationCore, EventTrigger, SendNotification
from prefect.blocks.notifications import SlackWebhook
# Initialize client
client = get_client()
# Define the Slack webhook block name
slack_block_name = "your-slack-webhook-block-name"
# Function to create automation for any tag
def create_automation_for_tag(tag_name):
# Define the automation
automation = AutomationCore(
name=f"Notify on {tag_name} Tag Failure",
triggers=[
EventTrigger(
event="prefect.flow-run.Failed",
match_related=[{"prefect.resource.role": "tag", "prefect.resource.id": f"prefect.tag.{tag_name}"}]
)
],
actions=[
SendNotification(
block_name=slack_block_name,
message=f"A flow with the '{tag_name}' tag has failed!"
)
]
)
# Register the automation
client.create_automation(automation)
# Example usage with an arbitrary tag
create_automation_for_tag("your-arbitrary-tag")
### Key Changes:
- Function Definition: Wrapped the automation creation logic in a function create_automation_for_tag
that accepts tag_name
as a parameter.
- Dynamic Tag Handling: The tag name is used dynamically in both the trigger's match condition and the notification message.
You can call create_automation_for_tag
with any tag name you want to monitor for failures. Adjust the tag_name
in the function call as needed.merlin
03/14/2025, 3:39 AMMarvin
03/14/2025, 3:40 AMpython
from prefect import get_client
from prefect.automations import AutomationCore, EventTrigger, SendNotification
from prefect.blocks.notifications import SlackWebhook
# Initialize client
client = get_client()
# Define the Slack webhook block name
slack_block_name = "your-slack-webhook-block-name"
# Function to create automation for any tag and multiple states
def create_automation_for_tag(tag_name):
# Define the automation
automation = AutomationCore(
name=f"Notify on {tag_name} Tag State Change",
triggers=[
EventTrigger(
event={"prefect.flow-run.Failed", "prefect.flow-run.TimedOut", "prefect.flow-run.Crashed"},
match_related=[{"prefect.resource.role": "tag", "prefect.resource.id": f"prefect.tag.{tag_name}"}]
)
],
actions=[
SendNotification(
block_name=slack_block_name,
message=f"A flow with the '{tag_name}' tag has entered a critical state!"
)
]
)
# Register the automation
client.create_automation(automation)
# Example usage with an arbitrary tag
create_automation_for_tag("your-arbitrary-tag")
### Key Changes:
- Multiple Event Triggers: The event
parameter in EventTrigger
is now a set that includes "prefect.flow-run.Failed"
, "prefect.flow-run.TimedOut"
, and "prefect.flow-run.Crashed"
.
- Dynamic Tag Handling: The function can still be used with any arbitrary tag.
This setup ensures that the automation will trigger the Slack notification if the flow with the specified tag enters any of the specified states. Adjust the tag_name
in the function call as needed.