<@ULVA73B9P> Is the flow in pending state means th...
# ask-marvin
a
@Marvin Does a flow run in the Pending state mean that it has not been picked up by any agent in Prefect 2? Can we build ECS auto scaling based on the queue depth of Pending runs?
m
Short answer:
- Pending in Prefect 2.x does not necessarily mean "no agent/worker picked it up." Typically:
  - Scheduled: the run is queued to start. If it is past its start time you'll see it as "Late" in the UI, but the state type is still Scheduled. If no agent/worker is polling, runs remain Scheduled/Late.
  - Pending: usually indicates that an agent or worker has claimed the run and is preparing infrastructure. It is a brief transitional state before Running.
  - Running: the code is executing.

So if you're seeing lots of Scheduled/Late runs, that's a sign nothing is picking them up. If you're seeing Pending, it usually means an agent or worker has already claimed the run and it is about to start.

Autoscaling ECS on queue depth:
- Yes, you can autoscale an ECS worker service based on "queue depth." We generally recommend scaling on runs that are due to start (Scheduled with expected start time <= now). Optionally, include Pending if you also want to scale while infrastructure is still being prepared.

How to get the queue depth programmatically (Prefect 2.x):
- Count flow runs through the Prefect API, filtering by work queue name and state type.

Example: count due Scheduled runs and Pending runs for a given work queue, then publish the total as a CloudWatch custom metric for ECS Application Auto Scaling:
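import asyncio
from datetime import datetime, timezone

import boto3
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterExpectedStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterWorkQueueName,
)
from prefect.client.schemas.objects import StateType

WORK_QUEUE = "my-queue"
REGION = "us-east-1"


async def count_runs(client, flow_run_filter):
    # Assumption: the Prefect 2.x Python client has no public count helper, so this
    # posts to the REST endpoint /flow_runs/count through the client's underlying
    # HTTP session (the private `_client` attribute). Verify against your version.
    response = await client._client.post(
        "/flow_runs/count",
        json={"flow_runs": flow_run_filter.dict(json_compatible=True)},
    )
    return response.json()


async def get_backlog(now):
    queue = FlowRunFilterWorkQueueName(any_=[WORK_QUEUE])
    async with get_client() as client:
        # Scheduled runs that are already due (shown as "Late" in the UI).
        # State type is filtered through the nested `state` field of FlowRunFilter.
        scheduled_due = await count_runs(
            client,
            FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED])
                ),
                work_queue_name=queue,
                expected_start_time=FlowRunFilterExpectedStartTime(before_=now),
            ),
        )
        # Runs already claimed and waiting on infrastructure
        pending = await count_runs(
            client,
            FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.PENDING])
                ),
                work_queue_name=queue,
            ),
        )
    return scheduled_due + pending  # or just scheduled_due, depending on your policy


now = datetime.now(timezone.utc)
backlog = asyncio.run(get_backlog(now))

# Publish the backlog as a custom CloudWatch metric
cw = boto3.client("cloudwatch", region_name=REGION)
cw.put_metric_data(
    Namespace="Prefect",
    MetricData=[
        {
            "MetricName": "QueueBacklog",
            "Dimensions": [{"Name": "WorkQueue", "Value": WORK_QUEUE}],
            "Timestamp": now,  # timezone-aware; datetime.utcnow() is naive and deprecated
            "Value": backlog,
            "Unit": "Count",
        }
    ],
)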
Then configure an ECS Application Auto Scaling policy to target this CloudWatch metric for your worker service.

References:
- Prefect 2 states: States in Prefect 2
- Work pools/queues and workers: Work pools and workers
- Agents overview (historical in 2.x): Agents

If you can share:
- Whether you're using agents or workers
- The exact work pool/queue(s) you want to scale on
- ECS Service setup (Fargate vs EC2)

…I can tailor the filter logic and the CloudWatch policy details to your setup.
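
For the Application Auto Scaling step mentioned above, here is a minimal sketch of one way to wire it up with boto3. It assumes the `Prefect` / `QueueBacklog` metric and `WorkQueue` dimension from the example in this thread. The cluster name, service name, task limits, target value, and cooldowns are hypothetical placeholders, and the helper function names are made up for illustration. Target tracking is only one option; step scaling on the same metric may suit a bursty backlog better.

```python
"""Sketch: target-tracking policy for an ECS worker service on a custom backlog metric."""


def build_scalable_target(cluster: str, service: str, min_tasks: int, max_tasks: int) -> dict:
    """Keyword arguments for Application Auto Scaling `register_scalable_target`."""
    return {
        "ServiceNamespace": "ecs",
        "ResourceId": f"service/{cluster}/{service}",
        "ScalableDimension": "ecs:service:DesiredCount",
        "MinCapacity": min_tasks,
        "MaxCapacity": max_tasks,
    }


def build_backlog_policy(cluster: str, service: str, work_queue: str, target_backlog: float) -> dict:
    """Keyword arguments for Application Auto Scaling `put_scaling_policy`."""
    return {
        "PolicyName": f"{service}-queue-backlog",  # hypothetical naming scheme
        "ServiceNamespace": "ecs",
        "ResourceId": f"service/{cluster}/{service}",
        "ScalableDimension": "ecs:service:DesiredCount",
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            # Backlog level the policy tries to hold; placeholder value, tune it
            "TargetValue": float(target_backlog),
            "CustomizedMetricSpecification": {
                "MetricName": "QueueBacklog",
                "Namespace": "Prefect",
                "Dimensions": [{"Name": "WorkQueue", "Value": work_queue}],
                "Statistic": "Average",
                "Unit": "Count",
            },
            "ScaleOutCooldown": 60,   # seconds; placeholder
            "ScaleInCooldown": 300,   # seconds; placeholder
        },
    }


def apply_autoscaling(cluster: str, service: str, work_queue: str, region: str = "us-east-1") -> None:
    """Register the ECS service as a scalable target and attach the policy."""
    import boto3  # imported lazily so the builders above can be checked without AWS access

    aas = boto3.client("application-autoscaling", region_name=region)
    aas.register_scalable_target(**build_scalable_target(cluster, service, 1, 10))
    aas.put_scaling_policy(**build_backlog_policy(cluster, service, work_queue, 5))


# Usage (needs AWS credentials; names are hypothetical):
# apply_autoscaling("prefect-cluster", "prefect-worker", "my-queue")
```

Keep in mind that target tracking expects the metric to fall as capacity is added, so the metric has to be published on a regular schedule (for example every minute) for the policy to react sensibly.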