<@ULVA73B9P> I have a deployment that runs on an h...
# ask-marvin
k
@Marvin I have a deployment that runs on an hourly schedule. In the event of the work pool and/or queue being unavailable, how can I prevent runs from piling up? For example, I have
abc-deployment
that runs at the top of every hour using
abc-worker
pool and
default
queue and runs on a VM. If the VM is shut down, loses internet connection, or anything else that prevents the scheduled flow from running, I want it to do one of the following: • Don't create new flow runs OR • Cancel or delete the older flow run when a new one is created Basically, when the VM comes back up I only want it to execute one flow run for each deployment.
m
To achieve your requirement of preventing flow runs from piling up when the work pool and/or queue is unavailable, you can use the principles of concurrency limits and prioritize flow cancellation. Here’s a step-by-step plan using Prefect: 1. Concurrency Limits: - Set up concurrency limits for your work pools to limit the number of simultaneous flow runs. This can prevent too many runs from starting at the same time when the VM becomes available. - Configure the concurrency limits in your work pools to control the maximum number of concurrent runs. 2. Flow Run Cancellation: - Utilize Prefect’s CLI or UI to cancel flow runs that are already in the queue. - Create a command or a flow that checks for older runs and cancels them if new runs are created. Here's how you can set the concurrency limit: ### Setting Concurrency Limits You can define concurrency limits when setting up your work pools. Here's a guide: Work Pools Documentation. ### Cancelling Flow Runs Using CLI You can cancel any pending or running flow runs using the Prefect CLI:
Copy code
bash
prefect flow-run cancel <flow-run-id>
You can retrieve the flow run ID from the Prefect UI or by querying the Prefect API. ### Example Flow to Cancel Older Runs Here's an example of a flow that cancels older runs if newer runs are created:
Copy code
python
from prefect import flow, task, get_client
from datetime import datetime, timedelta

@task
async def cancel_older_runs(flow_run_id: str):
    async with get_client() as client:
        # Get the current flow run
        current_run = await client.read_flow_run(flow_run_id)
        
        # Get all flow runs for the same deployment
        deployment_runs = await client.read_flow_runs(deployment_id=current_run.deployment_id)
        
        # Cancel older runs
        for run in deployment_runs:
            if run.state == "Scheduled" and run.id != current_run.id and run.updated < (datetime.utcnow() - timedelta(hours=1)):
                await client.set_flow_run_state(run_id=run.id, state="Cancelled")

@flow
async def monitored_flow():
    # Replace with your actual task logic
    current_run_id = "<your-flow-run-id>"
    await cancel_older_runs(current_run_id)
    # Your main task logic here

if __name__ == "__main__":
    monitored_flow()
### Additional Resources - Work Pools and Queues - Flow Run Cancellation (look for the section on cancellation via CLI or UI) This setup will help ensure that when your VM comes back up, only the most recent flow run executes, preventing redundancy and the piling up of tasks.
k
@Marvin Could I use an automation to handle the flow run cancellation?
m
Yes, you can use Prefect Automations to handle the flow run cancellation. Here's how you can set up an automation to cancel older flow runs when new ones are created: ### Setting Up an Automation in Prefect 1. Create a Deployment: - Ensure the flow is associated with a deployment, as automations work with flow runs tied to deployments. 2. Define the Automation: - Use Prefect's automation feature to set up a trigger that cancels older flow runs. Here’s a guide on how to define a deployment using a
prefect.yaml
file:
Copy code
yaml
# prefect.yaml
name: flow-cancellation-automation
prefect-version: 2.x.y
entrypoint: path/to/your/flow.py:your_flow_function

deployments:
  - name: abc-deployment
    work_queue_name: default
    infra_overrides: {}
    storage: {}
    parameters: {}
    tags: []
    schedule:
      cron: "0 * * * *"  # Runs at the top of every hour
3. Create the Automation via Prefect Cloud: - Go to the Prefect Cloud UI, navigate to the Automations section, and create a new automation. - For example, you can set up a condition that triggers a script to cancel older flow runs. ### Example Condition and Action 1. Condition: - Trigger when a new flow run is created. 2. Action: - Run a custom script or use a Prefect API request to cancel older runs. ### Automation Script Example Here’s an example script to automate cancellation of older flow runs:
Copy code
python
from prefect.client import get_client
from datetime import datetime, timedelta

