Abrar Hussain
09/18/2025, 10:18 PM
Marvin
09/18/2025, 10:18 PM
Marvin
09/18/2025, 10:22 PM
state.name == "Late" and work_queue_name.
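- Run this on a schedule (e.g., every minute) via an EventBridge rule + Lambda. EventBridge rate expressions have a one-minute minimum, so polling every 30s would need a loop inside the Lambda.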
2) Publish to CloudWatch
- From the Lambda, emit a custom metric like:
- Namespace: Prefect
- MetricName: LateFlowRuns
- Dimensions: WorkQueue = <your-queue-name>
3) Configure ECS service auto scaling
- Create a target-tracking or step scaling policy on your ECS Agent service that uses the custom metric.
- Example: target tracking with a target value of 1 for LateFlowRuns, or a step policy that adds tasks when LateFlowRuns crosses set thresholds. Note that the metric as published is the total Late count for the queue, not a per-task value.
Example Lambda (Python) to publish Late runs per queue
- Assumes Prefect Server API is reachable without auth. If you’ve added auth, attach credentials as needed.
- Set env vars: PREFECT_API_URL (e.g., http://your-prefect-server:4200/api), WORK_QUEUE (e.g., "default").
import os
from datetime import datetime, timezone

import boto3
import requests

PREFECT_API_URL = os.environ["PREFECT_API_URL"].rstrip("/")
WORK_QUEUE = os.environ["WORK_QUEUE"]
NAMESPACE = os.environ.get("CW_NAMESPACE", "Prefect")
METRIC_NAME = os.environ.get("CW_METRIC_NAME", "LateFlowRuns")

cloudwatch = boto3.client("cloudwatch")


def _parse_count_response(resp):
    # Prefect count endpoints typically return an integer JSON,
    # but handle dict format just in case.
    data = resp.json()
    if isinstance(data, dict) and "count" in data:
        return int(data["count"])
    return int(data)


def handler(event, context):
    # Count Late runs in this work queue
    # Endpoint: POST /flow_runs/count
    # Filter payload uses Prefect 2.x filter schema
    payload = {
        "flow_runs": {
            "work_queue_name": {"any_": [WORK_QUEUE]},
            "state": {"name": {"any_": ["Late"]}},
        }
    }
    url = f"{PREFECT_API_URL}/flow_runs/count"
    resp = requests.post(url, json=payload, timeout=10)
    resp.raise_for_status()
    count = _parse_count_response(resp)

    # Publish to CloudWatch
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": METRIC_NAME,
            "Dimensions": [
                {"Name": "WorkQueue", "Value": WORK_QUEUE},
            ],
            # boto3 expects a datetime here, not a time.struct_time
            "Timestamp": datetime.now(timezone.utc),
            "Value": count,
            "Unit": "Count",
        }],
    )
    return {"work_queue": WORK_QUEUE, "late_count": count}
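To run the handler above on a schedule, one option is an EventBridge rule that targets the Lambda. The sketch below is a rough outline, not a tested deployment: the rule name, target Id, and statement Id are hypothetical, and it assumes you pass in boto3 `events` and `lambda` clients. The request-building part is kept separate so you can inspect it without AWS access.

```python
def build_schedule_requests(rule_name, lambda_arn, rate_minutes=1):
    """Build kwargs for events.put_rule and events.put_targets."""
    if rate_minutes < 1:
        # EventBridge rate expressions cannot go below one minute.
        raise ValueError("rate_minutes must be >= 1")
    unit = "minute" if rate_minutes == 1 else "minutes"
    put_rule = {
        "Name": rule_name,
        "ScheduleExpression": f"rate({rate_minutes} {unit})",
        "State": "ENABLED",
    }
    put_targets = {
        "Rule": rule_name,
        # "publish-late-runs" is a hypothetical target Id.
        "Targets": [{"Id": "publish-late-runs", "Arn": lambda_arn}],
    }
    return put_rule, put_targets


def apply_schedule(events_client, lambda_client, rule_name, lambda_arn,
                   rate_minutes=1):
    """Create the rule, allow EventBridge to invoke the Lambda, attach it."""
    put_rule, put_targets = build_schedule_requests(
        rule_name, lambda_arn, rate_minutes
    )
    rule_arn = events_client.put_rule(**put_rule)["RuleArn"]
    lambda_client.add_permission(
        FunctionName=lambda_arn,
        StatementId=f"{rule_name}-invoke",  # hypothetical statement Id
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    events_client.put_targets(**put_targets)
    return rule_arn


# Usage (requires AWS credentials):
#   import boto3
#   apply_schedule(boto3.client("events"), boto3.client("lambda"),
#                  "prefect-late-runs", "<your-lambda-arn>")
```

Re-running `apply_schedule` with the same statement Id would fail on `add_permission`, so treat this as a one-time setup step or manage it with your usual IaC tool.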
ECS Auto Scaling configuration (Agent service)
- Register scalable target for DesiredCount on your Agent ECS service.
- Create a Target Tracking policy on the custom metric:
- Metric: Namespace=Prefect, MetricName=LateFlowRuns, Dimension WorkQueue=<your-queue>
- Target value: e.g., 1 late run (the metric is the total Late count for the queue, not per task; tune to your environment)
- Min/Max task count and cooldowns per your needs
- Alternatively use Step scaling:
- >= 1 late run: +1 task
- >= 5 late runs: +3 tasks
- Etc.
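The configuration above could be expressed with the Application Auto Scaling API roughly as follows. This is a sketch under assumptions: cluster and service names, policy names, and cooldown values are placeholders, and `client` is assumed to be `boto3.client("application-autoscaling")`. The step helper only computes step adjustments; a step policy additionally needs a CloudWatch alarm on LateFlowRuns (here assumed to fire at the lowest threshold), which is not shown.

```python
def ecs_resource_id(cluster, service):
    # Application Auto Scaling identifies ECS services this way.
    return f"service/{cluster}/{service}"


def scalable_target_request(cluster, service, min_tasks, max_tasks):
    """kwargs for register_scalable_target on the agent service."""
    return {
        "ServiceNamespace": "ecs",
        "ResourceId": ecs_resource_id(cluster, service),
        "ScalableDimension": "ecs:service:DesiredCount",
        "MinCapacity": min_tasks,
        "MaxCapacity": max_tasks,
    }


def target_tracking_request(cluster, service, work_queue, target_value=1.0,
                            namespace="Prefect", metric_name="LateFlowRuns"):
    """kwargs for put_scaling_policy using the custom metric."""
    return {
        "PolicyName": f"{service}-late-runs-target",  # placeholder name
        "ServiceNamespace": "ecs",
        "ResourceId": ecs_resource_id(cluster, service),
        "ScalableDimension": "ecs:service:DesiredCount",
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            "TargetValue": float(target_value),
            "CustomizedMetricSpecification": {
                "MetricName": metric_name,
                "Namespace": namespace,
                "Dimensions": [{"Name": "WorkQueue", "Value": work_queue}],
                "Statistic": "Average",
            },
            "ScaleOutCooldown": 60,   # placeholder cooldowns, in seconds
            "ScaleInCooldown": 300,
        },
    }


def step_adjustments(thresholds):
    """Turn [(late_runs, tasks_to_add), ...] into StepAdjustments.

    Bounds are offsets from the alarm threshold, assumed to be the
    lowest late_runs value in the list.
    """
    steps = sorted(thresholds)
    base = steps[0][0]
    result = []
    for i, (late_runs, add_tasks) in enumerate(steps):
        adj = {"MetricIntervalLowerBound": late_runs - base,
               "ScalingAdjustment": add_tasks}
        if i + 1 < len(steps):
            adj["MetricIntervalUpperBound"] = steps[i + 1][0] - base
        result.append(adj)
    return result


def apply_target_tracking(client, cluster, service, work_queue,
                          min_tasks=1, max_tasks=10):
    client.register_scalable_target(
        **scalable_target_request(cluster, service, min_tasks, max_tasks)
    )
    return client.put_scaling_policy(
        **target_tracking_request(cluster, service, work_queue)
    )
```

For example, `step_adjustments([(1, 1), (5, 3)])` describes "+1 task from 1 late run, +3 tasks from 5 late runs" relative to an alarm threshold of 1. Because a raw backlog count does not necessarily fall in proportion to the number of agent tasks, target tracking may behave less predictably here than step scaling; treat the target value as something to tune.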
Notes and tips
- Why “Late”? Late runs are scheduled runs that missed their expected start time and haven’t started, which makes them a good proxy for backlog.
- If you prefer “scheduled backlog” instead, you can filter for state.name == "Scheduled" and expected_start_time.before_ == now.
- Ensure you also have sufficient execution capacity. Scaling the agent adds more pollers to claim runs, but you still need enough ECS capacity (capacity provider/scaling) to run the ECS tasks your flows use.
- If you operate multiple work queues, either:
Marvin
09/18/2025, 10:22 PM