# marvin-ai
h
@Marvin I'm seeing intermittent flow crashes when the Azure worker is unable to create the VM for flow runs. Prefect flow logs below, no further reasons in the Prefect worker or readily ascertainable in Azure resource group activity logs as to why the VM deployment is failing. Irrespective of the root cause, how can I configure an automation to retry a failed flow (for any reason, not just these)?
Copy code
Worker 'AzureContainerWorker 772c6685-1001-4890-aa27-a39963e47a32' submitting flow run '068c47c3-42ba-75b7-8000-93100a2f5951'
08:59:54 PM
Info
Running on worker id: 7323af0c-f52b-40e4-b4c2-fd069659624c. See worker logs here: <https://app.prefect.cloud/account/9dfdc247-ec96-4699-b4e9-27d5f742ad5e/workspace/aa414159-cec6-4752-87e5-8ff294b446cc/work-pools/work-pool/una-acp-prod-prefect-3-3-4-workpool-hybrid/worker/7323af0c-f52b-40e4-b4c2-fd069659624c>
08:59:54 PM
Info
Failed to submit flow run '068c47c3-42ba-75b7-8000-93100a2f5951' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 1271, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect_azure/workers/container_instance.py", line 586, in run
    created_container_group = await self._provision_container_group(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect_azure/workers/container_instance.py", line 712, in _provision_container_group
    created_container_group = await run_sync_in_worker_thread(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
    return call()
           ^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect_azure/workers/container_instance.py", line 658, in _wait_for_task_container_start
    raise RuntimeError(
RuntimeError: Timed out after 300.00896286964417s while watching waiting for container start.
09:05:00 PM
Error
Reported flow run '068c47c3-42ba-75b7-8000-93100a2f5951' as crashed: Flow run could not be submitted to infrastructure:
RuntimeError('Timed out after 300.00896286964417s while watching waiting for container start.')
m
thought for 544.1 seconds
Thanks for the detailed logs — that timeout is happening before your flow code starts, so flow-level retries won't apply. In Prefect 3.x, the right way to "retry for any reason" (including infra provisioning failures that lead to Crashed or Failed states) is to add an Automation that reacts to those states and resubmits the run. Here are two good patterns you can use.

Option A: Reschedule the same flow run (simple "retry" of the same run)
- What it does: When a flow run enters Failed or Crashed, change its state back to Scheduled (optionally after a delay). The worker will pick it up again; the run's run_count increments.
- How to set it up in the UI:
  1) Go to Automations → New Automation.
  2) Trigger: "When a flow run enters a state" → select Failed and Crashed.
  3) Scope: Optionally narrow by deployment, tags, or work pool so it only affects the runs you want.
  4) Action: "Change flow run state" → set to Scheduled. Add a Delay (e.g., 120 seconds) if you want a cool-down before retry.
  5) Limit retries: Add a condition on the flow run's attempt count/run count (e.g., only when run_count < 3) so you don't loop forever.
- Notes:
  - This approach retries the same run ID.
  - Works well for infra timeouts because the run didn't start executing your code yet.

Option B: Create a fresh flow run of the same deployment
- What it does: When a flow run enters Failed or Crashed, create a new run for the same deployment (copying parameters/tags).
- How to set it up in the UI:
  1) Automations → New Automation.
  2) Trigger: "When a flow run enters a state" → Failed and Crashed.
  3) Scope: Optionally narrow by deployment/tags/work pool.
  4) Action: "Run deployment" → choose "from the triggering flow run's deployment" and "copy parameters from the triggering flow run." Add a Delay if desired.
  5) Limit retries: Add a condition (e.g., only when run_count < 3) or a tag-based safeguard so you don't create unbounded runs.
- Notes:
  - This makes a brand-new flow run each time (some teams prefer this for cleaner lineage or if they don't want to mutate terminal states).

About limiting attempts and backoff
- Simple cap: Use a condition that checks run_count (e.g., only retry while run_count < 3).
- Backoff: Create multiple automations keyed on run_count with different Delays (e.g., for run_count = 0 delay 1m, for run_count = 1 delay 5m, for run_count = 2 delay 15m). This gives you an exponential-like backoff.
- If you'd like help crafting the exact conditions for your workspace, I can suggest a concrete configuration once you share how many retries and what backoff you want.

CLI you can use to manage automations (verified)
- List automations
Copy code
prefect automation ls
- Inspect an automation
Copy code
prefect automation inspect <AUTOMATION_ID>
- Pause/resume
Copy code
prefect automation pause <AUTOMATION_ID>
prefect automation resume <AUTOMATION_ID>
Note: Creation/editing is easiest in the Cloud UI today.

Docs and references
- States in Prefect 3.x (for Failed vs Crashed): States
- CLI reference for automations: Automation CLI
- REST API (server) to create automations if you prefer API: Create automation
- Flow run state API (background on rescheduling a run): Set flow run state

A couple of extra tips (optional):
- Since your error shows a 300s timeout waiting for ACI to start, consider increasing the start timeout in your Azure work pool's job variables if VM/container startup is sometimes slow in your region. That can reduce the number of spurious Crashed runs.
- If you're already using flow-level retries (e.g., @flow(retries=...)), keep them — they handle code-level failures once the engine starts. The automation handles pre-start infra problems.
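For reference, here's a minimal sketch of flow-level retries in Prefect 3.x (the flow name and retry values are illustrative placeholders, not from your project):
Copy code
from prefect import flow

# Illustrative flow; retries and retry_delay_seconds are standard @flow arguments.
# These only cover exceptions raised after the engine has started your code,
# so they won't catch the pre-start ACI timeout above (that's the automation's job).
@flow(retries=2, retry_delay_seconds=60)
def my_etl_flow():
    print("doing work")

if __name__ == "__main__":
    my_etl_flow()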
Want me to propose exact UI settings for, say, “retry up to 3 times with delays of 1m, 5m, 15m” for your specific deployment/work pool?
h
@Marvin prefer option a, although there is no delay option. are you sure this will trigger a rerun and not just change the state?
m
thought for 52.7 seconds
Great question — yes, setting a flow run's state back to Scheduled will cause Prefect to try running it again. Workers poll for scheduled runs; when you change a terminal state (Failed/Crashed) to Scheduled, the run is put back in the queue and the worker will attempt submission again. You'll see the run's run_count increment on each retry.

On delay: you're right — the Cloud UI's "Change flow run state → Scheduled" action doesn't currently expose a delay field, so it reschedules immediately. If you need a cool-down, here are options:
- Keep Option A and use a simple cap (e.g., only reschedule while run_count < 3) to avoid infinite loops.
- If you must have a delay, switch to Option B ("Run deployment") for this automation or use a small programmatic automation (API/webhook) that sets the Scheduled time in the future; see the sketch below.

References
- States (how Scheduled is handled): States
- Automation CLI (manage your automations): Automation CLI

If you'd like, I can propose the exact Automation config for Option A with a run_count cap (e.g., retry up to 3 times) and scope it to your deployment/work pool.
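If you do want a delayed Option A today, here's a rough programmatic sketch (assuming the Prefect 3.x Python client's read_flow_run/set_flow_run_state methods and the Scheduled state helper; the run ID, delay, and cap are placeholders):
Copy code
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.states import Scheduled


async def reschedule_later(flow_run_id: str, delay_seconds: int = 120, max_attempts: int = 3) -> None:
    """Put a Crashed/Failed flow run back to Scheduled, starting after a delay."""
    async with get_client() as client:
        run = await client.read_flow_run(flow_run_id)
        if run.run_count >= max_attempts:  # simple retry cap
            return
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Scheduled(scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)),
            force=True,  # force the transition in case orchestration rules reject leaving a terminal state
        )


# Placeholder flow run ID
asyncio.run(reschedule_later("00000000-0000-0000-0000-000000000000", delay_seconds=300))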
h
@Marvin give me the json trigger that retries up to 3 times
m
thought for 274.7 seconds
Short answer: yes — changing a flow run back to Scheduled will cause it to be picked up and executed again, and the run_count will increment.

Below is a JSON example for an Automation that retries the same run up to 3 attempts when it enters Failed or Crashed. Note: field names for events/filters can vary by version; the most reliable workflow is to create it in the UI, then run prefect automation inspect <ID> to see the canonical JSON for your workspace. Use this as a starting point:
Copy code
{
  "name": "Retry flow run up to 3 times on Failed/Crashed",
  "description": "Reschedule the same run when it fails or crashes, up to 3 attempts.",
  "enabled": true,
  "triggers": [
    {
      "type": "event",
      "posture": "Reactive",
      "expect": [
        "prefect.flow-run.Failed",
        "prefect.flow-run.Crashed"
      ],
      "match": {
        "resource": {
          "prefect.resource.role": "prefect.flow-run"
        }
      },
      "for_each": "event.resource.id",
      "filters": {
        "data": [
          {
            "path": "event.payload.flow_run.run_count",
            "op": "<",
            "value": 3
          }
        ]
      }
    }
  ],
  "actions": [
    {
      "type": "change-flow-run-state",
      "state": {
        "type": "SCHEDULED",
        "name": "Scheduled"
      }
    }
  ]
}
Tips to finalize it in your environment:
- Build it once in the UI (Trigger: "When a flow run enters Failed or Crashed"; Condition: "Run count < 3"; Action: "Change flow run state → Scheduled").
- Then run:
Copy code
prefect automation ls
prefect automation inspect <AUTOMATION_ID>
to capture the exact JSON your workspace uses.

Refs:
- States: States
- Automation CLI: Automation CLI
- Create automation (API): Create automation
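If you'd rather script the creation than click through the UI, here's a rough sketch of posting a spec to the Create automation endpoint mentioned above (the file name is a placeholder, and the /automations/ path and spec shape should be confirmed against the JSON you capture with prefect automation inspect):
Copy code
import json
import os

import requests

# Assumes a Prefect Cloud workspace API URL and API key are set in the environment.
PREFECT_API_URL = os.environ["PREFECT_API_URL"]
PREFECT_API_KEY = os.environ["PREFECT_API_KEY"]

# Hypothetical file holding the automation spec (e.g. adapted from `prefect automation inspect` output).
with open("retry_automation.json") as f:
    automation_spec = json.load(f)

resp = requests.post(
    f"{PREFECT_API_URL}/automations/",  # "Create automation" endpoint; confirm the path for your workspace
    headers={"Authorization": f"Bearer {PREFECT_API_KEY}"},
    json=automation_spec,
    timeout=30,
)
resp.raise_for_status()
print("Created automation:", resp.json().get("id"))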
h
@Marvin in the UI it's currently
Copy code
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {},
  "after": [],
  "expect": [
    "prefect.flow-run.Crashed"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}
so I replace threshold/within with:
Copy code
"filters": {
        "data": [
          {
            "path": "event.payload.flow_run.run_count",
            "op": "<",
            "value": 3
          }
        ]
      }
m
thought for 22.0 seconds
Don’t remove threshold/within — those are independent. To cap retries by run_count, add a data filter to your existing event trigger. Also, use the literal "<" in JSON (not the HTML entity &lt;). Here’s your trigger JSON with the run_count filter added:
Copy code
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {},
  "after": [],
  "expect": [
    "prefect.flow-run.Crashed"
    // you can also include "prefect.flow-run.Failed" here if desired
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0,
  "filters": {
    "data": [
      {
        "path": "payload.flow_run.run_count",
        "op": "<",
        "value": 3
      }
    ]
  }
}
Notes:
- You can also include "prefect.flow-run.Failed" in expect if you want the same handling for Failed runs.
- The path should be payload.flow_run.run_count (relative to the event payload).
- Keep threshold: 1 and within: 0 unless you want rate-based gating in addition to the run_count condition.
- This trigger alone doesn’t retry — pair it with the "Change flow run state → Scheduled" action in the automation to actually re-run the same flow run.
h
@Marvin you're imagining a filters key
m
thought for 62.6 seconds
You’re right — I was wrong to suggest a filters key in your Event trigger. The current Event trigger JSON in the UI does not support payload filters like run_count, so you can’t encode "up to 3 retries" purely in that trigger JSON.

What you can do today for Option A:
- Keep your Event trigger as-is (Crashed or Failed).
- Add an action: Change flow run state → Scheduled.
- That will re-run the same flow run, but without a built-in cap.

If you must cap to 3 retries with Option A, use a small webhook guard:
- Automation
  - Trigger: your Event trigger (Crashed/Failed).
  - Action: Call webhook (your endpoint).
- Webhook logic
  - Read the flow run (from event.resource.id).
  - If run_count < 3, call Prefect’s Set Flow Run State API to set SCHEDULED; else do nothing.

Example webhook sketch (serverless or small service):
Copy code
# Receives Prefect event webhook
# Expects JSON body containing event with resource.id and payload.flow_run.run_count
import os
import requests
from flask import Flask, request, jsonify

PREFECT_API_URL = os.environ["PREFECT_API_URL"]  # e.g. https://api.prefect.cloud/api/accounts/<id>/workspaces/<id>
PREFECT_API_KEY = os.environ["PREFECT_API_KEY"]

app = Flask(__name__)

@app.post("/prefect-retry-guard")
def prefect_retry_guard():
    evt = request.get_json()
    flow_run_id = evt["resource"]["id"].split(".")[-1]  # prefect.flow-run.<uuid>
    run_count = evt["payload"]["flow_run"]["run_count"]
    if run_count < 3:
        url = f"{PREFECT_API_URL}/flow_runs/{flow_run_id}/set_state"
        headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
        body = {"state": {"type": "SCHEDULED", "name": "Scheduled"}}
        r = <http://requests.post|requests.post>(url, headers=headers, json=body, timeout=10)
        r.raise_for_status()
        return jsonify({"rescheduled": True}), 200
    return jsonify({"rescheduled": False, "reason": "retry cap reached"}), 200
References
- States: States
- Set flow run state (API): Set flow run state
- Automation CLI (inspect JSON after creating in UI): Automation CLI

If you prefer not to run a webhook, we can also explore a Composite/Metric trigger approach, but support for run_count as a first-class condition varies and I’d want to confirm what’s enabled in your workspace before sending JSON.