async def cancel_old_deployment_runs(deployment_id):
    async with get_client() as client:
        deployment_runs = await client.read_flow_runs(deployment_id=deployment_id)
        now = datetime.utcnow()
        for run in deployment_runs:
            if run.state.name == "Scheduled" and (now - run.timestamp).total_seconds() > 3600:
                await client.set_flow_run_state(flow_run_id=run.id, state="Cancelled")

# Replace with your deployment ID
deployment_id = "your-deployment-id"
await cancel_old_deployment_runs(deployment_id)
### Documentation and Resources - Prefect Automations Guide - Flow Run Cancellation Documentation By setting up an automation as described, you can ensure only the most recent flow run is executed during availability periods, while preventing any redundant runs from piling up.
k
@Marvin How can I add line breaks to the body of a notification?
m
To format the body of a notification in Prefect, such as adding line breaks, you need to use the appropriate formatting syntax. Here’s how you can achieve this for different types of notifications: ### For Slack Notifications When sending notifications via Slack, you can use Slack's markdown-like formatting to include line breaks. Use double newlines (
\n\n
) for paragraph breaks in your notification message body. Example Notification Body with Line Breaks:
Copy code
json
{
  "text": "Line one\nLine two\n\nNew paragraph"
}
### For Email Notifications In email notifications, HTML can be used to add formatting such as line breaks. Use the
<br>
tag for line breaks and
<p>
tags for paragraphs. Example Notification Body with HTML Tags:
Copy code
html
Line one<br>Line two<br><br>New paragraph
### Setting Up Notifications in Prefect Cloud 1. Go to the Prefect Cloud UI: - Navigate to the Notifications page. - Select "Create Notification" or click on the "+" button. 2. Configure the Notification: - Choose the run states that should trigger the notification. - Apply tags to filter which flow runs are covered by the notification. - Select the method of notification (e.g., Slack webhook, email). ### Sample Configuration for Slack Here’s how to configure a Slack notification with line breaks:
Copy code
python
import httpx
from prefect.utilities.notifications import slack_notifier

def my_slack_notifier(flow_run, task_run, result):
    message_body = f"""Flow run {flow_run} has entered state {flow_run.state}
Task {task_run.task.slug} has encountered an issue:
{result}
Check the task logs for details.
"""

    url = "your-slack-webhook-url"
    payload = {"text": message_body}
    headers = {"Content-Type": "application/json"}
    
    response = <http://httpx.post|httpx.post>(url, json=payload, headers=headers)

slack_notifier(slack_notifier=my_slack_notifier)
### Sample Configuration for Email Here’s how to configure an email notification with HTML for line breaks:
Copy code
python
from prefect import task, Flow
from prefect.utilities.notifications import email_notifier

@task(state_handlers=[email_notifier("<mailto:your-email@example.com|your-email@example.com>", html=True)])
def my_task():
    message_body = """
        Task Encountered an Issue<br><br>
        Please check the flow run for details.
    """
    # logic for sending email using prefect email_notifier

with Flow("My Flow") as flow:
    my_task()

flow.run()
### Relevant Documentation - Prefect Notifications Guide - Slack Message Formatting - Prefect Email Notifications By following the above steps, you can add line breaks and format the body of your Prefect notifications to fit your needs.
k
@Marvin What about a Microsoft Teams webhook notification?
m
For sending notifications to Microsoft Teams through a webhook and adding line breaks, you will use the Markdown formatting supported by Microsoft Teams. You can utilize the HTML line break (
<br>
) or Markdown styles in your message payload. Here is an example of how you can format a message body with line breaks in a Teams webhook notification: ### Creating a Microsoft Teams Webhook Notification 1. Set Up the Webhook in Teams: - Go to the channel where you want to receive notifications. - Click on the "..." (More options) next to the channel name, then select "Connectors." - Search for and select "Incoming Webhook." - Give your webhook a name, and copy the URL generated. This URL will be used to send the notifications. 2. Formatting the Notification with Line Breaks: - You can use either HTML or Markdown for formatting messages. ### Example Using HTML Here is how you can format the notification body using HTML:
Copy code
json
{
  "text": "Flow Run Notification",
  "sections": [
    {
      "activityTitle": "Flow Run Status Update",
      "activitySubtitle": "Details about the flow run",
      "activityText": "Line one<br>Line two<br><br>New paragraph"
    }
  ]
}
### Example Using Markdown Here is how you can format the notification body using Markdown:
Copy code
json
{
  "text": "Flow Run Notification",
  "sections": [
    {
      "activityTitle": "Flow Run Status Update",
      "activitySubtitle": "Details about the flow run",
      "activityText": "Line one\n\nLine two\n\nNew paragraph" // Use \n\n for line breaks in markdown
    }
  ]
}
### Sending the Webhook Notification from Prefect Here is a Python example using
httpx
to send the notification to your Teams channel:
Copy code
python
import httpx

