Joe D
08/22/2025, 9:04 PM

Marvin
08/22/2025, 9:08 PM
- ...escalated to the new run to avoid re-triggering loops.
- Pros: very clear, no need to pass job variable overrides dynamically.
- Cons: two deployments to keep in sync.
2) Single deployment with job variable overrides (more flexible)
- Keep one deployment but let the Automation re-run it with larger resources by overriding job variables.
- The underlying action is equivalent to calling prefect.deployments.run_deployment(..., job_variables={...}), and the SDK supports this override (see the sketch after this list).
- Docs: Customize job variables
- Run deployment API (Python): the function accepts job_variables and tags.
- Pros: one deployment; dynamic sizing per retry.
- Cons: you must supply the correct job variable schema for your worker type (Kubernetes, Docker, ECS).
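For illustration, a minimal sketch of that call, assuming a deployment named my-flow/my-deployment and a work pool whose base job template exposes cpu and memory job variables (both the name and the keys are placeholders; use whatever your worker type actually accepts):

import asyncio

from prefect.deployments import run_deployment

async def escalate_with_more_resources():
    # Placeholder deployment name and job variable keys -- the valid keys depend
    # on your work pool's base job template (Kubernetes, Docker, ECS).
    await run_deployment(
        name="my-flow/my-deployment",
        parameters={"dataset": "2025-08-22"},          # forward the original inputs
        job_variables={"cpu": "8", "memory": "60Gi"},  # "bigger" resources for the retry
        tags=["escalated"],                            # lets the automation skip this run
    )

if __name__ == "__main__":
    asyncio.run(escalate_with_more_resources())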
How to express “bigger” resources
- Kubernetes worker: override the Pod manifest’s resources (requests/limits). Example job variables for small vs. large (a Python helper that builds both is sketched after this list):
Small (4 vCPU / 60Gi):
{
  "pod": {
    "spec": {
      "containers": [
        {
          "name": "flow",
          "resources": {
            "requests": {"cpu": "4", "memory": "60Gi"},
            "limits": {"cpu": "4", "memory": "60Gi"}
          }
        }
      ]
    }
  }
}
Large (8 vCPU / 60Gi):
{
  "pod": {
    "spec": {
      "containers": [
        {
          "name": "flow",
          "resources": {
            "requests": {"cpu": "8", "memory": "60Gi"},
            "limits": {"cpu": "8", "memory": "60Gi"}
          }
        }
      ]
    }
  }
}
- Docker/ECS workers: use the fields in each worker’s job template (e.g., Docker memory/cpu settings or ECS task CPU/memory). See:
- Customize job variables
- ECS worker guide
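If you prefer to keep these overrides in code, here is a small helper sketch for the Kubernetes case. It assumes your work pool's base job template accepts the same "pod" override shown above; adjust the shape to whatever your template actually exposes.

def k8s_resources(cpu: str, memory: str) -> dict:
    # Build the "pod" job variable from above, with identical requests and limits.
    resources = {
        "requests": {"cpu": cpu, "memory": memory},
        "limits": {"cpu": cpu, "memory": memory},
    }
    return {"pod": {"spec": {"containers": [{"name": "flow", "resources": resources}]}}}

small_job_variables = k8s_resources("4", "60Gi")  # matches the "small" JSON above
large_job_variables = k8s_resources("8", "60Gi")  # matches the "large" JSON above

Either dict can then be pasted into the Run Deployment action's job variables field or passed as job_variables to run_deployment.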
Avoid loops and replays
- Add a tag like escalated on the re-run via the automation action. In your automation trigger, filter out runs that already have escalated so you only escalate once.
- Optionally add another Automation to notify if the “large” run also crashes.
- If you see “zombie” runs (lost heartbeats), you can tune heartbeat detection: Detect zombie flows
Putting it together in the UI
- Create/confirm your work pool(s) and job templates for small/large or confirm the job variables schema you’ll override.
- Create the “small” deployment (and optionally the “large” one if you use the two-deployment pattern).
- Add an Automation:
- Trigger: Flow run → State entered → Crashed
- Filters: deployment == small deployment
- Action:
- If using two deployments: Run deployment → select the large deployment, add tags ["escalated"], and forward parameters if needed.
- If using one deployment: Run the same deployment and provide the “large” job_variables override and tags ["escalated"].
If you’d like, I can draft the exact Automation config and job variables for your setup. Which worker type are you using (Kubernetes, Docker, or ECS)? And do you prefer two deployments or a single deployment with job variable overrides?

Joe D
08/23/2025, 6:20 PM

Marvin
08/23/2025, 6:22 PM
Parameters field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
- Optionally also template the name/tags to mark the retry:
{
  "flow_run_name": {
    "__prefect_kind": "jinja",
    "template": "{{ event.flow_run.name }} (escalated)"
  },
  "tags": ["escalated"]
}
Docs: Templating in Automations
B) From code (if you’re programmatically re-running)
from prefect.deployments import run_deployment
from prefect.runtime import flow_run

async def escalate():
    # call this from inside the original flow run so prefect.runtime.flow_run
    # resolves that run's parameters and tags
    await run_deployment(
        name="my-flow/large",                # or the same deployment
        parameters=flow_run.parameters,      # forwards the originals
        tags=[*flow_run.tags, "escalated"],  # optional guard
    )
Tip: Add a filter in your Automation trigger to exclude runs with the escalated tag so you don’t loop retries.

Joe D
08/23/2025, 6:33 PM

Marvin
08/23/2025, 6:33 PM
Parameters field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
To rename the re-run’s flow run name with a suffix:
Flow run name field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.name }}_escalated"
}
If you also want to tag the re-run (recommended so you can filter and avoid loops):
Tags field:
{
  "__prefect_kind": "jinja",
  "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}"
}
Docs: Templating in Automations

Joe D
08/23/2025, 6:36 PM

Marvin
08/23/2025, 6:37 PM
1) Tag the re-run (Tags field):
{
  "__prefect_kind": "jinja",
  "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}"
}
- (Optional) Flow run name field:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.name }}_escalated"
}
2) Exclude already-escalated runs (in the Trigger filters)
- Add a filter: Flow run tags does not contain escalated
- If you’re using a two-deployment pattern, also restrict to the “small” deployment so a crash on the large run won’t re-trigger.
Alternative guard (optional): add a filter “Flow run run count equals 0” so you only escalate the first crash.
Your Parameters template is already correct:
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
Docs: Templating in Automations

Joe D
08/23/2025, 6:39 PM

Marvin
08/23/2025, 6:40 PM
- Parameters field (enable “Use template”):
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.parameters | tojson }}"
}
- Flow run name field (enable “Use template”):
{
  "__prefect_kind": "jinja",
  "template": "{{ event.flow_run.name }}_escalated"
}
- Tags field (enable “Use template”):
{
  "__prefect_kind": "jinja",
  "template": "{{ (event.flow_run.tags + ['escalated']) | tojson }}"
}
And to ensure you only get a single escalated re-run, add a Trigger filter:
- Flow run → Tags → does not contain → escalated
- (Optionally also filter Deployment == your “small” deployment.)
Templating docs: Templating in Automations

Joe D
08/23/2025, 6:49 PM

Marvin
08/23/2025, 6:53 PM
2) Export the existing automation to YAML:
prefect automation inspect "my-escalation" --yaml > automation.yaml
3) Apply/update via a tiny Python script (CI-friendly):
import asyncio
import yaml
from prefect.client.orchestration import get_client

async def apply_automation(path="automation.yaml"):
    with open(path) as f:
        spec = yaml.safe_load(f)
    async with get_client() as client:
        # simplest: try to create; if it already exists, delete/replace in the UI
        # or add your own upsert logic
        await client.create_automation(spec)

if __name__ == "__main__":
    asyncio.run(apply_automation())
Notes
- Use the UI export (step 2) to get the exact, valid schema for your workspace.
- For “single re-run” safety, include in the exported automation:
- Tags template that appends escalated
- Trigger filter “Flow run tags does not contain escalated”
- Optional filter to only match your “small” deployment
Helpful docs:
- CLI (inspect/export): prefect automation CLI
- Jinja templating in Automations: Templating in Automations
If you share your deployment names (small vs large) and worker type, I can draft an example automation in YAML you can import into your repo and apply with the script above.