<@ULVA73B9P> does prefect retry a crashed flow eve...
# ask-marvin
j
@Marvin does prefect retry a crashed flow even if it's not configured with any retries (default config)?
m
thought for 78.8 seconds
Short answer: No. In Prefect 3.x, a flow run in the CRASHED state will not be retried unless you've explicitly configured retries. Workers do not automatically resubmit CRASHED runs by default.

Helpful notes:
- FAILED vs CRASHED: FAILED usually means your code raised an error; CRASHED typically indicates an infrastructure/runtime issue (process died, OOM, lost heartbeat). Without retries, both are terminal.
- Heartbeats: Prefect can detect "zombie" runs (lost heartbeats) and mark them CRASHED, but it won't reschedule them unless retries are configured or you manually rerun.

To enable automatic retries for crashes/failures:
```python
from prefect import flow

@flow(retries=3, retry_delay_seconds=60)
def my_flow():
    ...
```
Docs:
- States and what CRASHED means: Prefect 3 states
- Heartbeat/zombie detection: Detect zombie flows

If you're seeing a CRASHED run get retried without retries set, can you share how you're running it (worker type, infrastructure like K8s/Docker) and any logs? Some orchestrators can restart containers, which can look like a retry at the process level, but Prefect won't reschedule the run on its own.
j
@Marvin i'm running it via ecs worker using run_deployment. it seems it's converted into a crashed state but then it continues until it's marked as completed. how should i handle that?
m
thought for 386.5 seconds
What you're seeing can happen with ECS: the flow process keeps running, but Prefect marks the run CRASHED when it stops receiving heartbeats (e.g., a brief network hiccup, CPU starvation, or a long pause). When the process eventually finishes and reports its terminal state, Prefect accepts it and the run ends as COMPLETED. It's not a retry; it's a temporary "zombie" detection false positive.

Ways to handle it:
- Reduce false positives by tuning heartbeat settings: send heartbeats more reliably from the run by setting PREFECT_RUNNER_HEARTBEAT_FREQUENCY in the flow's container environment (ECS task definition). See Detect zombie flows for details and related server-side TTL settings. A sketch of wiring this through a deployment's job variables appears at the end of this message.
- Make the ECS task less likely to miss heartbeats: give the task more CPU/memory so Python isn't paused, ensure steady network egress to the Prefect API, avoid long image pulls during runs, and check for any ECS health/restart behavior. ECS guide: Prefect AWS ECS guide.
- If you want auto-recovery from real crashes, add retries at the flow level. This won't stop the transient CRASHED blip, but it ensures genuine crashes get retried:
```python
from prefect import flow

@flow(retries=3, retry_delay_seconds=60)
def my_flow():
    ...
```
- Optional: if the transient CRASHED events are more noise than signal, consider increasing the crash-detection tolerance per the docs (or, as a last resort, disabling the zombie detector in environments where it's not useful). See Detect zombie flows.

If you can share a flow run ID and the ECS task/worker logs around the time it flips to CRASHED, I can help pinpoint whether this is heartbeat timing, resource starvation, or something else. Also helpful: Prefect version, prefect-aws version, ECS launch type (Fargate vs EC2), and your current heartbeat/env settings.
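As referenced above, a minimal sketch of passing the heartbeat setting into the ECS task's environment via a deployment's job variables; the deployment name, work pool name, image, and the 30-second value are all illustrative and should be adapted to your setup:

```python
from prefect import flow


@flow(retries=3, retry_delay_seconds=60)
def my_flow():
    ...


if __name__ == "__main__":
    # job_variables are merged into the ECS work pool's base job template,
    # so the "env" entries end up in the task definition's container environment.
    my_flow.deploy(
        name="ecs-deployment",                # illustrative deployment name
        work_pool_name="my-ecs-pool",         # illustrative ECS work pool
        image="my-registry/my-flow:latest",   # illustrative image
        job_variables={"env": {"PREFECT_RUNNER_HEARTBEAT_FREQUENCY": "30"}},
    )
```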
j
@Marvin is it ok to reset the crashed flow back to scheduled state as a solution?
m
thought for 252.2 seconds
Short answer: It's possible, but it's usually better to "Rerun" (create a new run) or rely on retries/automations. Manually resetting a crashed run to Scheduled can work in specific cases, but it has caveats.

What's supported:
- UI: The typical UI action is "Rerun," which creates a new flow run. The UI does not generally offer "set to Scheduled" on a crashed run.
- API: You can programmatically set a flow run's state (including back to Scheduled) with the client. This may require forcing the transition. Example (Prefect 3.x):
```python
import asyncio
from datetime import datetime, timezone

from prefect import get_client
from prefect.states import Scheduled

flow_run_id = "your-flow-run-id"


async def reschedule_crashed_run():
    async with get_client() as client:
        state = Scheduled(scheduled_time=datetime.now(timezone.utc))
        # force=True bypasses orchestration rules that would reject changing a terminal state
        await client.set_flow_run_state(flow_run_id=flow_run_id, state=state, force=True)


if __name__ == "__main__":
    asyncio.run(reschedule_crashed_run())
```
Considerations before doing this:
- Auditability: Keeping the same run ID can blur the history. "Rerun" preserves a clean audit trail by creating a new run.
- Orchestration: Manually rescheduling a terminal run may trigger automations again and can interact with concurrency limits and "Late" handling.
- Side effects: Tasks that already ran may have produced side effects. Rescheduling doesn't "roll back" anything; make sure your flow is idempotent or safe to re-execute.
- Tasks within the run: Resetting the flow run doesn't reset prior task run states; the engine will schedule new task attempts as it proceeds.

Alternatives/best practices:
- Prefer "Rerun" in the UI to create a clean new run.
- Use retries where appropriate (usually at the task level; see the sketch after this message). For whole-flow resiliency, consider an Automation that triggers a new run when a run crashes or fails.

Relevant docs:
- REST API: Set flow run state

If you share what caused the crash and whether you need to preserve the same run ID, I can recommend the safest path (manual reschedule vs. rerun vs. automation).
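A minimal sketch of the task-level retries mentioned above; the task and flow names are illustrative:

```python
from prefect import flow, task


@task(retries=3, retry_delay_seconds=30)
def fragile_step():
    # Retried up to 3 times if it raises, without restarting the whole flow run.
    ...


@flow
def my_flow():
    fragile_step()
```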
j
@Marvin will a flow re-run with the same run_id or a new one if it's configured and it hits a crashed or failed state? will an automation pick up on this crashed or failed state?
m
thought for 100.6 seconds
Great question. Here's how it works in Prefect 3.x:

- Flow retries and run_id
  - If you configure flow-level retries and the flow fails, Prefect will reuse the same flow_run_id. The run transitions through states like AwaitingRetry and then runs another attempt under the same run.
  - "Rerun" from the UI creates a brand-new flow run (new flow_run_id).
- Failed vs. Crashed
  - Failed: Flow-level retries will kick in automatically and keep the same run_id.
  - Crashed: Retries generally do not trigger automatically because the process died unexpectedly. To recover from crashes, use an Automation to run the deployment again (new run_id) or explicitly change the crashed run back to Scheduled if you need to reuse the same run (with the caveats we discussed earlier).
- Automations
  - Yes, Automations can watch for both Failed and Crashed flow-run states. Configure a trigger on those states and have the action be "Run deployment" (new run_id; see the sketch after this message) or "Change flow run state" (e.g., set to Scheduled) if you need to reuse the same run.

Helpful links:
- States overview: States in Prefect
- Retries how-to: Retries
- Set flow run state (API): Set flow run state

If you'd like, I can share a quick Automation recipe to "Run deployment" on Crashed/Failed, or to forcibly reschedule the same run.
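As mentioned, here is a minimal, unofficial sketch of a "Run deployment" Automation for Crashed/Failed runs. It assumes source="inferred" re-runs the deployment associated with the run that emitted the triggering event; scope the trigger's match to the flows you actually care about, or use source="selected" with an explicit deployment_id:

```python
import asyncio
from datetime import timedelta

from prefect.automations import Automation
from prefect.events.actions import RunDeployment
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification

rerun_on_crash_or_failure = Automation(
    name="Run deployment again on Crashed/Failed",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Crashed", "prefect.flow-run.Failed"},
        match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
        for_each={"prefect.resource.id"},
        posture=Posture.Reactive,
        threshold=1,
        within=timedelta(seconds=0),
    ),
    actions=[
        # source="inferred": re-run the deployment related to the triggering event.
        # Use source="selected" with deployment_id=... to always run a specific one.
        RunDeployment(source="inferred"),
    ],
)


async def main():
    await rerun_on_crash_or_failure.acreate()


if __name__ == "__main__":
    asyncio.run(main())
```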
j
@Marvin is there a way to change some settings to retry crashed flows?
m
thought for 176.8 seconds
Short answer: there isn't a single "setting" that makes Prefect automatically retry a run that has entered the Crashed state. In Prefect 3.x, flow/task retries= handle failures (exceptions in user code), not infrastructure crashes.

Options you can use:
- Manual retry: Click "Retry" on the flow run in the UI. This reschedules the same run.
- Automation-based auto-retry (recommended): Create a reactive Automation that listens for Crashed and sets the same run back to Scheduled. Example:
```python
import asyncio
from datetime import timedelta
from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification
from prefect.events.actions import ChangeFlowRunState
from prefect.client.schemas.objects import StateType

auto_retry_crashed = Automation(
    name="Auto-retry crashed flow runs",
    trigger=EventTrigger(
        after=set(),  # reactive
        expect={"prefect.flow-run.Crashed"},
        match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
        for_each={"prefect.resource.id"},
        posture=Posture.Reactive,
        threshold=1,
        within=timedelta(seconds=0),
    ),
    actions=[
        ChangeFlowRunState(
            name="Reschedule crashed run",
            state=StateType.SCHEDULED,
            message="Auto-rescheduling crashed run",
        ),
    ],
)


async def main():
    await auto_retry_crashed.acreate()


if __name__ == "__main__":
    asyncio.run(main())
```
Important notes:
- This will retry crashed runs, but can loop if the crash is deterministic. Consider adding a second guard automation (e.g., if a run crashes N times within M minutes, notify or pause) or encode a cap in your flow with get_run_context() and the run count (see the sketch below).
- There's no built-in "retry crashed runs N times" cap today; that feature has been requested before: Support retrying flows on crashed state.
- If you're using push work pools and see "Retry" hang in "Awaiting Retry," there's a known issue affecting some setups: Unable to retry flow using UI on push pool.

A couple of clarifying questions so I can tailor this:
- Are you on Prefect 3.x?
- Do you want to retry the same run (preserves caching/context) or create a brand-new run?
- Do you want a hard cap on the number of crash retries? If so, how many?
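A minimal sketch of the in-flow cap mentioned above, assuming the run's run_count reflects each rescheduled attempt (worth verifying in your setup); MAX_ATTEMPTS and the flow name are illustrative:

```python
from prefect import flow
from prefect.context import get_run_context

MAX_ATTEMPTS = 3  # illustrative cap on crash-driven reschedules


@flow
def my_flow():
    ctx = get_run_context()
    attempts = ctx.flow_run.run_count or 0
    if attempts > MAX_ATTEMPTS:
        # Fail permanently instead of letting the automation reschedule forever.
        raise RuntimeError(f"Giving up after {attempts} attempts")
    ...
```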