Josh Stone
09/25/2025, 1:18 AM
Marvin
09/25/2025, 1:19 AM
Marvin
09/25/2025, 1:20 AM
from prefect import flow

@flow(retries=3, retry_delay_seconds=60)
def my_flow():
    ...
Docs:
- States and what CRASHED means: Prefect 3 states
- Heartbeat/zombie detection: Detect zombie flows
If you’re seeing a CRASHED run get retried without retries set, can you share how you’re running it (worker type, infrastructure like K8s/Docker) and any logs? Some orchestrators can restart containers, which can look like a retry at the process level, but Prefect won’t reschedule the run on its own.
Josh Stone
09/25/2025, 1:30 AM
Marvin
09/25/2025, 1:31 AM
Marvin
09/25/2025, 1:37 AM
- Tune heartbeats: set PREFECT_RUNNER_HEARTBEAT_FREQUENCY in the flow’s container environment (ECS task definition). See Detect zombie flows for details and related server-side TTL settings; a sketch of passing this setting through an ECS deployment appears after this list.
- Make the ECS task less likely to miss heartbeats
- Give the task more CPU/memory so Python isn’t paused, ensure steady network egress to the Prefect API, avoid long image pulls during runs, and check for any ECS health/restart behavior.
- ECS guide: Prefect AWS ECS guide
- If you want auto-recovery from real crashes, add retries at the flow level
- This won’t stop the transient CRASHED blip, but it ensures genuine crashes get retried:
from prefect import flow

@flow(retries=3, retry_delay_seconds=60)
def my_flow():
    ...
- Optional: if the transient CRASHED events are more noise than signal, consider increasing the crash-detection tolerance per the docs (or, as a last resort, disabling the zombie detector in environments where it’s not useful). See Detect zombie flows.
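As a hedged sketch of the heartbeat bullet above: one way to get PREFECT_RUNNER_HEARTBEAT_FREQUENCY into the flow’s ECS container is through the deployment’s job variables (most ECS work pool templates expose an env variable). The repo URL, entrypoint, image, pool name, and the 30-second value below are placeholders, not your actual setup:
from prefect import flow

if __name__ == "__main__":
    # Placeholders: swap in your repo, entrypoint, image, and ECS work pool name.
    flow.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="my-ecs-deployment",
        work_pool_name="my-ecs-pool",
        image="your-account.dkr.ecr.us-east-1.amazonaws.com/your-image:latest",  # placeholder
        build=False,  # assume the image is already built and pushed
        # "env" is merged into the ECS task's container environment, so the runner
        # inside the task sends heartbeats at this interval (seconds).
        job_variables={"env": {"PREFECT_RUNNER_HEARTBEAT_FREQUENCY": "30"}},
    )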
If you can share a flow run ID and the ECS task/worker logs around the time it flips to CRASHED, I can help pinpoint whether this is heartbeat timing, resource starvation, or something else. Also helpful: Prefect version, prefect-aws version, ECS launch type (Fargate vs EC2), and your current heartbeat/env settings.
Josh Stone
09/26/2025, 1:43 AM
Marvin
09/26/2025, 1:43 AM
Marvin
09/26/2025, 1:47 AM
import asyncio
from datetime import datetime, timezone

from prefect import get_client
from prefect.states import Scheduled

flow_run_id = "your-flow-run-id"

async def reschedule_crashed_run():
    async with get_client() as client:
        # Move the crashed run back to Scheduled so a worker will pick it up again
        state = Scheduled(scheduled_time=datetime.now(timezone.utc))
        # force=True bypasses orchestration rules that would reject changing a terminal state
        await client.set_flow_run_state(flow_run_id=flow_run_id, state=state, force=True)

asyncio.run(reschedule_crashed_run())
Considerations before doing this
- Auditability: Keeping the same run ID can blur the history. “Rerun” preserves a clean audit trail by creating a new run.
- Orchestration: Manually rescheduling a terminal run may trigger automations again and can interact with concurrency limits and “Late” handling.
- Side effects: Tasks that already ran may have produced side effects. Rescheduling doesn’t “roll back” anything; make sure your flow is idempotent or safe to re-execute (see the caching sketch after this list).
- Tasks within the run: Resetting the flow run doesn’t reset prior task run states; the engine will schedule new task attempts as it proceeds.
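If re-execution is a concern, one hedged option is to cache completed task work so a second pass skips it when inputs match. This is a minimal sketch using Prefect 3 cache policies; the task name, inputs, and expiration are illustrative and assume result persistence is acceptable in your setup:
from datetime import timedelta

from prefect import flow, task
from prefect.cache_policies import INPUTS

@task(
    cache_policy=INPUTS,                  # reuse a prior result when inputs match
    cache_expiration=timedelta(hours=1),
    persist_result=True,                  # cached results must be persisted to be reused
)
def expensive_step(x: int) -> int:
    # Pretend this is costly or has side effects; caching lets a re-executed
    # flow skip it when the same inputs were already processed recently.
    return x * 2

@flow
def my_flow():
    expensive_step(42)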
Alternatives/best practices
- Prefer “Rerun” in the UI to create a clean new run.
- Use retries where appropriate (usually at the task level). For whole-flow resiliency, consider an Automation that triggers a new run when a run crashes or fails.
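To make the task-level retries above concrete, here is a minimal sketch (the function name and delay values are illustrative):
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def call_flaky_service():
    # An exception raised here triggers Prefect's task retries;
    # only this task re-executes, not the entire flow run.
    ...

@flow
def my_flow():
    call_flaky_service()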
Relevant docs
- REST API: Set flow run state
If you share what caused the crash and whether you need to preserve the same run ID, I can recommend the safest path (manual reschedule vs. rerun vs. automation).
Josh Stone
09/26/2025, 1:52 AM
Marvin
09/26/2025, 1:52 AM
Marvin
09/26/2025, 1:54 AM
- “Retry” in the UI keeps the same flow_run_id. The run transitions through states like AwaitingRetry and then runs another attempt under the same run.
- “Rerun” from the UI creates a brand-new flow run (new flow_run_id).
- Failed vs. Crashed
- Failed: Flow-level retries will kick in automatically and keep the same run_id.
- Crashed: Retries generally do not trigger automatically because the process died unexpectedly. To recover from crashes, use an Automation to run the deployment again (new run_id) or explicitly change the crashed run back to Scheduled if you need to reuse the same run (with the caveats we discussed earlier).
- Automations
- Yes — Automations can watch for both Failed and Crashed flow-run states. Configure a trigger on those states and have the action be “Run deployment” (new run_id) or “Change flow run state” (e.g., set to Scheduled) if you need to reuse the same run.
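Outside of Automations, a scripted equivalent of “Run deployment” is also possible. A minimal sketch, assuming a deployment addressed as "my-flow/my-deployment" (a placeholder name):
from prefect.deployments import run_deployment

# "my-flow/my-deployment" is a placeholder for "<flow name>/<deployment name>"
new_run = run_deployment(
    name="my-flow/my-deployment",
    timeout=0,  # return immediately instead of blocking until the run finishes
)
print(new_run.id)  # this is a brand-new flow_run_id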
Helpful links
- States overview: States in Prefect
- Retries how-to: Retries
- Set flow run state (API): Set flow run state
If you’d like, I can share a quick Automation recipe to “Run deployment” on Crashed/Failed, or to forcibly reschedule the same run.
Josh Stone
09/26/2025, 3:30 AM
Marvin
09/26/2025, 3:30 AM
Marvin
09/26/2025, 3:33 AM
Flow-level retries (retries=) handle failures (exceptions in user code), not infrastructure crashes.
Options you can use:
- Manual retry: Click “Retry” on the flow run in the UI. This reschedules the same run.
- Automation-based auto-retry (recommended): Create a reactive Automation that listens for Crashed and sets the same run back to Scheduled. Example:
import asyncio
from datetime import timedelta

from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification
from prefect.events.actions import ChangeFlowRunState
from prefect.client.schemas.objects import StateType

auto_retry_crashed = Automation(
    name="Auto-retry crashed flow runs",
    trigger=EventTrigger(
        after=set(),  # reactive: fire on the expected event alone
        expect={"prefect.flow-run.Crashed"},
        match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
        for_each={"prefect.resource.id"},
        posture=Posture.Reactive,
        threshold=1,
        within=timedelta(seconds=0),
    ),
    actions=[
        ChangeFlowRunState(
            name="Reschedule crashed run",
            state=StateType.SCHEDULED,
            message="Auto-rescheduling crashed run",
        ),
    ],
)

async def main():
    # Register the automation with the Prefect API (await inside an async context)
    await auto_retry_crashed.acreate()

asyncio.run(main())
Important notes:
- This will retry crashed runs, but can loop if the crash is deterministic. Consider adding a second guard automation (e.g., if a run crashes N times within M minutes, notify or pause) or encode a cap in your flow with get_run_context() and the run count (see the sketch after these notes).
- There’s no built-in “retry crashed runs N times” cap today; that feature has been requested before: Support retrying flows on crashed state.
- If you’re using push work pools and see “Retry” hang in “Awaiting Retry,” there’s a known issue affecting some setups: Unable to retry flow using UI on push pool.
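As a rough sketch of the in-flow cap mentioned in the first note (it relies on the flow run’s run_count, which increments on each attempt), you can fail cleanly once a cap is exceeded so a Crashed-only automation stops rescheduling; the cap value is illustrative:
from prefect import flow
from prefect.context import get_run_context

MAX_ATTEMPTS = 3  # illustrative cap

@flow
def my_flow():
    ctx = get_run_context()
    # run_count reflects how many times this flow run has been executed
    if ctx.flow_run.run_count > MAX_ATTEMPTS:
        # Raising ends the run as Failed (not Crashed), so an automation
        # watching only Crashed states will not reschedule it again.
        raise RuntimeError(f"Giving up after {MAX_ATTEMPTS} attempts")
    ...  # normal flow logic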
A couple of clarifying questions so I can tailor this:
- Are you on Prefect 3.x?
- Do you want to retry the same run (preserves caching/context) or create a brand-new run?
- Do you want a hard cap on the number of crash retries? If so, how many?