def send_teams_notification(webhook_url, message):
    headers = { "Content-Type": "application/json" }
    payload = {
        "text": "Flow Run Notification",
        "sections": [
            {
                "activityTitle": "Flow Run Status Update",
                "activitySubtitle": "Details about the flow run",
                "activityText": message
            }
        ]
    }
    
    response = <http://httpx.post|httpx.post>(webhook_url, json=payload, headers=headers)
    if response.status_code == 200:
        print("Notification sent successfully!")
    else:
        print(f"Failed to send notification: {response.status_code}\n{response.text}")

# Webhook URL obtained from Microsoft Teams Channel
webhook_url = "your-teams-webhook-url"
message = "Line one<br>Line two<br><br>New paragraph"  # Use HTML or Markdown as needed

send_teams_notification(webhook_url, message)
### Resources - Prefect Webhooks Guide - Microsoft Teams Message Formatting By following these steps, you can format and send well-structured notifications to Microsoft Teams using Prefect webhooks.
k
@Marvin Using jinja, how can I also include information about the work pool when the automation is based on a work queue in that work pool?
m
To include information about the work pool in a notification template using Jinja, you’ll need to fetch the relevant work pool details when the automation is triggered and pass it into the template. Here’s a step-by-step approach on how to achieve this: ### Step 1: Create a Jinja Template Create a Jinja template for your notification message. For example, the template might look like this:
Copy code
jinja
Flow Run Notification
----------------------
Flow run {{ flow_run_id }} for deployment {{ deployment_id }} has entered state "{{ state }}".

Work Queue: {{ work_queue_name }}
Work Pool: {{ work_pool_name }}

Details:
{{ details }}
### Step 2: Fetch Work Pool Information When the automation is triggered (e.g., when a flow run changes state), you need to fetch the information about the work pool associated with the work queue. ### Step 3: Pass Information into the Template Here's a complete Python example demonstrating how to fetch the work pool information, use Jinja to render the notification, and send it via a webhook:
Copy code
python
import httpx
from prefect import get_client
from jinja2 import Template

# Sample Jinja template for the notification
notification_template = """
Flow Run Notification
----------------------
Flow run {{ flow_run_id }} for deployment {{ deployment_id }} has entered state "{{ state }}".

Work Queue: {{ work_queue_name }}
Work Pool: {{ work_pool_name }}

Details:
{{ details }}
"""

# Function to fetch work pool details
async def fetch_work_pool_details(work_queue_name):
    async with get_client() as client:
        # Fetch the work queue details
        work_queue = await client.read_work_queue_by_name(work_queue_name)
        
        # Fetch the work pool details using the work_pool_id from the work queue
        work_pool = await client.read_work_pool(work_queue.work_pool_id)
        
        return work_pool.name

# Function to send the notification
async def send_teams_notification(webhook_url, flow_run, work_queue_name, state, details):
    # Fetch the work pool details
    work_pool_name = await fetch_work_pool_details(work_queue_name)
    
    # Render the Jinja template with the relevant details
    template = Template(notification_template)
    rendered_message = template.render(
        flow_run_id=flow_run.id,
        deployment_id=flow_run.deployment_id,
        state=state,
        work_queue_name=work_queue_name,
        work_pool_name=work_pool_name,
        details=details
    )
    
    # Send the formatted message to Microsoft Teams
    headers = {"Content-Type": "application/json"}
    payload = {
        "text": "Flow Run Notification",
        "sections": [{
            "activityTitle": "Flow Run Status Update",
            "activitySubtitle": "Details about the flow run",
            "activityText": rendered_message.replace("\n", "<br>")
        }]
    }
    
    response = await <http://httpx.post|httpx.post>(webhook_url, json=payload, headers=headers)
    if response.status_code == 200:
        print("Notification sent successfully!")
    else:
        print(f"Failed to send notification: {response.status_code}\n{response.text}")

# Sample call to send a notification
webhook_url = "your-teams-webhook-url"
flow_run = {
    "id": "sample-flow-run-id",
    "deployment_id": "sample-deployment-id"
}
work_queue_name = "default"
state = "Failed"
details = "The flow run encountered an error."

