# ask-marvin
j
@Marvin I'm using a self-hosted Prefect 3.4.14, running on ECS, with dynamic infrastructure provisioning. When we start a lot of sub flow runs, we sometimes see them stuck in "Pending". According to the logs it gets as far as "Retrieving ECS task definition..." but then nothing. I checked CloudTrail for possible throttling, but there was no throttling on DescribeTaskDefinition. (On the other hand, I see a lot of throttling on DescribeTask.) What do you think could be the problem? Why does the sub flow run get stuck?
m
thought for 525.8 seconds
Thanks for the details — that symptom set lines up with an AWS-side bottleneck rather than a Prefect scheduler issue.

What's likely happening
- When a child flow (subflow) is submitted to an ECS work pool, the ECS worker does: 1) resolve/retrieve the task definition, 2) call run_task, 3) poll DescribeTasks a few times to get task details/links and confirm start.
- Under a burst of many subflow submissions, DescribeTasks can get throttled at the account/region level. Botocore will back off and retry (potentially for a long time), which makes the worker look "stuck" after "Retrieving ECS task definition…", and the child flow run remains in Pending because the submission hasn't completed.
- The absence of throttling on DescribeTaskDefinition doesn't rule this out — if DescribeTasks is the call getting hammered, the worker will sit in exponential backoff there.

Other common contributors in these bursts
- Task definition churn: if every run registers or resolves a new revision, you add more AWS API calls and can hit "too many concurrent attempts" or just more load. See similar reports:
  - Too Many Concurrent Attempts to Register Task Definition (ECS)
  - Re-registering identical task definitions hits rate limits
- Capacity/IP exhaustion: if Fargate can't allocate ENIs in your subnets, tasks sit pending and workers keep polling DescribeTasks. You'll see ECS events about RESOURCE:ENI in CloudWatch if this is the case.

What I recommend
1) Smooth the burst at the Prefect layer
- Add a concurrency limit/tag for these child flows so you don't submit hundreds at once. In Prefect 3.x, you can apply a concurrency limit to a tag and add that tag to your subflows to cap concurrent starts (a minimal pacing sketch is at the end of this message). Docs: Concurrency and rate limits
- Reduce worker parallelism so it submits fewer ECS tasks at a time. If you can share how you start your worker, I can give you the exact flag/env to set a per-worker concurrency limit.
2) Reduce ECS API churn
- Pin and reuse a task definition when possible. In the ECS work pool template, either:
  - set a specific `task_definition_arn`, or
  - use a stable `family` and "match latest revision" so the worker doesn't register new definitions unless something actually changes
- Keep `job_variables` stable across runs; changing them can force new task definition resolutions/registrations.
3) Make throttling more resilient
- Set AWS retries on the worker task:
```bash
# In your worker container/task definition
export PREFECT_LOGGING_LEVEL=DEBUG
export AWS_RETRY_MODE=adaptive
export AWS_MAX_ATTEMPTS=20
```
This won't eliminate throttling but will make backoff smoother and less likely to stall the submission indefinitely.
- Consider asking AWS for a quota increase on DescribeTasks in the regions you use; the default per-account limit is easy to saturate during bursts.
4) Give ECS more time to start
- Increase the ECS task start timeout in your ECS work pool/job template to account for slower starts under load. If you share your current work pool JSON, I can point to the exact field to adjust.
5) Check for subnet/IP capacity issues
- If you're on Fargate, confirm your subnets have enough available IPs. ECS events (CloudWatch) will show failures like RESOURCE:ENI when subnets are exhausted.

How to confirm this is the issue
- Enable DEBUG logs on the worker (`PREFECT_LOGGING_LEVEL=DEBUG`) and check for botocore throttling messages after the "Retrieving ECS task definition…" line.
- Look at CloudTrail metrics for DescribeTasks during the burst; if they spike and show throttling, that's your smoking gun.

Useful references
- ECS worker guide: Prefect ECS guide
- Concurrency and rate limits: Prefect concurrency and rate limits
- Related reports:
  - Concurrent task definition registration
  - Re-registering identical task definitions under load

If you can share:
- how you start the worker (command/env),
- your ECS work pool job template (redact ARNs if needed),
- approx number of concurrent subflows,
- whether you're on Fargate or EC2 capacity,
…I can suggest concrete settings (exact worker concurrency flag, pool template fields to set, and a tuned timeout) tailored to your setup.
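Here is the pacing sketch mentioned in recommendation 1: one way to smooth child-run submission from the parent flow. It assumes you've already created a global concurrency limit named `ecs-subflow-launch` with slot decay configured (which `rate_limit` relies on), and `child-flow/ecs-child` is a placeholder for your own deployment:

```python
# Sketch: pace how fast the parent submits child flow runs so the ECS worker
# isn't asked to start hundreds of tasks at once.
# Assumes a pre-created global concurrency limit named "ecs-subflow-launch"
# with slot decay configured; deployment name and parameters are placeholders.
from prefect import flow
from prefect.concurrency.sync import rate_limit
from prefect.deployments import run_deployment

@flow
def fan_out(batch_ids: list[str]):
    for batch_id in batch_ids:
        # Blocks until a slot frees up, smoothing the burst of run_task /
        # DescribeTasks calls the worker has to make.
        rate_limit("ecs-subflow-launch")
        run_deployment(
            name="child-flow/ecs-child",       # placeholder deployment name
            parameters={"batch_id": batch_id},
            timeout=0,                         # submit and return immediately
        )
```

Alternatively, a concurrency limit on the ECS work pool itself (e.g., `prefect work-pool set-concurrency-limit "my-ecs-pool" 10`) caps how many runs the pool will launch at once.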
j
@Marvin I actually managed to find the root cause. The ECS worker was not reporting healthy, so the ECS scheduler killed it and started a new one. The sub flow run got as far as getting the task definition by its ARN but never got to the state where it could start the task (RunTask). Is there a way to monitor for these situations? This could also happen during a rollout of a newer version of the worker. How can we retry flows that are stuck in Pending and don't have a corresponding ECS task started?
m
thought for 363.7 seconds
Great find — that explains the behavior you saw. If the ECS worker dies between "retrieving task definition" and calling `RunTask`, the flow run remains PENDING and there's no ECS task to observe, so nothing progresses until something explicitly requeues that run.

Two parts to address this:
1) Monitor for the condition (worker health and "PENDING for too long")
2) Automatically remediate (requeue/reschedule those runs)

Monitoring options
- Worker health: Workers heartbeat to the Prefect API (a rough heartbeat-check sketch is at the end of this reply). You can:
  - Create an Automation that triggers when a worker stops heartbeating (unhealthy) and sends a notification or kicks off a remediation flow (see Automations below).
  - In ECS, also keep CloudWatch alarms on service/task health so you catch restarts during rollouts quickly.
- "PENDING for too long": In Prefect 3, you can use Automations to react to run states or the absence of state changes. For Server (self-hosted), if you prefer something more explicit and auditable, a small "watchdog" flow (below) works well and is easy to tune and test.

Automated remediation: requeue stale PENDING runs
The simplest and most predictable approach is a tiny watchdog deployment that runs every few minutes:
- finds flow runs in PENDING
- considers them "stale" if they've been PENDING longer than your threshold
- sets their state back to SCHEDULED (now) to requeue them for workers to pick up

Example (async) flow you can deploy on any runner (even a small process/EC2 worker):
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas import filters as f
from prefect.client.schemas.objects import StateType
from prefect.states import Scheduled

STALE_AFTER = timedelta(minutes=10)  # tune this

def is_stale_pending(flow_run) -> bool:
    # flow_run.state.timestamp is when it entered PENDING
    if not flow_run.state or not flow_run.state.timestamp:
        return False
    age = datetime.now(timezone.utc) - flow_run.state.timestamp
    return age > STALE_AFTER

@flow
async def requeue_stale_pending_runs(tag_selector: list[str] | None = None, limit: int = 1000):
    logger = get_run_logger()
    async with get_client() as client:
        # Build a filter for PENDING runs; optionally narrow by tags/work queue/etc.
        state_filter = f.FlowRunFilterState(
            type=f.FlowRunFilterStateType(any_=[StateType.PENDING])
        )
        # Optional: limit to specific tags to avoid requeuing everything
        tag_filter = f.FlowRunFilterTags(all_=tag_selector) if tag_selector else None

        frs = await client.read_flow_runs(
            flow_run_filter=f.FlowRunFilter(state=state_filter, tags=tag_filter),
            limit=limit,
        )

        stale = [fr for fr in frs if is_stale_pending(fr)]

        if not stale:
            logger.info("No stale PENDING runs found.")
            return

        logger.info(f"Found {len(stale)} stale PENDING runs; rescheduling...")

        for fr in stale:
            try:
                # Set to SCHEDULED with scheduled_time=now so a healthy worker can pick it up
                state = Scheduled(
                    scheduled_time=datetime.now(timezone.utc),
                    name="Rescheduled by watchdog after worker restart",
                )
                await client.set_flow_run_state(flow_run_id=fr.id, state=state, force=True)
                logger.info(f"Rescheduled {fr.name} ({fr.id})")
            except Exception as e:
                logger.warning(f"Could not reschedule {fr.id}: {e}")

if __name__ == "__main__":
    asyncio.run(requeue_stale_pending_runs(tag_selector=None))
```
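As an alternative to the `asyncio.run(...)` entrypoint above, you could keep the watchdog running on an interval with `flow.serve()` from any small, always-on process; a sketch, with an illustrative name and interval:

```python
# Sketch: serve the watchdog on a fixed interval instead of running it once.
# The deployment name and interval are illustrative; pass tag_selector via
# `parameters` if you only want to target your ECS runs.
if __name__ == "__main__":
    requeue_stale_pending_runs.serve(
        name="pending-watchdog",
        interval=300,  # run every 5 minutes
    )
```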
How to use it
- Deploy this as a tiny "requeue" deployment and schedule it every 1–5 minutes.
- If you only want to target runs from your ECS pool, pass a tag selector you already attach to those runs (e.g., the same tag your deployments use), or target a specific work queue via filter tweaks.
- During worker rollouts, this will sweep up any runs that were claimed and left hanging.

Automations (optional complement)
- You can also create an Automation that:
  - Watches for worker unhealthy/heartbeat-missed events and either notifies you or runs the watchdog deployment above.
  - Watches for flow runs stuck in PENDING and performs a ChangeFlowRunState action to SCHEDULED.
- Docs starting points:
  - Event/filters API reference
  - Automations schemas

Additional rollout safeguards
- For ECS rolling updates of the worker service, set a rollout policy that keeps at least one healthy worker during deploys (e.g., minimumHealthyPercent >= 100 for Fargate) so you don't briefly go to zero capacity.
- If you run multiple workers on the same work pool/queue, they'll naturally cover for each other during restarts and reduce the window where PENDING runs need rescuing.

If you share:
- the tags/work queue you want this to apply to,
- your preferred "stale" threshold,
I can adapt the watchdog code to your exact filters and help with a quick deployment YAML.
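And here is the rough heartbeat-check sketch mentioned under "Monitoring options", in case you want an explicit poll of worker health alongside (or instead of) Automations. It assumes the client's `read_workers_for_work_pool` method and the `last_heartbeat_time` field are available in your Prefect version, and `ecs-pool` is a placeholder for your work pool name:

```python
# Sketch: flag workers in a pool whose heartbeat looks stale.
# Assumes PrefectClient.read_workers_for_work_pool and Worker.last_heartbeat_time
# are available in your Prefect version; "ecs-pool" is a placeholder pool name.
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client

HEARTBEAT_STALE_AFTER = timedelta(minutes=2)  # tune to your heartbeat interval

async def stale_workers(work_pool_name: str = "ecs-pool") -> list[str]:
    """Return names of workers whose last heartbeat is missing or too old."""
    async with get_client() as client:
        workers = await client.read_workers_for_work_pool(work_pool_name=work_pool_name)
    now = datetime.now(timezone.utc)
    return [
        w.name
        for w in workers
        if w.last_heartbeat_time is None
        or now - w.last_heartbeat_time > HEARTBEAT_STALE_AFTER
    ]

if __name__ == "__main__":
    print(asyncio.run(stale_workers()))
```

You could call this from the same watchdog flow and notify yourself (or trigger the requeue) whenever it returns anything.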