<@ULVA73B9P> what is the best way, using prefect 3...
# ask-marvin
j
@Marvin what is the best way, using Prefect 3, to automate retries on larger infrastructure if a deployment ended in a crashed state? For example, retry with an 8 vCPU / 60 GB deployment if the 4 vCPU / 60 GB deployment fails.
m
thought for 256.3 seconds
Great question. In Prefect 3.x, the most robust way to “escalate” compute after a crash is to use an Automation that watches for Crashed flow runs and then triggers a new run on larger infrastructure. There are two solid patterns:

1) Two-deployment escalation (simplest)
- Create two deployments for the same flow:
  - Small: 4 vCPU / 60 GiB
  - Large: 8 vCPU / 60 GiB
- Create a Prefect Automation:
  - Trigger: Flow run state entered → Crashed
  - Filter: deployment name == your “small” deployment
  - Action: Run deployment → your “large” deployment
  - Optional: forward the same parameters; add a tag like `escalated` to the new run to avoid re-triggering loops.
- Pros: very clear, no need to pass job variable overrides dynamically.
- Cons: two deployments to keep in sync. (A Python sketch of registering both deployments appears at the end of this answer.)

2) Single deployment with job variable overrides (more flexible)
- Keep one deployment but let the Automation re-run it with larger resources by overriding job variables.
- The underlying action is equivalent to calling `prefect.deployments.run_deployment(..., job_variables={...})`. The SDK supports this override:
  - Signature: Customize job variables
  - Run deployment API (Python): the function accepts `job_variables` and `tags`.
- Pros: one deployment; dynamic sizing per retry.
- Cons: you must supply the correct job variable schema for your worker type (Kubernetes, Docker, ECS).

How to express “bigger” resources
- Kubernetes worker: override the Pod manifest’s resources (requests/limits). Example job variables (small vs. large):

Small (4 vCPU / 60Gi):
{
  "pod": {
    "spec": {
      "containers": [
        {
          "name": "flow",
          "resources": {
            "requests": {
              "cpu": "4",
              "memory": "60Gi"
            },
            "limits": {
              "cpu": "4",
              "memory": "60Gi"
            }
          }
        }
      ]
    }
  }
}
Large (8 vCPU / 60Gi):
{
  "pod": {
    "spec": {
      "containers": [
        {
          "name": "flow",
          "resources": {
            "requests": {
              "cpu": "8",
              "memory": "60Gi"
            },
            "limits": {
              "cpu": "8",
              "memory": "60Gi"
            }
          }
        }
      ]
    }
  }
}
- Docker/ECS workers: use the fields in each worker’s job template (e.g., Docker memory/cpu settings or ECS task CPU/memory). See: Customize job variables, ECS worker guide.

Avoid loops and replays
- Add a tag like `escalated` on the re-run via the automation action. In your automation trigger, filter out runs that already have `escalated` so you only escalate once.
- Optionally add another Automation to notify if the “large” run also crashes.
- If you see “zombie” runs (lost heartbeats), you can tune heartbeat detection: Detect zombie flows

Putting it together in the UI
- Create/confirm your work pool(s) and job templates for small/large, or confirm the job variables schema you’ll override.
- Create the “small” deployment (and optionally the “large” one if you use the two-deployment pattern).
- Add an Automation:
  - Trigger: Flow run → State entered → Crashed
  - Filters: deployment == small deployment
  - Action:
    - If using two deployments: Run deployment → select the large deployment, add tags `["escalated"]`, and forward parameters if needed.
    - If using one deployment: run the same deployment and provide the “large” `job_variables` override and tags `["escalated"]`.

If you’d like, I can draft the exact Automation config and job variables for your setup. Which worker type are you using (Kubernetes, Docker, or ECS)? And do you prefer two deployments or a single deployment with job variable overrides?
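For reference, here is a minimal sketch of registering the small and large deployments from Python. It assumes a Kubernetes-style work pool whose base job template exposes cpu and memory job variables; the pool name, image, and variable keys are placeholders to adapt to your setup:

from prefect import flow


@flow
def my_flow():
    ...  # your existing flow logic


if __name__ == "__main__":
    # Register a "small" and a "large" deployment of the same flow.
    # The work pool name, image, and the cpu/memory job-variable keys are
    # placeholders; they must match what your work pool's base job template exposes.
    for size, cpu in [("small", "4"), ("large", "8")]:
        my_flow.deploy(
            name=size,
            work_pool_name="k8s-pool",
            image="my-registry/my-flow:latest",
            job_variables={"cpu": cpu, "memory": "60Gi"},
        )

The Automation’s Run deployment action then just points at my-flow/large.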
j
@Marvin how do I forward the same parameters to the retry?
m
thought for 113.0 seconds
Two easy ways:

A) In an Automation (UI)
- In your Run deployment action, set the Parameters field to a Jinja template that copies the originals.
- Toggle “Use template” and paste:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
- Optionally also template the name/tags to mark the retry:
{
  "flow_run_name": {
    "__prefect_kind": "jinja",
    "template": "{{ event.flow_run.name }} (escalated)"
  },
  "tags": [
    "escalated"
  ]
}
Docs: Templating in Automations

B) From code (if you’re programmatically re-running)
from prefect.deployments import run_deployment
from prefect.runtime import flow_run  # only populated inside a running flow

# Call this from within the flow (or a wrapper flow) that decides to escalate.
async def escalate():
    await run_deployment(
        name="my-flow/large",                # or same deployment
        parameters=flow_run.parameters,      # forwards originals
        tags=[*flow_run.tags, "escalated"],  # optional guard
    )
Tip: Add a filter in your Automation trigger to exclude runs with the `escalated` tag so you don’t loop retries.
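If you’re escalating from a separate process instead (outside the crashed run’s context), a rough sketch is to look the crashed run up with the client first; the deployment name and flow run ID below are placeholders:

import asyncio

from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment


async def escalate(crashed_flow_run_id: str):
    # Read the crashed run so its parameters and tags can be copied over.
    async with get_client() as client:
        crashed = await client.read_flow_run(crashed_flow_run_id)

    await run_deployment(
        name="my-flow/large",               # placeholder deployment name
        parameters=crashed.parameters,      # forward the original parameters
        tags=[*crashed.tags, "escalated"],  # guard tag to avoid retry loops
    )


if __name__ == "__main__":
    asyncio.run(escalate("00000000-0000-0000-0000-000000000000"))  # placeholder run ID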
j
@Marvin so here is the Jinja input to my re-run; how do I adjust this to use all the parameters from the crashed run, and rename the rerun with the tag _escalated? { "hyd_refdes": { "__prefect_kind": "jinja" }, "date": { "__prefect_kind": "jinja" }, "format": { "__prefect_kind": "jinja" }, "normalize_traces": { "__prefect_kind": "jinja" }, "fudge_factor": { "__prefect_kind": "jinja" }, "write_wav": { "__prefect_kind": "jinja" }, "apply_cals": { "__prefect_kind": "jinja" }, "freq_lims": { "__prefect_kind": "jinja" }, "s3_sync": { "__prefect_kind": "jinja" }, "flag": { "__prefect_kind": "jinja" } }
m
thought for 17.5 seconds
Replace your whole Parameters input with a single Jinja template that forwards everything from the crashed run.

Parameters field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
To rename the re-run with a suffix:

Flow run name field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.name }}_escalated"
}
If you also want to tag the re-run (recommended so you can filter and avoid loops):

Tags field:
{
  "__prefect_kind": "jinja",
  "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}"
}
Docs: Templating in Automations
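To see what these templates produce, here is a purely local illustration. Prefect renders them server-side with the triggering event in scope; this snippet only mimics that with plain Jinja2 and made-up run values (the parameter names mirror yours, the values are invented):

from types import SimpleNamespace

from jinja2 import Environment

# Stand-in for the triggering event; name, tags, and parameter values are invented.
event = SimpleNamespace(
    flow_run=SimpleNamespace(
        name="hydrophone-daily",
        tags=["daily"],
        parameters={"date": "2024-01-01", "hyd_refdes": "HYDBBA106"},
    )
)

env = Environment()


def render(template: str) -> str:
    return env.from_string(template).render(event=event)


print(render("{{ event.flow_run.parameters | tojson }}"))
# {"date": "2024-01-01", "hyd_refdes": "HYDBBA106"}
print(render("{{ event.flow_run.name }}_escalated"))
# hydrophone-daily_escalated
print(render("{{ (event.flow_run.tags + ['escalated']) | tojson }}"))
# ["daily", "escalated"]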
j
@Marvin ok my parameters look like this now: { "__prefect_kind": "jinja", "template": "{{ event.flow_run.parameters | tojson }}" } what do I add so that I get just a single escalated rerun?
m
thought for 32.2 seconds
Add two guards:

1) Tag the re-run as escalated (in the Action)
- Tags field:
{
  "__prefect_kind": "jinja",
  "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}"
}
- (Optional) Flow run name field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.name }}_escalated"
}
2) Exclude already-escalated runs (in the Trigger filters)
- Add a filter: Flow run tags does not contain `escalated`
- If you’re using a two-deployment pattern, also restrict to the “small” deployment so a crash on the large run won’t re-trigger.

Alternative guard (optional): add a filter “Flow run run count equals 0” so you only escalate the first crash.

Your Parameters template is already correct:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
Docs: Templating in Automations
j
@Marvin so the parameters field should look like this { "__prefect_kind": "jinja", "template": "{{ event.flow_run.parameters | tojson }}" }, { "__prefect_kind": "jinja", "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}" }, { "__prefect_kind": "jinja", "template": "{{ event.flow_run.name }}_escalated" } or where in the automations form should I enter the flow run name and the flow run tags?
m
thought for 18.6 seconds
Short answer: those three templates go in three different fields. Don’t put them all in the Parameters field.

In your Automation’s Run deployment action:
- Parameters field (enable “Use template”):
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
- Flow run name field (enable “Use template”):
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.name }}_escalated"
}
- Tags field (enable “Use template”):
{
  "__prefect_kind": "jinja",
  "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}"
}
And to ensure you only get a single escalated re-run, add a Trigger filter:
- Flow run → Tags → does not contain → escalated
- (Optionally also filter Deployment == your “small” deployment.)

Templating docs: Templating in Automations
j
@Marvin can I define this all in a yaml file?
m
thought for 220.4 seconds
Short answer: not in prefect.yaml. Prefect 3.x doesn’t currently support defining Automations inside your deployment’s prefect.yaml, and there’s no “prefect automation create/apply-from-file” CLI. You can define them in the UI, or manage them “as code” via the Python client (or REST).

A practical “YAML-as-source-of-truth” workflow:

1) Create the automation once in the UI (with your Jinja templates for parameters/name/tags).

2) Export it to YAML for version control:
prefect automation inspect "my-escalation" --yaml > automation.yaml
3) Apply/update via a tiny Python script (CI-friendly):
import asyncio

import yaml

from prefect.client.orchestration import get_client
from prefect.events.schemas.automations import AutomationCore


async def apply_automation(path="automation.yaml"):
    with open(path) as f:
        spec = yaml.safe_load(f)

    # Drop server-assigned fields if the export includes them.
    spec.pop("id", None)

    async with get_client() as client:
        # Simplest approach: validate the spec into the Automation schema and create it.
        # If it already exists, delete/replace in the UI or add your own upsert logic.
        await client.create_automation(AutomationCore(**spec))


if __name__ == "__main__":
    asyncio.run(apply_automation())
Notes
- Use the UI export (step 2) to get the exact, valid schema for your workspace.
- For “single re-run” safety, include in the exported automation:
  - Tags template that appends `escalated`
  - Trigger filter “Flow run tags does not contain escalated”
  - Optional filter to only match your “small” deployment

Helpful docs:
- CLI (inspect/export): prefect automation CLI
- Jinja templating in Automations: Templating in Automations

If you share your deployment names (small vs. large) and worker type, I can draft an example automation in YAML you can import into your repo and apply with the script above.
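To make that concrete, here is a rough, hypothetical sketch of what such a spec can look like, written as a Python dict and dumped to automation.yaml so the apply script above can pick it up. The field names follow the event-trigger and run-deployment action schemas, but the deployment IDs are placeholders and a UI export remains the source of truth for the exact shape in your workspace:

import yaml

# Hypothetical escalation spec; verify field names against a UI export.
spec = {
    "name": "escalate-crashed-runs",
    "enabled": True,
    "trigger": {
        "type": "event",
        "posture": "Reactive",
        "expect": ["prefect.flow-run.Crashed"],
        "match_related": {
            "prefect.resource.role": "deployment",
            # Placeholder: the UUID of your "small" deployment
            "prefect.resource.id": "prefect.deployment.<SMALL-DEPLOYMENT-UUID>",
        },
        "threshold": 1,
        "within": 0,
    },
    "actions": [
        {
            "type": "run-deployment",
            "source": "selected",
            # Placeholder: the UUID of your "large" deployment
            "deployment_id": "<LARGE-DEPLOYMENT-UUID>",
            "parameters": {
                "__prefect_kind": "jinja",
                "template": "{{ event.flow_run.parameters | tojson }}",
            },
        }
    ],
}

with open("automation.yaml", "w") as f:
    yaml.safe_dump(spec, f, sort_keys=False)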