await send_teams_notification(webhook_url, flow_run, work_queue_name, state, details)
### Explanation 1. Template Definition: Defines the structure of the notification message. 2. Fetch Work Pool Details: Uses the Prefect client to fetch details about the work queue and its associated work pool. 3. Render and Send Notification: Renders the Jinja template with the fetched details and sends the formatted message to the Microsoft Teams webhook. ### Further Customization You can further customize the Jinja template and the way you fetch and pass information depending on your specific requirements.
### Resources - Prefect Client API - Jinja2 Template Documentation - Microsoft Teams Webhooks With this approach, you can dynamically include work pool information in your Microsoft Teams webhook notifications using Prefect automations and Jinja templating.
k
@Marvin What are all the possible options I can enter between the double curly braces?
m
Using Jinja templates in Prefect automations allows you to dynamically generate notification messages with information contextually relevant to your flow runs and work pools. Here are some of the key variables and contexts you can use within the double curly braces (
{{ }}
): ### Common Variables in Jinja Templates for Prefect Notifications 1. Flow Run Variables: - `flow_run.id`: The unique identifier of the flow run. - `flow_run.deployment_id`: The deployment ID associated with the flow run. - `flow_run.state.name`: The current state of the flow run (e.g., "Scheduled", "Running", "Failed"). - `flow_run.name`: The name of the flow run. - `flow_run.created`: The timestamp when the flow run was created. - `flow_run.updated`: The timestamp when the flow run was last updated. 2. Work Queue Variables: - `work_queue.name`: The name of the work queue that the flow run is linked to. - `work_queue.id`: The ID of the work queue. 3. Work Pool Variables: - You’ll typically need to fetch this information programmatically, as shown in the previous example, because direct references might not be automatically available in the notification context. 4. Additional Contextual Variables: - `state`: Custom message or state details you want to include. ### Example: Complete Jinja Template with Variables Here’s an example of a Jinja template that incorporates these variables:
Copy code
jinja
Flow Run Notification
----------------------
Flow run {{ flow_run.id }} for deployment {{ flow_run.deployment_id }} has entered state "{{ flow_run.state.name }}".

Work Queue: {{ work_queue.name }}
Work Pool: {{ work_pool_name }}  // Assuming work_pool_name is set programmatically

Details:
Created: {{ flow_run.created }}
Updated: {{ flow_run.updated }}
### Example of Using Jinja in Python with Prefect and Sending a Teams Notification ```python import httpx from prefect.client import get_client from jinja2 import Template # Sample Jinja template notification_template = """ Flow Run Notification ---------------------- Flow run {{ flow_run.id }} for deployment {{ flow_run.deployment_id }} has entered state "{{ flow_run.state.name }}". Work Queue: {{ work_queue.name }} Work Pool: {{ work_pool_name }} // Assuming you fetch this programmatically Details: Created: {{ flow_run.created }} Updated: {{ flow_run.updated }} """ async def fetch_work_pool_name(work_queue_name): """Fetch the work pool name based on work queue.""" async with get_client() as client: work_queue = await client.read_work_queue_by_name(work_queue_name) work_pool = await client.read_work_pool(work_queue.work_pool_id) return work_pool.name async def send_teams_notification(webhook_url, flow_run, work_queue_name): """Send a formatted Microsoft Teams webhook notification.""" work_pool_name = await fetch_work_pool_name(work_queue_name) template = Template(notification_template) rendered_message = template.render( flow_run=flow_run, work_queue={"name": work_queue_name}, work_pool_name=work_pool_name ) headers = {"Content-Type": "application/json"} payload = { "text": "Flow Run Notification", "sections": [{ "activityTitle": "Flow Run Status Update", "activitySubtitle": "Details about the flow run", "activityText": rendered_message.replace("\n", "<br>") }] } response = await httpx.post(webhook_url, json=payload, headers=headers) if response.status_code == 200: print("Notification sent successfully!") else: print(f"Failed to send notification: {response.status_code}\n{response.text}") # Simulated values for demonstration webhook_url = "your-teams-webhook-url" flow_run = { "id": "sample-flow-run-id", "deployment_id": "sample-deployment-id", "state": {"name": "Failed"}, "created": "2023-10-02T100000Z", "updated": "2023-10-02T110000Z" }
work_queue_name = "default" import asyncio asyncio.run(send_teams_notification(webhook_url, flow_run, work_queue_name)) ``` ### Documentation and Resources - Jinja2 Template Documentation - Prefect Notifications - Prefect Automations This setup ensures that your notifications are dynamically populated with the most relevant and contextual information, making your monitoring and alerting processes more effective.