Abrar Hussain
09/19/2025, 8:26 PMMarvin
09/19/2025, 8:27 PMMarvin
09/19/2025, 8:32 PMfrom datetime import datetime, timezone
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterStateType,
FlowRunFilterWorkQueueName,
FlowRunFilterExpectedStartTime,
)
from prefect.client.schemas.objects import StateType
import boto3
WORK_QUEUE = "my-queue"
REGION = "us-east-1"
now = datetime.now(timezone.utc)
with SyncPrefectClient() as client:
scheduled_due = client.count_flow_runs(
flow_runs=FlowRunFilter(
state_type=FlowRunFilterStateType(any_=[StateType.SCHEDULED]),
work_queue_name=FlowRunFilterWorkQueueName(any_=[WORK_QUEUE]),
expected_start_time=FlowRunFilterExpectedStartTime(before_=now),
)
)
pending = client.count_flow_runs(
flow_runs=FlowRunFilter(
state_type=FlowRunFilterStateType(any_=[StateType.PENDING]),
work_queue_name=FlowRunFilterWorkQueueName(any_=[WORK_QUEUE]),
)
)
backlog = scheduled_due + pending # or just scheduled_due, depending on your policy
cw = boto3.client("cloudwatch", region_name=REGION)
cw.put_metric_data(
Namespace="Prefect",
MetricData=[
{
"MetricName": "QueueBacklog",
"Dimensions": [{"Name": "WorkQueue", "Value": WORK_QUEUE}],
"Timestamp": datetime.utcnow(),
"Value": backlog,
"Unit": "Count",
}
],
)
Then configure an ECS Application Auto Scaling policy to target this CloudWatch metric for your worker service.
References:
- Prefect 2 states: States in Prefect 2
- Work pools/queues and workers: Work pools and workers
- Agents overview (historical in 2.x): Agents
If you can share:
- Whether you’re using agents or workers
- The exact work pool/queue(s) you want to scale on
- ECS Service setup (Fargate vs EC2)
…I can tailor the filter logic and the CloudWatch policy details to your setup.