<@ULVA73B9P> some sub-flows hung in a status of pe...
# ask-marvin
j
@Marvin some sub-flows hung in a status of pending with only the log "Worker 'ECSWorker 480bf56d-f474-410e-95e9-76fd22002d7f' submitting flow run '4ad09583-3808-4f10-9e11-11819cbb8737'". what is the best way to restart the subflow when it hangs like this? it happened for a total of 50+ subflows.
m
thought for 745.3 seconds
Sorry you ran into that. A run stuck in Pending after "ECSWorker … submitting flow run …" usually means the worker handed the job to ECS, but the task never came up to start the engine (capacity/IAM/networking), or the run is waiting on a queue/concurrency condition and never progresses.

Best ways to "restart" those subflows (assuming Prefect 3.x):

A) If those "subflows" were launched via run_deployment (i.e., they are normal deployment-based child flow runs)
- The most reliable path is:
  1) Cancel the stuck run
  2) Create a new run from the same deployment, preserving parameters/tags

For one run (CLI):
Copy code
prefect flow-run cancel <FLOW_RUN_ID>
prefect deployment run --id <DEPLOYMENT_ID> --watch
For many runs (Python script using the API client, preserves parameters/tags):
Copy code
import asyncio
from uuid import UUID
from prefect import get_client
from prefect.states import Cancelled
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
    FlowRunFilterParentFlowRunId,
    FlowRunFilterDeploymentId,
)

# Optional filters — set to strings like "7f..." if you want to constrain:
PARENT_ID = None          # the parent flow run ID to target only its children
DEPLOYMENT_ID = None      # target a specific deployment

async def main():
    async with get_client() as client:
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                name=FlowRunFilterStateName(any_=["PENDING"])
            ),
        )
        if PARENT_ID:
            fr_filter.parent_flow_run_id = FlowRunFilterParentFlowRunId(any_=[UUID(PARENT_ID)])
        if DEPLOYMENT_ID:
            fr_filter.deployment_id = FlowRunFilterDeploymentId(any_=[UUID(DEPLOYMENT_ID)])

        runs = await client.read_flow_runs(flow_run_filter=fr_filter, limit=2000)
        print(f"Found {len(runs)} pending runs")

        for fr in runs:
            await client.set_flow_run_state(fr.id, Cancelled(message="Bulk cancel stuck Pending run"))
            if fr.deployment_id:
                new = await client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=fr.parameters or {},
                    tags=list(fr.tags or []),
                )
                print(f"Resubmitted {fr.id} -> new run {new.id}")
            else:
                print(f"Run {fr.id} has no deployment_id; cannot resubmit automatically.")

asyncio.run(main())
- If you'd like this to happen automatically next time, consider an Automation: "If a flow run is Pending for > N minutes, cancel it and RunDeployment (or set to Crashed to trigger retries)." See Automations.

B) If they are true subflows (calling another @flow function directly inside a parent flow)
- Those do not have deployments and cannot be restarted independently. In that case, you'll need to restart the parent flow run, or change the design to launch child work via run_deployment so each child can be retried/resubmitted independently (a sketch of that pattern is at the end of this message).

Why this happens on ECS (things to check)
- ECS task never started: insufficient capacity, blocked ENI assignment, or missing IAM/ECR permissions
- Network egress blocks the Prefect API; the engine inside the task can't report Running
- Work queue paused or concurrency limits preventing start
- Worker stopped/crashed after submission

The ECS guide has a solid checklist: Prefect ECS guide. Also see the API client reference if you want to tailor the bulk script: Prefect API client.

Quick clarifications to help me tailor a fix:
- Were these child runs created by run_deployment from the parent flow, or by calling another @flow function?
- Do you want me to adapt the bulk script to target a specific parent run, deployment, tags, or time window?
- Roughly how long do these runs sit in Pending before you intervene?

If you share one affected flow run ID (and whether it has a deployment_id), I can provide a ready-to-run script filtered to your case.
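For reference, here is a minimal sketch of that run_deployment pattern; the deployment name "child-flow/child-deployment" and the batch_id parameter are placeholders for your own deployment and parameters:
Copy code
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow(batch_ids: list[str]):
    for batch_id in batch_ids:
        # Each call creates an independent, deployment-backed child flow run
        # that can be cancelled and re-run on its own if it gets stuck.
        run_deployment(
            name="child-flow/child-deployment",  # placeholder: "<flow name>/<deployment name>"
            parameters={"batch_id": batch_id},   # placeholder parameter
            timeout=0,                           # return immediately; don't block on the child
        )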
j
@Marvin yes, this happened via child runs created by run_deployment from the parent flow. they are still sitting in a pending state. i have not intervened currently. this only started happening last night. yes, i assume the task hung in ecs or there was a networking issue. is it possible to set a timeout and then have it retry after like 5 minutes? do timeouts work if it's in a pending state? i ideally wouldn't directly intervene given this is an automated process with 50+ child sub flows.
m
thought for 167.0 seconds
Thanks for the details; that helps a lot.

Key point about timeouts/retries in Pending
- Flow/task timeouts and retry policies are enforced by the flow engine after the run starts. If a run is stuck in Pending (it never reached Running), those timeouts/retries will not fire.
- For Pending runs, you'll want an orchestration-level fix: Automations, or a small "watcher" job that cancels and re-submits.

Recommended, no-hands solution
Use an Automation that watches for child runs that stay Pending and then kicks off a retry. You have two good patterns:

1) One/few child deployments: a pure UI Automation
- In the UI: Automations > Create Automation
- Trigger: Resource trigger
  - Resource: Flow run
  - State is Pending
  - Sustained for: 5 minutes (this gives you the "wait 5 minutes" behavior)
  - Optional filters: Deployment = your child deployment(s), or tags that identify these child runs
- Actions (in this order):
  1) CancelFlowRun
  2) RunDeployment (pick the same child deployment)
- Save

This will automatically cancel any child run that sits Pending for >5m and immediately create a fresh run. Because the trigger is "sustained 5 minutes," there's a built-in delay before retrying.

Note: If you have many distinct child deployments, you can create one automation per deployment, or use the watcher flow in option 2 below.

2) Many child deployments: a tiny "watcher" flow scheduled every minute
- This flow finds Pending child runs older than 5 minutes, cancels them, and resubmits them from their original deployment with the same parameters/tags.
- You can deploy this watcher once and schedule it to run every minute.

Example watcher flow (Prefect 3.x):
Copy code
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import flow, get_client
from prefect.states import Cancelled
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
    FlowRunFilterExpectedStartTime,
    FlowRunFilterDeploymentId,
    FlowRunFilterParentFlowRunId,
)

# Optional: constrain to specific parent, deployment(s), or tags
PARENT_ID = None                  # e.g., "7f..." to scope to a given parent’s children
DEPLOYMENT_IDS = None             # list of UUID strings if you want to scope specific child deployments
PENDING_FOR_MINUTES = 5

@flow
async def requeue_stuck_children():
    async with get_client() as client:
        now = datetime.now(timezone.utc)
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(name=FlowRunFilterStateName(any_=["PENDING"])),
            expected_start_time=FlowRunFilterExpectedStartTime(
                before_=now - timedelta(minutes=PENDING_FOR_MINUTES)
            ),
        )
        if PARENT_ID:
            fr_filter.parent_flow_run_id = FlowRunFilterParentFlowRunId(any_=[PARENT_ID])
        if DEPLOYMENT_IDS:
            fr_filter.deployment_id = FlowRunFilterDeploymentId(any_=DEPLOYMENT_IDS)

        runs = await client.read_flow_runs(flow_run_filter=fr_filter, limit=2000)
        print(f"Found {len(runs)} Pending runs older than {PENDING_FOR_MINUTES}m")

        for fr in runs:
            # Cancel the stuck run first to avoid duplicates
            await client.set_flow_run_state(fr.id, Cancelled(message="Auto-cancel stuck Pending run (>5m)"))
            if fr.deployment_id:
                new = await client.create_flow_run_from_deployment(
                    fr.deployment_id,
                    parameters=fr.parameters or {},
                    tags=list(fr.tags or []),
                )
                print(f"Resubmitted {fr.id} -> new run {new.id}")
            else:
                print(f"Run {fr.id} has no deployment_id; skipping resubmit.")

if __name__ == "__main__":
    asyncio.run(requeue_stuck_children())
- Deploy this watcher flow and schedule it every minute (a serve-based sketch follows at the end of this message). It will handle all child deployments, with no manual intervention needed.

Why these runs get stuck in Pending on ECS (quick checks)
- ECS task didn't reach RUNNING: capacity constraints, ENI assignment failure, or missing IAM/ECR permissions
- Network egress blocked to the Prefect API (the task can't report Running)
- Work queue paused or concurrency limits pinned
- Worker crash after submission

Helpful docs
- Automations overview: Automations
- API client (for watcher flow): Prefect API client
- ECS guide/checklist: Prefect ECS guide

If you tell me:
- whether all children come from one deployment vs. many,
- and whether you'd like the watcher to scope to the current parent run(s) only,
I can tailor either a one-click Automation recipe or finalize the watcher code and a minimal deployment YAML for you.
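One lightweight way to schedule the watcher, as a sketch: if you save the flow above as watcher.py (placeholder filename), you can serve it on a 60-second interval from any long-lived process, for example alongside your worker. The module and deployment names are placeholders.
Copy code
# serve_watcher.py -- sketch: runs the watcher every 60 seconds
# from whatever long-lived process you start this in.
from watcher import requeue_stuck_children  # placeholder: module containing the flow above

if __name__ == "__main__":
    requeue_stuck_children.serve(
        name="requeue-stuck-children",  # placeholder deployment name
        interval=60,                    # seconds between runs
    )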
j
@Marvin how do i do the "sustained for 5 minutes" option with the automation? i don't see that option. can you provide an example of the JSON?
m
thought for 405.0 seconds
Great question. Two important notes up front:
- Timeouts and retries don't fire while a run is Pending, so you need an orchestration-level "wait N minutes, then do X" rule.
- Whether you see "Sustained for" in the UI depends on environment/version. It's available in Prefect Cloud. If you're running OSS and don't see it, you can still achieve the behavior with an Automation or watcher flow (see below).

Quick check: Are you on Prefect Cloud or OSS, and which Prefect 3.x version? That determines whether the UI exposes "Sustained for".

If you have "Sustained for" in the UI
- Create an Automation with:
  - Trigger: Resource trigger, Resource = Flow run, State is Pending, Sustained for = 5 minutes
  - Actions: CancelFlowRun, then RunDeployment (select the same child deployment)

Example JSON (shape to illustrate the fields; the deployment_id match is optional scoping and could be replaced with tags such as ["child-run"]. Use "prefect automation inspect --json <name>" to confirm the exact schema in your env):
Copy code
{
  "name": "Requeue Pending Child Runs After 5m",
  "description": "Cancel and re-run child runs that stay Pending for 5 minutes",
  "enabled": true,
  "posture": "Reactive",
  "triggers": [
    {
      "type": "resource",
      "resource": "prefect.flow-run",
      "match": {
        "state": { "name": ["PENDING"] },
        "deployment_id": ["00000000-0000-0000-0000-000000000000"]  // optional: scope to a child deployment
        // or use tags to scope, e.g. "tags": ["child-run"]
      },
      "sustained_for_seconds": 300
    }
  ],
  "actions": [
    { "type": "CancelFlowRun" },
    { "type": "RunDeployment", "deployment_id": "00000000-0000-0000-0000-000000000000" }
  ]
}
- After creating the automation in the UI, you can export the exact JSON you created with:
Copy code
prefect automation inspect "Requeue Pending Child Runs After 5m" --json
If you don't see "Sustained for" (OSS or older UI)
- Easiest, most reliable workaround is the watcher flow I shared earlier: schedule it every minute to find Pending child runs older than 5 minutes, cancel them, and re-submit them from the same deployment with the same parameters/tags. This preserves parameters automatically, which the generic RunDeployment automation cannot do by itself.

Links
- Automations overview: Automations
- API client (for watcher flow): API client

If you confirm Cloud vs. OSS and whether you want this scoped to specific deployments/tags, I can:
- Provide a finalized JSON payload that matches your environment, or
- Finalize the watcher flow and a minimal deployment recipe so you can drop it in and go.
j
@Marvin I'm on OSS but plan on directly creating the automation via the prefect python client not via the ui or ideally using the prefect events client. i was also curious about whether we could expose some health check in the ecs tasks so that if they aren't healthy, they can be killed.
m
🔄 Researching your answer... this may take a while
🔧 Using tool: display_callable_signature
📊 Tools used: • display_callable_signature (1x)
j
@Marvin reposting as i think there was a problem: I'm on OSS but plan on directly creating the automation via the prefect python client not via the ui or ideally using the prefect events client. i was also curious about whether we could expose some health check in the ecs tasks so that if they aren't healthy, they can be killed.
m
thought for 304.4 seconds
Thanks for the context! Assuming you're on Prefect 3.x OSS, you can absolutely create automations programmatically without the UI. You've got two good building blocks:
- The orchestration client to create/update/delete automations
- The events API to emit custom events your automations can react to

Below is a concise overview, examples, and options for ECS task health checks.

Programmatically creating an automation (no UI)
- Define a trigger (e.g., EventTrigger listening for Prefect or custom events)
- Define one or more actions (RunDeployment, CancelFlowRun, SendNotification, CallWebhook, etc.)
- Create the automation via the orchestration client

At a high level, you'll do something like:
Copy code
# high-level shape (async)
from prefect.client.orchestration import get_client
from prefect.events.schemas.automations import EventTrigger, AutomationCore
from prefect.events.actions import CancelFlowRun, SendNotification  # and others

trigger = EventTrigger(
    # configure your event filter here (e.g., by event name, resource, payload, etc.)
    # example conceptually: match events you emit like "my.company.ecs.task.unhealthy"
)

actions = [
    CancelFlowRun(),  # or RunDeployment(...), CallWebhook(...), SendNotification(...)
]

automation = AutomationCore(  # client.create_automation accepts this "core" automation shape
    name="Kill ECS task on unhealthy",
    trigger=trigger,
    actions=actions,
    enabled=True,
)

async with get_client() as client:
    # create_automation returns the new automation's ID
    automation_id = await client.create_automation(automation)
    print(automation_id)
Then, emit a matching custom event from your code/infrastructure:
Copy code
from prefect.events import emit_event

emit_event(
    event="my.company.ecs.task.unhealthy",
    resource={"prefect.resource.id": "ecs/task/arn:aws:ecs:...:task/abc123"},
    payload={"reason": "container healthcheck failed", "task_arn": "..."},
)
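For the "stuck in Pending for 5 minutes" case specifically, here is a hedged sketch of a proactive trigger: it starts a clock when a flow run emits a Pending event and fires if no Running event follows within 5 minutes. The field names (after, expect, for_each, posture, within) and the CancelFlowRun/RunDeployment actions are my assumptions about the 3.x schemas, and the deployment ID is a placeholder; verify against your installed version.
Copy code
# Sketch only: field names assume Prefect 3.x event/automation schemas -- verify locally.
from datetime import timedelta

from prefect.events.actions import CancelFlowRun, RunDeployment
from prefect.events.schemas.automations import AutomationCore, EventTrigger, Posture

pending_too_long = EventTrigger(
    after={"prefect.flow-run.Pending"},    # start the clock when a run enters Pending
    expect={"prefect.flow-run.Running"},   # the event we expect to see next
    for_each={"prefect.resource.id"},      # evaluate each flow run independently
    posture=Posture.Proactive,             # fire when the expected event does NOT arrive
    within=timedelta(minutes=5),
)

requeue_automation = AutomationCore(
    name="Requeue child runs stuck in Pending for 5m",
    trigger=pending_too_long,
    actions=[
        CancelFlowRun(),  # cancel the stuck run (inferred from the triggering event)
        # Placeholder deployment ID; "selected" targets that specific deployment
        RunDeployment(source="selected", deployment_id="00000000-0000-0000-0000-000000000000"),
    ],
)
# Then create it the same way as above: await client.create_automation(requeue_automation)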
Useful references:
- Automations API reference: Automations client API
- Automations schemas and actions: Automation/Trigger models and actions
- Events API helpers: Events primitives

If you share the exact trigger you want (e.g., a specific event name or Prefect built-in event) and the action you want to take, I can draft a concrete, working snippet with the right fields filled in.

About ECS task health checks and killing unhealthy tasks
- Prefect's ECS worker launches flow runs as standalone ECS tasks (RunTask). ECS container health checks do not automatically stop standalone tasks when they turn UNHEALTHY; that automatic replacement behavior only happens when tasks are managed by an ECS Service.
- Common patterns to kill unhealthy ECS tasks launched by Prefect:
  1) Let your code exit on unhealthy conditions. If the container's main process exits non-zero (e.g., your probe/watchdog detects a bad state and exits), ECS will stop the task.
  2) Use AWS-native automation: create an EventBridge rule for ECS Task State Change events that match UNHEALTHY and target a Lambda that calls StopTask on the task.
  3) Use Prefect events and automations:
     - Have your health probe emit a custom Prefect event (as shown above).
     - Create an Automation whose action is CancelFlowRun (or CallWebhook to a handler that calls StopTask). For Prefect's ECS worker, cancelling a flow run typically triggers the worker to stop the underlying ECS task.
- If you want to add container-level health checks to your Prefect ECS tasks, define them in the task definition you reference in your ECS work pool's base job template (a sketch follows at the end of this message). Just note that for RunTask tasks this won't auto-kill the task unless you implement one of the kill strategies above.

References:
- ECS guide (work pools, templates, task definitions): Prefect AWS ECS guide

Clarifying questions so I can give you an exact, ready-to-run snippet:
- Which event do you want the automation to react to? A Prefect built-in event (e.g., flow-run failed) or a custom event you'll emit (e.g., "my.company.ecs.task.unhealthy")?
- What action should the automation take? CancelFlowRun, RunDeployment, CallWebhook, SendNotification, or something else?
- How are you running in ECS today: are your long-lived workers also on ECS (as a service) and your flow runs as RunTask, or only the flow runs on ECS?
- If you want AWS to kill the task automatically on UNHEALTHY, are you open to an EventBridge + Lambda StopTask approach, or do you prefer to keep it all within Prefect via events/automations?

Once I have those details, I'll tailor a concrete automation creation snippet and, if helpful, a base job template example for your ECS work pool.
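To make the container-level health check concrete, here is a sketch of a healthCheck block you could add to the container definition in the task definition your ECS work pool's base job template references. The container name "prefect" and the pgrep probe are assumptions (the ECS worker's default container command is python -m prefect.engine, and your image must include pgrep); as noted above, for RunTask tasks this only marks the task UNHEALTHY and must be paired with one of the kill strategies to actually stop it.
Copy code
{
  "containerDefinitions": [
    {
      "name": "prefect",
      "healthCheck": {
        "command": ["CMD-SHELL", "pgrep -f 'prefect.engine' || exit 1"],
        "interval": 30,
        "timeout": 5,
        "retries": 3,
        "startPeriod": 60
      }
    }
  ]
}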
j
@Marvin 1. i want to react to when a task is in pending for over 5 minutes. 2. the action should be to retry the task. 3. yes, long-lived worker. 4. I would ideally have some check to see if the prefect task is running on the container (i just need a cli command to run to confirm it's running).
m
🔄 Researching your answer... this may take a while
🔧 Using tool: research_prefect_topic
📊 Tools used: • explore_module_offerings (1x) • research_prefect_topic (1x)