Kaleb King
06/25/2024, 4:37 PM
`abc-deployment` that runs at the top of every hour using the `abc-worker` pool and `default` queue, and runs on a VM. If the VM is shut down, loses internet connection, or anything else prevents the scheduled flow from running, I want it to do one of the following:
• Don't create new flow runs OR
• Cancel or delete the older flow run when a new one is created
Basically, when the VM comes back up I only want it to execute one flow run for each deployment.
Marvin
06/25/2024, 4:37 PM
bash
prefect flow-run cancel <flow-run-id>
You can retrieve the flow run ID from the Prefect UI or by querying the Prefect API.
### Example Flow to Cancel Older Runs
Here's an example of a flow that cancels older runs if newer runs are created:
python
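import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, get_client, task
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId
from prefect.runtime import flow_run as runtime_flow_run
from prefect.states import Cancelled


@task
async def cancel_older_runs(flow_run_id: str):
    # Assumes the Prefect 2.x client API (filter objects, Cancelled state)
    async with get_client() as client:
        # Get the current flow run
        current_run = await client.read_flow_run(flow_run_id)
        # Get all flow runs for the same deployment
        deployment_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[current_run.deployment_id])
            )
        )
        # Cancel runs still waiting to start that were due more than an hour ago
        cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
        for run in deployment_runs:
            is_pending = run.state is not None and run.state.is_scheduled()
            is_overdue = run.expected_start_time is not None and run.expected_start_time < cutoff
            if is_pending and is_overdue and run.id != current_run.id:
                await client.set_flow_run_state(flow_run_id=run.id, state=Cancelled())


@flow
async def monitored_flow():
    # ID of the flow run that is executing right now
    await cancel_older_runs(runtime_flow_run.id)
    # Your main task logic here


if __name__ == "__main__":
    asyncio.run(monitored_flow())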
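The selection rule itself (keep the newest overdue run per deployment, cancel the older ones) can be checked without a Prefect server. The sketch below is plain Python under stated assumptions: the `Run` record and the `runs_to_cancel` helper are hypothetical names for illustration, not Prefect objects, and "Late" is treated as a pending state alongside "Scheduled".

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Run:
    """Hypothetical stand-in for a flow run record (not a Prefect class)."""
    id: str
    deployment_id: str
    state: str  # e.g. "Scheduled", "Late", "Completed"
    expected_start: datetime


def runs_to_cancel(runs, now):
    """Return overdue pending runs that are not the newest one for their deployment."""
    overdue = [
        r for r in runs
        if r.state in ("Scheduled", "Late") and r.expected_start <= now
    ]
    newest = {}
    for run in overdue:
        best = newest.get(run.deployment_id)
        if best is None or run.expected_start > best.expected_start:
            newest[run.deployment_id] = run
    return [r for r in overdue if newest[r.deployment_id].id != r.id]


def at(hour, minute=0):
    return datetime(2024, 6, 25, hour, minute, tzinfo=timezone.utc)


runs = [
    Run("a1", "abc", "Late", at(10)),
    Run("a2", "abc", "Late", at(11)),
    Run("a3", "abc", "Late", at(12)),
    Run("a4", "abc", "Scheduled", at(13)),  # still in the future, left alone
    Run("b1", "xyz", "Completed", at(11)),
    Run("b2", "xyz", "Late", at(12)),
]
stale_ids = [r.id for r in runs_to_cancel(runs, now=at(12, 30))]
print(stale_ids)
```

Future runs are deliberately excluded, so only the backlog created while the VM was down is trimmed.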
### Additional Resources
- Work Pools and Queues
- Flow Run Cancellation (look for the section on cancellation via CLI or UI)
This setup will help ensure that when your VM comes back up, only the most recent flow run executes, preventing redundancy and the piling up of tasks.
Kaleb King
06/25/2024, 5:03 PM
Marvin
06/25/2024, 5:03 PM
`prefect.yaml` file:
yaml
# prefect.yaml
name: flow-cancellation-automation
prefect-version: 2.x.y
deployments:
  - name: abc-deployment
    entrypoint: path/to/your/flow.py:your_flow_function
    work_pool:
      name: abc-worker
      work_queue_name: default
      job_variables: {}
    parameters: {}
    tags: []
    schedule:
      cron: "0 * * * *" # Runs at the top of every hour
3. Create the Automation via Prefect Cloud:
- Go to the Prefect Cloud UI, navigate to the Automations section, and create a new automation.
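- For example, you can set up a trigger whose action runs a deployment that cancels older flow runs (automations offer built-in actions such as "Run a deployment" rather than running arbitrary scripts).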
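To see how many runs pile up with an hourly schedule such as `0 * * * *`, the downtime window can be mapped to the top-of-hour slots it covers. This is a minimal standard-library sketch; `missed_hourly_slots` is a hypothetical helper, and it assumes the scheduler keeps creating runs while the VM is offline, which is what produces the backlog.

```python
from datetime import datetime, timedelta, timezone


def missed_hourly_slots(down_from, up_at):
    """List the top-of-hour times t with down_from <= t <= up_at."""
    slot = down_from.replace(minute=0, second=0, microsecond=0)
    if slot < down_from:
        slot += timedelta(hours=1)
    slots = []
    while slot <= up_at:
        slots.append(slot)
        slot += timedelta(hours=1)
    return slots


down_from = datetime(2024, 6, 25, 9, 30, tzinfo=timezone.utc)
up_at = datetime(2024, 6, 25, 13, 10, tzinfo=timezone.utc)
slots = missed_hourly_slots(down_from, up_at)
print([s.strftime("%H:%M") for s in slots])
```

In this example four runs are waiting when the VM returns; only the last one needs to execute, and the earlier three are the ones to cancel.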
### Example Condition and Action
1. Condition:
- Trigger when a new flow run is created.
2. Action:
- Run a deployment that executes the cancellation script below, or call the Prefect API to cancel older runs.
### Automation Script Example
Here’s an example script to automate cancellation of older flow runs:
python
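import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId
from prefect.states import Cancelled

async def cancel_old_deployment_runs(deployment_id):
    # Assumes the Prefect 2.x client, which filters runs via FlowRunFilter objects
    run_filter = FlowRunFilter(deployment_id=FlowRunFilterDeploymentId(any_=[deployment_id]))
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    async with get_client() as client:
        deployment_runs = await client.read_flow_runs(flow_run_filter=run_filter)
        for run in deployment_runs:
            # "Late" runs are still scheduled-type states, so test the type rather than the name
            overdue = run.expected_start_time is not None and run.expected_start_time < cutoff
            if run.state is not None and run.state.is_scheduled() and overdue:
                await client.set_flow_run_state(flow_run_id=run.id, state=Cancelled())

# Replace with your deployment ID (a UUID)
deployment_id = "your-deployment-id"
asyncio.run(cancel_old_deployment_runs(deployment_id))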
### Documentation and Resources
- Prefect Automations Guide
- Flow Run Cancellation Documentation
By setting up an automation as described, you can ensure only the most recent flow run is executed during availability periods, while preventing any redundant runs from piling up.
Kaleb King
06/25/2024, 6:49 PM
Marvin
06/25/2024, 6:49 PM
`\n\n`) for paragraph breaks in your notification message body.
Example Notification Body with Line Breaks:
json
{
  "text": "Line one\nLine two\n\nNew paragraph"
}
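When the body is built in code, it is usually safer to let a JSON encoder produce the `\n` escapes than to write them by hand. A minimal sketch, assuming a hypothetical helper `slack_payload` that takes a list of paragraphs, each a list of lines:

```python
import json


def slack_payload(paragraphs):
    """Join lines with a newline and paragraphs with a blank line."""
    text = "\n\n".join("\n".join(lines) for lines in paragraphs)
    # json.dumps escapes the newline characters as \n in the serialized body
    return json.dumps({"text": text})


body = slack_payload([["Line one", "Line two"], ["New paragraph"]])
print(body)
```

The printed body matches the JSON shown above and can be sent as-is to a webhook.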
### For Email Notifications
In email notifications, HTML can be used to add formatting such as line breaks. Use the `<br>` tag for line breaks and `<p>` tags for paragraphs.
Example Notification Body with HTML Tags:
html
Line one<br>Line two<br><br>New paragraph
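If the message starts out as plain text, the conversion to `<br>` and `<p>` can be done programmatically. The sketch below uses only the standard library; `to_html_body` is a hypothetical helper, and it escapes the text first so characters such as `<` or `&` in an error message do not break the markup.

```python
from html import escape


def to_html_body(text):
    """Convert plain text to HTML: blank lines split paragraphs, newlines become <br>."""
    paragraphs = [p for p in text.split("\n\n") if p.strip()]
    return "".join(
        "<p>" + "<br>".join(escape(line) for line in p.split("\n")) + "</p>"
        for p in paragraphs
    )


html_body = to_html_body("Line one\nLine two\n\nRetries < 3 & still failing")
print(html_body)
```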
### Setting Up Notifications in Prefect Cloud
1. Go to the Prefect Cloud UI:
- Navigate to the Notifications page.
- Select "Create Notification" or click on the "+" button.
2. Configure the Notification:
- Choose the run states that should trigger the notification.
- Apply tags to filter which flow runs are covered by the notification.
- Select the method of notification (e.g., Slack webhook, email).
### Sample Configuration for Slack
Here’s how to configure a Slack notification with line breaks:
python
import httpx
from prefect import flow

SLACK_WEBHOOK_URL = "your-slack-webhook-url"

def my_slack_notifier(flow, flow_run, state):
    # State hooks in Prefect 2.10+ are called with (flow, flow_run, state);
    # the Prefect 1 `slack_notifier` utility does not exist in Prefect 2
    message_body = f"""Flow run {flow_run.name} has entered state {state.name}
The run has encountered an issue:
{state.message}

Check the flow run logs for details.
"""
    payload = {"text": message_body}
    response = httpx.post(SLACK_WEBHOOK_URL, json=payload)
    response.raise_for_status()

@flow(on_failure=[my_slack_notifier])
def my_flow():
    ...  # your flow logic
### Sample Configuration for Email
Here’s how to configure an email notification with HTML for line breaks:
python
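from prefect import flow
from prefect_email import EmailServerCredentials, email_send_message

@flow
def my_flow():
    # Assumes the prefect-email collection is installed and that a credentials
    # block has been saved under the hypothetical name "email-creds"
    credentials = EmailServerCredentials.load("email-creds")
    message_body = """
    Task Encountered an Issue<br><br>
    Please check the flow run for details.
    """
    email_send_message(
        email_server_credentials=credentials,
        subject="Flow run notification",
        msg=message_body,  # prefect-email sends `msg` as the HTML body
        email_to="your-email@example.com",
    )

if __name__ == "__main__":
    my_flow()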
### Relevant Documentation
- Prefect Notifications Guide
- Slack Message Formatting
- Prefect Email Notifications
By following the above steps, you can add line breaks and format the body of your Prefect notifications to fit your needs.
Kaleb King
06/25/2024, 6:50 PM
Marvin
06/25/2024, 6:50 PM
`<br>`) or Markdown styles in your message payload.
Here is an example of how you can format a message body with line breaks in a Teams webhook notification:
### Creating a Microsoft Teams Webhook Notification
1. Set Up the Webhook in Teams:
- Go to the channel where you want to receive notifications.
- Click on the "..." (More options) next to the channel name, then select "Connectors."
- Search for and select "Incoming Webhook."
- Give your webhook a name, and copy the URL generated. This URL will be used to send the notifications.
2. Formatting the Notification with Line Breaks:
- You can use either HTML or Markdown for formatting messages.
### Example Using HTML
Here is how you can format the notification body using HTML:
json
{
  "text": "Flow Run Notification",
  "sections": [
    {
      "activityTitle": "Flow Run Status Update",
      "activitySubtitle": "Details about the flow run",
      "activityText": "Line one<br>Line two<br><br>New paragraph"
    }
  ]
}
### Example Using Markdown
Here is how you can format the notification body using Markdown (use `\n\n` for line breaks; JSON itself does not allow comments):
json
{
  "text": "Flow Run Notification",
  "sections": [
    {
      "activityTitle": "Flow Run Status Update",
      "activitySubtitle": "Details about the flow run",
      "activityText": "Line one\n\nLine two\n\nNew paragraph"
    }
  ]
}
### Sending the Webhook Notification from Prefect
Here is a Python example using `httpx` to send the notification to your Teams channel:
python
import httpx

def send_teams_notification(webhook_url, message):
    headers = {"Content-Type": "application/json"}
    payload = {
        "text": "Flow Run Notification",
        "sections": [
            {
                "activityTitle": "Flow Run Status Update",
                "activitySubtitle": "Details about the flow run",
                "activityText": message,
            }
        ],
    }
    response = httpx.post(webhook_url, json=payload, headers=headers)
    if response.status_code == 200:
        print("Notification sent successfully!")
    else:
        print(f"Failed to send notification: {response.status_code}\n{response.text}")

# Webhook URL obtained from Microsoft Teams Channel
webhook_url = "your-teams-webhook-url"
message = "Line one<br>Line two<br><br>New paragraph"  # Use HTML or Markdown as needed
send_teams_notification(webhook_url, message)
### Resources
- Prefect Webhooks Guide
- Microsoft Teams Message Formatting
By following these steps, you can format and send well-structured notifications to Microsoft Teams using Prefect webhooks.
Kaleb King
06/25/2024, 6:59 PM
Marvin
06/25/2024, 6:59 PM
jinja
Flow Run Notification
----------------------
Flow run {{ flow_run_id }} for deployment {{ deployment_id }} has entered state "{{ state }}".
Work Queue: {{ work_queue_name }}
Work Pool: {{ work_pool_name }}
Details:
{{ details }}
### Step 2: Fetch Work Pool Information
When the automation is triggered (e.g., when a flow run changes state), you need to fetch the information about the work pool associated with the work queue.
### Step 3: Pass Information into the Template
Here's a complete Python example demonstrating how to fetch the work pool information, use Jinja to render the notification, and send it via a webhook:
python
import asyncio

import httpx
from jinja2 import Template
from prefect import get_client

# Sample Jinja template for the notification
notification_template = """
Flow Run Notification
----------------------
Flow run {{ flow_run_id }} for deployment {{ deployment_id }} has entered state "{{ state }}".
Work Queue: {{ work_queue_name }}
Work Pool: {{ work_pool_name }}
Details:
{{ details }}
"""


# Function to fetch work pool details
async def fetch_work_pool_details(work_queue_name):
    async with get_client() as client:
        # Fetch the work queue details (looked up by queue name alone here)
        work_queue = await client.read_work_queue_by_name(work_queue_name)
        # read_work_pool() looks pools up by name, so match the queue's
        # work_pool_id against the list of pools instead
        for work_pool in await client.read_work_pools():
            if work_pool.id == work_queue.work_pool_id:
                return work_pool.name
    return None


# Function to send the notification
async def send_teams_notification(webhook_url, flow_run, work_queue_name, state, details):
    # Fetch the work pool details
    work_pool_name = await fetch_work_pool_details(work_queue_name)
    # Render the Jinja template with the relevant details
    template = Template(notification_template)
    rendered_message = template.render(
        flow_run_id=flow_run["id"],
        deployment_id=flow_run["deployment_id"],
        state=state,
        work_queue_name=work_queue_name,
        work_pool_name=work_pool_name,
        details=details,
    )
    # Send the formatted message to Microsoft Teams
    headers = {"Content-Type": "application/json"}
    payload = {
        "text": "Flow Run Notification",
        "sections": [{
            "activityTitle": "Flow Run Status Update",
            "activitySubtitle": "Details about the flow run",
            "activityText": rendered_message.replace("\n", "<br>"),
        }],
    }
    # httpx.post is synchronous, so use AsyncClient inside async code
    async with httpx.AsyncClient() as http_client:
        response = await http_client.post(webhook_url, json=payload, headers=headers)
    if response.status_code == 200:
        print("Notification sent successfully!")
    else:
        print(f"Failed to send notification: {response.status_code}\n{response.text}")


# Sample call to send a notification
webhook_url = "your-teams-webhook-url"
flow_run = {
    "id": "sample-flow-run-id",
    "deployment_id": "sample-deployment-id",
}
work_queue_name = "default"
state = "Failed"
details = "The flow run encountered an error."
asyncio.run(send_teams_notification(webhook_url, flow_run, work_queue_name, state, details))
### Explanation
1. Template Definition: Defines the structure of the notification message.
2. Fetch Work Pool Details: Uses the Prefect client to fetch details about the work queue and its associated work pool.
3. Render and Send Notification: Renders the Jinja template with the fetched details and sends the formatted message to the Microsoft Teams webhook.
### Further Customization
You can further customize the Jinja template and the way you fetch and pass information depending on your specific requirements.
Marvin
06/25/2024, 6:59 PM
Kaleb King
06/25/2024, 7:21 PM
Marvin
06/25/2024, 7:21 PM
`{{ }}`):
### Common Variables in Jinja Templates for Prefect Notifications
1. Flow Run Variables:
- `flow_run.id`: The unique identifier of the flow run.
- `flow_run.deployment_id`: The deployment ID associated with the flow run.
- `flow_run.state.name`: The current state of the flow run (e.g., "Scheduled", "Running", "Failed").
- `flow_run.name`: The name of the flow run.
- `flow_run.created`: The timestamp when the flow run was created.
- `flow_run.updated`: The timestamp when the flow run was last updated.
2. Work Queue Variables:
- `work_queue.name`: The name of the work queue that the flow run is linked to.
- `work_queue.id`: The ID of the work queue.
3. Work Pool Variables:
- Notification templates in Prefect automations may also expose a `work_pool` object (for example `work_pool.name`). If it is not populated in your notification context, fetch this information programmatically, as shown in the previous example.
4. Additional Contextual Variables:
- `state`: Custom message or state details you want to include.
### Example: Complete Jinja Template with Variables
Here’s an example of a Jinja template that incorporates these variables:
jinja
Flow Run Notification
----------------------
Flow run {{ flow_run.id }} for deployment {{ flow_run.deployment_id }} has entered state "{{ flow_run.state.name }}".
Work Queue: {{ work_queue.name }}
Work Pool: {{ work_pool_name }} {# Assuming work_pool_name is set programmatically #}
Details:
Created: {{ flow_run.created }}
Updated: {{ flow_run.updated }}
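A template like this can be rendered locally against sample data before it is wired into a notification, which makes it easy to spot a misspelled variable. The sketch below assumes the `jinja2` package is installed and uses made-up sample values. Note that Jinja comments are written `{# ... #}` and are dropped from the output, whereas text after `//` would be emitted verbatim.

```python
from jinja2 import Template

template = Template(
    'Flow run {{ flow_run.id }} has entered state "{{ flow_run.state.name }}".\n'
    "Work Queue: {{ work_queue.name }}\n"
    "Work Pool: {{ work_pool_name }}{# set programmatically #}\n"
)

# Nested dicts are enough here: Jinja falls back to item lookup for dotted names
context = {
    "flow_run": {"id": "sample-flow-run-id", "state": {"name": "Failed"}},
    "work_queue": {"name": "default"},
    "work_pool_name": "abc-worker",
}
rendered = template.render(**context)
print(rendered)
```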
### Example of Using Jinja in Python with Prefect and Sending a Teams Notification
```python
import asyncio

import httpx
from jinja2 import Template
from prefect import get_client

# Sample Jinja template
notification_template = """
Flow Run Notification
----------------------
Flow run {{ flow_run.id }} for deployment {{ flow_run.deployment_id }} has entered state "{{ flow_run.state.name }}".
Work Queue: {{ work_queue.name }}
Work Pool: {{ work_pool_name }} {# Assuming you fetch this programmatically #}
Details:
Created: {{ flow_run.created }}
Updated: {{ flow_run.updated }}
"""


async def fetch_work_pool_name(work_queue_name):
    """Fetch the work pool name based on work queue."""
    async with get_client() as client:
        work_queue = await client.read_work_queue_by_name(work_queue_name)
        # read_work_pool() expects a pool name, so match on the pool ID instead
        for work_pool in await client.read_work_pools():
            if work_pool.id == work_queue.work_pool_id:
                return work_pool.name
    return None


async def send_teams_notification(webhook_url, flow_run, work_queue_name):
    """Send a formatted Microsoft Teams webhook notification."""
    work_pool_name = await fetch_work_pool_name(work_queue_name)
    template = Template(notification_template)
    rendered_message = template.render(
        flow_run=flow_run,
        work_queue={"name": work_queue_name},
        work_pool_name=work_pool_name,
    )
    headers = {"Content-Type": "application/json"}
    payload = {
        "text": "Flow Run Notification",
        "sections": [{
            "activityTitle": "Flow Run Status Update",
            "activitySubtitle": "Details about the flow run",
            "activityText": rendered_message.replace("\n", "<br>"),
        }],
    }
    # httpx.post is synchronous, so use AsyncClient inside async code
    async with httpx.AsyncClient() as http_client:
        response = await http_client.post(webhook_url, json=payload, headers=headers)
    if response.status_code == 200:
        print("Notification sent successfully!")
    else:
        print(f"Failed to send notification: {response.status_code}\n{response.text}")


# Simulated values for demonstration
webhook_url = "your-teams-webhook-url"
flow_run = {
    "id": "sample-flow-run-id",
    "deployment_id": "sample-deployment-id",
    "state": {"name": "Failed"},
    "created": "2023-10-02T10:00:00Z",
    "updated": "2023-10-02T11:00:00Z",
}
# Same "default" queue as in the previous example
asyncio.run(send_teams_notification(webhook_url, flow_run, "default"))
```
Marvin
06/25/2024, 7:21 PM