Janos Krnak (09/25/2025, 8:36 AM)

Marvin (09/25/2025, 8:45 AM)
- Pin an explicit task_definition_arn, or use a stable family with “match latest revision” so the worker doesn’t register new definitions unless something actually changes.
- Keep job_variables stable across runs; changing them can force new task definition resolutions/registrations.
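For illustration, here is a minimal deployment sketch that pins those job variables. It assumes the prefect-aws ECS work pool exposes task_definition_arn, family, and match_latest_revision_in_family, and uses placeholder names for the deployment, pool, and image:

from prefect import flow

@flow(log_prints=True)
def etl():
    print("hello from ECS")

if __name__ == "__main__":
    # Placeholder deployment/pool/image names; the job_variables keys assume
    # the prefect-aws ECS work pool template.
    etl.deploy(
        name="etl-ecs",
        work_pool_name="my-ecs-pool",
        image="123456789012.dkr.ecr.us-east-1.amazonaws.com/etl:stable",
        job_variables={
            # Option A: pin an exact, pre-registered task definition
            # "task_definition_arn": "arn:aws:ecs:us-east-1:123456789012:task-definition/etl:42",
            # Option B: stable family, reuse the latest matching revision
            "family": "prefect-etl",
            "match_latest_revision_in_family": True,
        },
    )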
3) Make throttling more resilient
- Set AWS retry behavior (and, optionally, debug logging) via environment variables on the worker task:
# In your worker container/task definition
export PREFECT_LOGGING_LEVEL=DEBUG
export AWS_RETRY_MODE=adaptive
export AWS_MAX_ATTEMPTS=20
This won’t eliminate throttling, but it makes backoff smoother and less likely to stall submissions indefinitely (the equivalent botocore configuration is sketched below).
- Consider asking AWS for a quota increase on DescribeTasks in the regions you use; the default per-account limit is easy to saturate during bursts.
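For reference, AWS_RETRY_MODE and AWS_MAX_ATTEMPTS map onto botocore's standard retry configuration; the same settings expressed in code look roughly like this (illustrative only — the worker reads them from the environment):

import boto3
from botocore.config import Config

# Equivalent of AWS_RETRY_MODE=adaptive / AWS_MAX_ATTEMPTS=20 for any
# client you construct yourself (e.g. custom tooling around ECS).
ecs = boto3.client(
    "ecs",
    config=Config(retries={"mode": "adaptive", "max_attempts": 20}),
)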
4) Give ECS more time to start
- Increase the ECS task start timeout in your ECS work pool/job template to account for slower starts under load. If you share your current work pool JSON, I can point to the exact field to adjust.
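As a sketch (assuming the prefect-aws ECS template's task_start_timeout_seconds variable; 600 is an arbitrary value), the same job_variables mechanism shown earlier can carry the timeout override per deployment:

# Hypothetical job_variables override allowing slower ECS task starts;
# pass this dict to .deploy(..., job_variables=...) as in the earlier sketch.
job_variables = {
    "task_start_timeout_seconds": 600,  # seconds to wait for the task to start
}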
5) Check for subnet/IP capacity issues
- If you’re on Fargate, confirm your subnets have enough available IPs. ECS events (CloudWatch) will show failures like RESOURCE:ENI when subnets are exhausted.
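A quick way to eyeball remaining IP capacity, sketched with boto3's describe_subnets (the subnet IDs are placeholders):

import boto3

# Print how many IPs each task subnet still has; RESOURCE:ENI failures
# usually coincide with this count hitting zero during a burst.
ec2 = boto3.client("ec2")
resp = ec2.describe_subnets(SubnetIds=["subnet-aaa111", "subnet-bbb222"])  # placeholder IDs
for subnet in resp["Subnets"]:
    print(subnet["SubnetId"], "available IPs:", subnet["AvailableIpAddressCount"])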
How to confirm this is the issue
- Enable DEBUG logs on the worker (PREFECT_LOGGING_LEVEL=DEBUG) and check for botocore throttling messages after the “Retrieving ECS task definition…” line.
- Look at CloudTrail events for DescribeTasks during the burst; if call volume spikes and you see throttling errors, that’s your smoking gun.
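A rough sketch of pulling those DescribeTasks calls out of CloudTrail event history with boto3 (the one-hour window is arbitrary; throttled calls typically carry an error code on the event):

import boto3
from datetime import datetime, timedelta, timezone

# Count DescribeTasks calls in the last hour via CloudTrail event history.
ct = boto3.client("cloudtrail")
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)

events = []
for page in ct.get_paginator("lookup_events").paginate(
    LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "DescribeTasks"}],
    StartTime=start,
    EndTime=end,
):
    events.extend(page["Events"])

print(f"DescribeTasks calls in the last hour: {len(events)}")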
Useful references
- ECS worker guide: Prefect ECS guide
- Concurrency and rate limits: Prefect concurrency and rate limits
- Related reports:

Janos Krnak (09/25/2025, 9:02 AM)

Marvin (09/25/2025, 9:09 AM)
If the worker is restarted before it calls RunTask, the flow run remains PENDING and there’s no ECS task to observe, so nothing progresses until something explicitly requeues that run.
Two parts to address this:
1) Monitor for the condition (worker health and “PENDING for too long”)
2) Automatically remediate (requeue/reschedule those runs)
Monitoring options
- Worker health: Workers heartbeat to the Prefect API. You can:
- Create an Automation that triggers when a worker stops heartbeating (unhealthy) and sends a notification or kicks off a remediation flow (see Automations below).
- In ECS, also keep CloudWatch alarms on service/task health so you catch restarts during rollouts quickly.
- “PENDING for too long”: In Prefect 3, you can use Automations to react to run states or absence of state changes. For Server (self-hosted), if you prefer something more explicit and auditable, a small “watchdog” flow (below) works well and is easy to tune and test.
Automated remediation: requeue stale PENDING runs
The simplest and most predictable approach is a tiny watchdog deployment that runs every few minutes:
- finds flow runs in PENDING
- considers them “stale” if they’ve been PENDING longer than your threshold
- sets their state back to SCHEDULED (now) to requeue them for workers to pick up
Example (async) flow you can deploy on any runner (even a small process/EC2 worker):
import asyncio
from datetime import timedelta

import pendulum
from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas import filters as f
from prefect.client.schemas.objects import StateType
from prefect.states import Scheduled

STALE_AFTER = timedelta(minutes=10)  # tune this


def is_stale_pending(flow_run) -> bool:
    # flow_run.state.timestamp is when the run entered PENDING
    if not flow_run.state or not flow_run.state.timestamp:
        return False
    age = pendulum.now("UTC") - flow_run.state.timestamp
    return age > STALE_AFTER


@flow
async def requeue_stale_pending_runs(tag_selector: list[str] | None = None, limit: int = 1000):
    logger = get_run_logger()
    async with get_client() as client:
        # Build a filter for PENDING runs; optionally narrow by tags/work queue/etc.
        state_filter = f.FlowRunFilterState(
            type=f.FlowRunFilterStateType(any_=[StateType.PENDING])
        )
        # Optional: limit to specific tags to avoid requeuing everything
        tag_filter = f.FlowRunFilterTags(all_=tag_selector) if tag_selector else None
        frs = await client.read_flow_runs(
            flow_run_filter=f.FlowRunFilter(state=state_filter, tags=tag_filter),
            limit=limit,
        )
        stale = [fr for fr in frs if is_stale_pending(fr)]
        if not stale:
            logger.info("No stale PENDING runs found.")
            return
        logger.info(f"Found {len(stale)} stale PENDING runs; rescheduling...")
        for fr in stale:
            try:
                # Set back to SCHEDULED with scheduled_time=now so a healthy worker can pick it up
                state = Scheduled(
                    scheduled_time=pendulum.now("UTC"),
                    name="Rescheduled by watchdog after worker restart",
                )
                await client.set_flow_run_state(flow_run_id=fr.id, state=state, force=True)
                logger.info(f"Rescheduled {fr.name} ({fr.id})")
            except Exception as e:
                logger.warning(f"Could not reschedule {fr.id}: {e}")


if __name__ == "__main__":
    asyncio.run(requeue_stale_pending_runs(tag_selector=None))
How to use it
- Deploy this as a tiny “requeue” deployment and schedule it every 1–5 minutes.
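One hedged way to do that is to register the watchdog flow itself as a deployment from source with a cron schedule; the repo URL, entrypoint path, and pool name below are placeholders:

from prefect import flow

if __name__ == "__main__":
    # Run the watchdog every 2 minutes on any small, healthy work pool
    # (process/EC2/ECS all work); names and paths are placeholders.
    flow.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="watchdog.py:requeue_stale_pending_runs",
    ).deploy(
        name="requeue-stale-pending",
        work_pool_name="utility-pool",
        cron="*/2 * * * *",
    )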