Josh Stone
09/19/2025, 3:15 PM
Marvin
09/19/2025, 3:15 PM
Marvin
09/19/2025, 3:27 PM
prefect flow-run cancel <FLOW_RUN_ID>
prefect deployment run --id <DEPLOYMENT_ID> --watch
For many runs (Python script using the API client, preserves parameters/tags):
import asyncio
from uuid import UUID

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterParentFlowRunId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

# Optional filters: set to ID strings like "7f..." if you want to constrain the search.
PARENT_ID = None  # the parent flow run ID, to target only its children
DEPLOYMENT_ID = None  # target a specific deployment


async def main():
    async with get_client() as client:
        # Filter on the state *type*; state *names* are title-case ("Pending"),
        # so matching the name "PENDING" would find nothing.
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.PENDING])
            ),
        )
        if PARENT_ID:
            fr_filter.parent_flow_run_id = FlowRunFilterParentFlowRunId(any_=[UUID(PARENT_ID)])
        if DEPLOYMENT_ID:
            fr_filter.deployment_id = FlowRunFilterDeploymentId(any_=[UUID(DEPLOYMENT_ID)])

        # The API caps page size (200 by default); re-run the script if more remain.
        runs = await client.read_flow_runs(flow_run_filter=fr_filter, limit=200)
        print(f"Found {len(runs)} pending runs")

        for fr in runs:
            # Cancel first so the stuck run and its replacement cannot both start.
            await client.set_flow_run_state(fr.id, Cancelled(message="Bulk cancel stuck Pending run"))
            if fr.deployment_id:
                # Re-create the run from the same deployment with the same parameters and tags.
                new = await client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=fr.parameters or {},
                    tags=list(fr.tags or []),
                )
                print(f"Resubmitted {fr.id} -> new run {new.id}")
            else:
                print(f"Run {fr.id} has no deployment_id; cannot resubmit automatically.")


asyncio.run(main())
- If you’d like this to happen automatically next time, consider an Automation: “If a flow run is Pending for > N minutes, cancel it and RunDeployment (or set to Crashed to trigger retries).” See Automations.
B) If they are true subflows (calling another @flow function directly inside a parent flow)
- Those do not have deployments and cannot be restarted independently. In that case, you’ll need to restart the parent flow run or change the design to launch child work via run_deployment so each child can be retried/resubmitted independently.
Why this happens on ECS (things to check)
- ECS task never started: insufficient capacity, blocked ENI assignment, or missing IAM/ECR permissions
- Network egress rules block the Prefect API, so the engine inside the task can’t report Running
- Work queue paused, or concurrency limits preventing the run from starting
- Worker stopped/crashed after submission
The Prefect ECS guide has a solid checklist. If you want to tailor the bulk script, also see the Prefect API client reference.
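To check the network-egress item directly, you could run a small probe from a one-off task in the same subnets and security groups as your flow-run tasks. This is a minimal stdlib sketch, assuming PREFECT_API_URL is set in that environment (it is the setting the Prefect client reads). The function name prefect_api_reachable is hypothetical, and a successful TCP connect only shows the host and port are reachable, not that authentication or the API itself is healthy.

```python
import os
import socket
from urllib.parse import urlparse


def prefect_api_reachable(api_url: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the API URL's host/port succeeds.

    Hypothetical helper: it proves network egress to the host only; it does
    not check credentials or that the Prefect API is responding correctly.
    """
    parsed = urlparse(api_url)
    if not parsed.hostname:
        raise ValueError(f"Not a valid URL: {api_url!r}")
    # Fall back to the scheme's default port when the URL does not specify one.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Refused, timed out, DNS failure, no route, etc.
        return False


if __name__ == "__main__":
    url = os.environ.get("PREFECT_API_URL", "")
    if url:
        print(f"{url} reachable: {prefect_api_reachable(url)}")
    else:
        print("PREFECT_API_URL is not set in this environment")
```

If this returns False inside the task's network but True from your laptop, look at security group egress rules, NAT/internet gateway routes, or VPC endpoints before anything Prefect-specific.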
Quick clarifications to help me tailor a fix:
- Were these child runs created by run_deployment from the parent flow, or by calling another @flow function?
- Do you want me to adapt the bulk script to target a specific parent run, deployment, tags, or time window?
Marvin
09/19/2025, 3:27 PM
deployment_id), I can provide a ready-to-run script filtered to your case.
Josh Stone
09/19/2025, 3:39 PM
run_deployment from the parent flow. They are still sitting in a Pending state, and I have not intervened yet. This only started happening last night. Yes, I assume the task hung in ECS or there was a networking issue. Is it possible to set a timeout and then have it retry after something like 5 minutes? Do timeouts work if it's in a Pending state? Ideally I wouldn't intervene directly, given this is an automated process with 50+ child subflows.
Marvin
09/19/2025, 3:39 PM
Marvin
09/19/2025, 3:41 PM
import asyncio
from datetime import datetime, timedelta, timezone
from uuid import UUID

from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterExpectedStartTime,
    FlowRunFilterParentFlowRunId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

# Optional: constrain to a specific parent and/or child deployment(s).
# With both left as None, the watcher acts on EVERY Pending run in the workspace.
PARENT_ID = None  # e.g., "7f..." to scope to a given parent’s children
DEPLOYMENT_IDS = None  # list of UUID strings if you want to scope specific child deployments
PENDING_FOR_MINUTES = 5


@flow
async def requeue_stuck_children():
    async with get_client() as client:
        now = datetime.now(timezone.utc)
        # expected_start_time approximates when the run should have started;
        # run_deployment schedules child runs to start immediately by default.
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.PENDING])),
            expected_start_time=FlowRunFilterExpectedStartTime(
                before_=now - timedelta(minutes=PENDING_FOR_MINUTES)
            ),
        )
        if PARENT_ID:
            fr_filter.parent_flow_run_id = FlowRunFilterParentFlowRunId(any_=[UUID(PARENT_ID)])
        if DEPLOYMENT_IDS:
            fr_filter.deployment_id = FlowRunFilterDeploymentId(any_=[UUID(d) for d in DEPLOYMENT_IDS])

        # The API caps page size (200 by default); the next scheduled run picks up any remainder.
        runs = await client.read_flow_runs(flow_run_filter=fr_filter, limit=200)
        print(f"Found {len(runs)} Pending runs older than {PENDING_FOR_MINUTES}m")
        for fr in runs:
            # Cancel the stuck run first to avoid duplicates
            await client.set_flow_run_state(
                fr.id, Cancelled(message=f"Auto-cancel stuck Pending run (>{PENDING_FOR_MINUTES}m)")
            )
            if fr.deployment_id:
                # Resubmit from the same deployment with the same parameters and tags.
                new = await client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=fr.parameters or {},
                    tags=list(fr.tags or []),
                )
                print(f"Resubmitted {fr.id} -> new run {new.id}")
            else:
                print(f"Run {fr.id} has no deployment_id; skipping resubmit.")


if __name__ == "__main__":
    asyncio.run(requeue_stuck_children())
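- Deploy this watcher flow and schedule it to run every minute; it then handles stuck runs for all child deployments without manual intervention. With PARENT_ID and DEPLOYMENT_IDS left as None it acts on every Pending run in the workspace, so set them if other deployments share the workspace.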
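Because the watcher cancels runs, it is worth unit-testing the "Pending for longer than N minutes" rule before scheduling it. This is a minimal sketch that mirrors the watcher's filter (state type PENDING and expected_start_time at or before now minus the threshold) as a pure function, so it runs without a Prefect API. RunSnapshot and select_stuck_runs are hypothetical names, not Prefect classes, and expected_start_time is only an approximation of how long a run has actually been Pending.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


@dataclass
class RunSnapshot:
    """Hypothetical, trimmed-down view of a flow run (not a Prefect class)."""

    id: str
    state_type: str  # e.g. "PENDING", "RUNNING"
    expected_start_time: datetime  # timezone-aware


def select_stuck_runs(
    runs: Iterable[RunSnapshot],
    pending_for: timedelta,
    now: Optional[datetime] = None,
) -> List[RunSnapshot]:
    """Return runs still PENDING whose expected start is at or before now - pending_for."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - pending_for
    return [
        r
        for r in runs
        if r.state_type == "PENDING" and r.expected_start_time <= cutoff
    ]


if __name__ == "__main__":
    fixed_now = datetime(2025, 9, 19, 16, 0, tzinfo=timezone.utc)
    sample = [
        RunSnapshot("a", "PENDING", fixed_now - timedelta(minutes=10)),
        RunSnapshot("b", "PENDING", fixed_now - timedelta(minutes=2)),
        RunSnapshot("c", "RUNNING", fixed_now - timedelta(minutes=10)),
    ]
    print([r.id for r in select_stuck_runs(sample, timedelta(minutes=5), fixed_now)])  # ['a']
```

Passing `now` explicitly keeps the check deterministic in tests; in the watcher itself the server applies the same cutoff through the expected_start_time filter.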
Why these runs get stuck in Pending on ECS (quick checks)
Marvin
09/19/2025, 3:41 PM
Josh Stone
09/19/2025, 3:58 PM
Marvin
09/19/2025, 3:58 PM
Marvin
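09/19/2025, 4:05 PM
{
  "name": "Requeue Pending Child Runs After 5m",
  "description": "Cancel and re-run child runs that stay Pending for 5 minutes",
  "enabled": true,
  "trigger": {
    "type": "event",
    "posture": "Proactive",
    "match": { "prefect.resource.id": "prefect.flow-run.*" },
    "match_related": {
      "prefect.resource.role": "deployment",
      "prefect.resource.id": "prefect.deployment.00000000-0000-0000-0000-000000000000"
    },
    "after": ["prefect.flow-run.Pending"],
    "expect": ["prefect.flow-run.*"],
    "for_each": ["prefect.resource.id"],
    "threshold": 1,
    "within": 300
  },
  "actions": [
    { "type": "cancel-flow-run" },
    { "type": "run-deployment", "source": "selected", "deployment_id": "00000000-0000-0000-0000-000000000000", "parameters": null }
  ]
}
- JSON does not allow comments. Replace the placeholder UUIDs with your child deployment’s ID; to scope by tag instead, set match_related to {"prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.child-run"}.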
- After creating the automation in the UI, you can export the exact JSON you created with:
prefect automation inspect "Requeue Pending Child Runs After 5m" --json
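With 50+ children spread over several child deployments, you may want one automation per deployment. This is a stdlib sketch that builds such a payload and serializes it as strict JSON. The helper name is hypothetical, and the field names (type, posture, after, expect, for_each, within, and the cancel-flow-run/run-deployment action types) reflect my understanding of Prefect's event-trigger schema rather than a confirmed contract, so compare the output against what `prefect automation inspect ... --json` exports from your own workspace before relying on it.

```python
import json
from typing import Any, Dict


def pending_requeue_automation(
    deployment_id: str,
    within_seconds: int = 300,
    name: str = "Requeue Pending Child Runs After 5m",
) -> Dict[str, Any]:
    """Build a hypothetical automation payload: if a flow run of `deployment_id`
    enters Pending and no further flow-run event follows within `within_seconds`,
    cancel it and start a new run of the same deployment."""
    return {
        "name": name,
        "description": f"Cancel and re-run runs that stay Pending for {within_seconds}s",
        "enabled": True,
        "trigger": {
            "type": "event",
            "posture": "Proactive",  # fire on the absence of expected events
            "match": {"prefect.resource.id": "prefect.flow-run.*"},
            "match_related": {
                "prefect.resource.role": "deployment",
                "prefect.resource.id": f"prefect.deployment.{deployment_id}",
            },
            "after": ["prefect.flow-run.Pending"],
            "expect": ["prefect.flow-run.*"],
            "for_each": ["prefect.resource.id"],  # track each flow run separately
            "threshold": 1,
            "within": within_seconds,
        },
        "actions": [
            {"type": "cancel-flow-run"},
            # parameters is None: no per-run parameter overrides are passed
            {
                "type": "run-deployment",
                "source": "selected",
                "deployment_id": deployment_id,
                "parameters": None,
            },
        ],
    }


if __name__ == "__main__":
    for dep_id in ["00000000-0000-0000-0000-000000000000"]:
        print(json.dumps(pending_requeue_automation(dep_id), indent=2))
```

Generating the payload in code also guarantees valid JSON, since json.dumps never emits comments or trailing commas.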
If you don’t see “Sustained for” (OSS or older UI)
- The easiest reliable workaround is the watcher flow I shared earlier: schedule it every minute to find Pending child runs older than 5 minutes, cancel them, and resubmit them from the same deployment with the same parameters and tags. This preserves parameters automatically, which a generic RunDeployment automation cannot do on its own.
Links
- Automations overview: Automations
- API client (for watcher flow): API client
If you confirm Cloud vs OSS and whether you want this scoped to specific deployments/tags, I can:
- Provide a finalized JSON payload that matches your environment, or
- Finalize the watcher flow and a minimal deployment recipe so you can drop it in and go.
Josh Stone
09/19/2025, 5:42 PM
Marvin
09/19/2025, 5:42 PM
Josh Stone
09/19/2025, 6:56 PM
Marvin
09/19/2025, 6:56 PM
Marvin
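09/19/2025, 7:01 PM
# High-level shape (async)
import asyncio

from prefect.client.orchestration import get_client
from prefect.events.actions import CancelFlowRun  # others: RunDeployment, CallWebhook, SendNotification
from prefect.events.schemas.automations import AutomationCore, EventTrigger, Posture

# Fire once for each custom event named "my.company.ecs.task.unhealthy"
# whose resource ID starts with "ecs/task/".
trigger = EventTrigger(
    expect={"my.company.ecs.task.unhealthy"},
    match={"prefect.resource.id": "ecs/task/*"},
    posture=Posture.Reactive,
    threshold=1,
)

# CancelFlowRun infers which flow run to cancel from the triggering event,
# so the event must reference the flow run (as its resource or a related resource).
actions = [CancelFlowRun()]

# AutomationCore is the creation model; Automation additionally carries a server-assigned id.
automation = AutomationCore(
    name="Kill ECS task on unhealthy",
    trigger=trigger,
    actions=actions,
    enabled=True,
)


async def main():
    async with get_client() as client:
        # create_automation returns the new automation's ID (a UUID).
        automation_id = await client.create_automation(automation)
        print(automation_id)


asyncio.run(main())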
Then, emit a matching custom event from your code/infrastructure:
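from prefect.events import emit_event
from prefect.runtime import flow_run

# flow_run.id is only populated when this code runs inside a Prefect flow run.
# Relating the event to that run lets the CancelFlowRun action find what to cancel.
emit_event(
    event="my.company.ecs.task.unhealthy",
    resource={"prefect.resource.id": "ecs/task/arn:aws:ecs:...:task/abc123"},
    related=[
        {
            "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
            "prefect.resource.role": "flow-run",
        }
    ],
    payload={"reason": "container healthcheck failed", "task_arn": "..."},
)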
Useful references:
- Automations API reference: Automations client API
- Automations schemas and actions: Automation/Trigger models and actions
- Events API helpers: Events primitives
If you share the exact trigger you want (e.g., a specific event name or Prefect built-in event) and the action you want to take, I can draft a concrete, working snippet with the right fields filled in.
About ECS task health checks and killing unhealthy tasks
- Prefect’s ECS worker launches flow runs as standalone ECS tasks (RunTask). ECS container health checks do not automatically stop standalone tasks when they turn UNHEALTHY; that automatic replacement behavior only happens when tasks are managed by an ECS Service.
- Common patterns to kill unhealthy ECS tasks launched by Prefect:
1) Let your code exit on unhealthy conditions. If the container’s main process exits non-zero (e.g., your probe/watchdog detects bad state and exits), ECS will stop the task.
2) Use AWS-native automation: create an EventBridge rule for ECS Task State Change events that match UNHEALTHY and target a Lambda that calls StopTask on the task.
3) Use Prefect events and automations:
- Have your health probe emit a custom Prefect event (as shown above).
- Create an Automation whose action is CancelFlowRun (or CallWebhook to a handler that calls StopTask). For Prefect’s ECS worker, cancelling a flow run typically triggers the worker to stop the underlying ECS task.
- If you want to add container-level health checks to your Prefect ECS tasks, define them in the task definition you reference in your ECS work pool’s base job template. Just note that for RunTask tasks this won’t auto-kill the task unless you implement one of the kill strategies above.
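Option 2 above (EventBridge plus Lambda) can be sketched as follows. This assumes an EventBridge rule with the pattern shown in EVENT_PATTERN, the clusterArn, taskArn, lastStatus, and healthStatus fields of "ECS Task State Change" events, boto3's ECS stop_task call, and a Lambda execution role allowed to call ecs:StopTask. The extra ecs_client parameter is a hypothetical testing hook, not part of the Lambda contract.

```python
from typing import Any, Dict, Optional

# EventBridge rule pattern that would route unhealthy-task events here (assumed setup).
EVENT_PATTERN = {
    "source": ["aws.ecs"],
    "detail-type": ["ECS Task State Change"],
    "detail": {"healthStatus": ["UNHEALTHY"]},
}


def handler(
    event: Dict[str, Any],
    context: Any = None,
    ecs_client: Optional[Any] = None,
) -> Dict[str, Any]:
    """Stop the ECS task described by an 'ECS Task State Change' event if it is UNHEALTHY."""
    detail = event.get("detail", {})
    # Re-check in code so the function stays safe even if the rule is broader.
    if detail.get("healthStatus") != "UNHEALTHY":
        return {"stopped": False, "reason": "task is not UNHEALTHY"}
    if detail.get("lastStatus") == "STOPPED":
        return {"stopped": False, "reason": "task already stopped"}

    if ecs_client is None:
        # Imported lazily so the handler can be unit-tested without boto3.
        import boto3

        ecs_client = boto3.client("ecs")

    ecs_client.stop_task(
        cluster=detail["clusterArn"],
        task=detail["taskArn"],
        reason="Stopped by Lambda: container health check reported UNHEALTHY",
    )
    return {"stopped": True, "taskArn": detail["taskArn"]}
```

Keeping the filter in both the rule and the handler costs nothing and protects against a rule that is later widened to all task state changes.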
References:
- ECS guide (work pools, templates, task definitions): Prefect AWS ECS guide
Marvin
09/19/2025, 7:01 PM
Josh Stone
09/19/2025, 7:06 PM
Marvin
09/19/2025, 7:06 PM