# ask-marvin
@Marvin We have prefect 2.18 server and agent running in ECS. We want to auto scale the agent service based on the queue depth of the prefect server corresponding to late flow state. What is the best way to do this?
thought for 273.0 seconds
Great question. In Prefect 2.18 (OSS server) there isn't a built-in CloudWatch metric for "late" runs or queue depth, so the best pattern on ECS is:
- Poll Prefect for the number of Late flow runs per work queue
- Publish that as a custom CloudWatch metric
- Attach an Application Auto Scaling policy to your ECS agent service using that metric

This keeps your agent count proportional to the backlog of late runs.

High-level steps

1) Collect a "Late runs per queue" metric
- Use the Prefect Orchestration API to count Late flow runs filtered by work queue.
- In 2.x, you can filter by `state.name == "Late"` and `work_queue_name`.
- Run this on a schedule (e.g., every 30–60s) via an EventBridge rule + Lambda.

2) Publish to CloudWatch
- From the Lambda, emit a custom metric like:
  - Namespace: Prefect
  - MetricName: LateFlowRuns
  - Dimensions: WorkQueue = <your-queue-name>

3) Configure ECS service auto scaling
- Create a target-tracking or step scaling policy on your ECS agent service that uses the custom metric.
- Example: target tracking with a target value of "1 LateFlowRun per agent task", or a step policy that adds tasks when LateFlowRuns crosses thresholds.

Example Lambda (Python) to publish Late runs per queue
- Assumes the Prefect Server API is reachable without auth. If you've added auth, attach credentials as needed.
- Set env vars: PREFECT_API_URL (e.g., http://your-prefect-server:4200/api) and WORK_QUEUE (e.g., "default").
import os
from datetime import datetime, timezone

import boto3
import requests

PREFECT_API_URL = os.environ["PREFECT_API_URL"].rstrip("/")
WORK_QUEUE = os.environ["WORK_QUEUE"]
NAMESPACE = os.environ.get("CW_NAMESPACE", "Prefect")
METRIC_NAME = os.environ.get("CW_METRIC_NAME", "LateFlowRuns")

cloudwatch = boto3.client("cloudwatch")

def _parse_count_response(resp):
    # Prefect count endpoints typically return an integer JSON,
    # but handle dict format just in case.
    data = resp.json()
    if isinstance(data, dict) and "count" in data:
        return int(data["count"])
    return int(data)

def handler(event, context):
    # Count Late runs in this work queue
    # Endpoint: POST /flow_runs/count
    # Filter payload uses Prefect 2.x filter schema
    payload = {
        "flow_runs": {
            "work_queue_name": {"any_": [WORK_QUEUE]},
            "state": {"name": {"any_": ["Late"]}}
        }
    }

    url = f"{PREFECT_API_URL}/flow_runs/count"
    resp = requests.post(url, json=payload, timeout=10)
    resp.raise_for_status()
    count = _parse_count_response(resp)

    # Publish to CloudWatch
    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[{
            "MetricName": METRIC_NAME,
            "Dimensions": [
                {"Name": "WorkQueue", "Value": WORK_QUEUE},
            ],
            "Timestamp": time.gmtime(),
            "Value": count,
            "Unit": "Count"
        }]
    )

    return {"work_queue": WORK_QUEUE, "late_count": count}
ECS Auto Scaling configuration (agent service)
- Register a scalable target for DesiredCount on your agent ECS service.
- Create a target-tracking policy on the custom metric (a boto3 sketch of this setup is at the end of this message):
  - Metric: Namespace=Prefect, MetricName=LateFlowRuns, Dimension WorkQueue=<your-queue>
  - Target value: e.g., 1 late run per agent task (tune to your environment)
  - Min/max task count and cooldowns per your needs
- Alternatively, use step scaling:
  - >= 1 late run: +1 task
  - >= 5 late runs: +3 tasks
  - etc.

Notes and tips
- Why "Late"? Late runs indicate scheduled runs that missed their expected start time and haven't started, which makes them a good proxy for backlog.
- If you prefer a "scheduled backlog" instead, you can filter for `state.name == "Scheduled"` and `expected_start_time.before_ == now`.
- Ensure you also have sufficient execution capacity. Scaling the agent increases claimers, but you still need enough ECS capacity (capacity provider/scaling) to run the ECS tasks your flows use.
- If you operate multiple work queues, either:
  - Publish one metric per queue and scale each queue-specific agent service independently, or
  - Aggregate counts and scale a shared agent pool.

References
- Prefect 2 AWS ECS guide: ECS integration guide
- Prefect work pools/queues concepts: Work pools and queues
- Using the Prefect client and filters: Client guide

Open questions to tailor this for you
- Do you run one agent per work queue, or a shared agent for multiple queues?
- Do you want to scale each queue's agent independently, or a single shared agent service based on aggregate backlog?
- How aggressive should scaling be (target per-late-run ratio, min/max tasks, cooldowns)?

If you'd like, I can provide a ready-to-deploy Terraform/CloudFormation snippet for the Application Auto Scaling policy, and/or expand the Lambda to handle multiple queues in one run.
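For the target-tracking policy referenced above, here is a minimal boto3 sketch against the custom metric. The cluster/service identifiers, min/max capacities, target value, and cooldowns are placeholder assumptions to tune for your environment:

```python
import boto3

# Placeholder identifiers - replace with your cluster/service and queue
RESOURCE_ID = "service/my-cluster/prefect-agent"
WORK_QUEUE = "default"

autoscaling = boto3.client("application-autoscaling")

# Make the agent service's DesiredCount scalable
autoscaling.register_scalable_target(
    ServiceNamespace="ecs",
    ResourceId=RESOURCE_ID,
    ScalableDimension="ecs:service:DesiredCount",
    MinCapacity=1,
    MaxCapacity=10,
)

# Target-track the custom LateFlowRuns metric: aim for ~1 late run per agent task
autoscaling.put_scaling_policy(
    PolicyName="prefect-late-runs-target-tracking",
    ServiceNamespace="ecs",
    ResourceId=RESOURCE_ID,
    ScalableDimension="ecs:service:DesiredCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 1.0,
        "CustomizedMetricSpecification": {
            "Namespace": "Prefect",
            "MetricName": "LateFlowRuns",
            "Dimensions": [{"Name": "WorkQueue", "Value": WORK_QUEUE}],
            "Statistic": "Average",
        },
        "ScaleOutCooldown": 60,
        "ScaleInCooldown": 300,
    },
)
```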