Kate
08/12/2025, 1:46 PM
if __name__ == "__main__":
    simulate_zombie()
Marvin
08/12/2025, 1:46 PM
Marvin
08/12/2025, 1:48 PM
PREFECT_API_URL or an active profile pointing to your server). If you run in ephemeral/offline mode, automations won’t see the run.
- The flow must start and reach Running so it’s sending heartbeats. If the process dies and heartbeats stop, the automation will mark it Crashed after your configured threshold.
Quick way to simulate a zombie locally:
from prefect import flow
import time

@flow
def simulate_zombie():
    # long-running work so you have time to kill the process
    time.sleep(600)

if __name__ == "__main__":
    # ensure your environment points at your local server before running this
    simulate_zombie()
Steps:
1) Confirm your script is pointed at your local server (you should see the run appear in the UI and show as Running).
2) Start the flow.
3) Kill the Python process hard (e.g., close the terminal or kill -9 <pid>).
4) Wait for the automation’s heartbeat/miss threshold; the run should move to Crashed.
Tip: For faster testing, shorten the heartbeat/grace thresholds in your automation. If you also want worker slots released, then run via a worker (deployment or flow.from_source(...).deploy(...)), but for simply crashing zombie runs, a script run is enough.
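Steps 2 and 3 can also be scripted. The following is a minimal sketch using only the standard library; the helper name start_and_hard_kill is hypothetical, and the sleeping child is a stand-in for your own flow script, which you would pass as the argument list instead.

```python
import subprocess
import sys
import time


def start_and_hard_kill(script_args, run_for_s=1.0):
    """Start a child Python process, let it run briefly, then kill it hard.

    Hypothetical helper for illustration only. `script_args` is what you
    would type after `python` on the command line.
    """
    proc = subprocess.Popen([sys.executable, *script_args])
    time.sleep(run_for_s)  # give the child time to start
    proc.kill()            # SIGKILL on POSIX, TerminateProcess on Windows
    proc.wait()            # reap the child so the exit code is available
    return proc.returncode


if __name__ == "__main__":
    # Stand-in for a flow script: a child process that just sleeps.
    code = start_and_hard_kill(["-c", "import time; time.sleep(600)"])
    print("child exit code:", code)
```

For a real flow, run_for_s would need to be long enough for the run to show as Running in the UI before the kill.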
Docs for background: Automations overview
Kate
08/12/2025, 2:01 PM
from prefect import flow
import time
import signal
import os

@flow(name="zombie-test-flow")
def simulate_zombie():
    print("Starting flow...")
    try:
        # Simulate work
        time.sleep(30)
        # Simulate a crash by sending SIGSTOP to ourselves
        os.kill(os.getpid(), signal.SIGSTOP)
    except Exception as e:
        print(f"Flow encountered an error: {e}")
        raise

if __name__ == "__main__":
    simulate_zombie()
Here is my automation json:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "prefect.flow-run.*"
    ]
  },
  "match_related": {},
  "after": [
    "prefect.flow-run.heartbeat"
  ],
  "expect": [
    "prefect.flow-run.heartbeat",
    "prefect.flow-run.Cancelled",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Completed"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 10
}
As I can see in the UI, the automation is on (green toggle).
Why hasn't my flow crashed yet?
Marvin
08/12/2025, 2:01 PM
Marvin
08/12/2025, 2:02 PM
prefect.flow-run.heartbeat events. Your trigger says “after a heartbeat, expect another within 10s,” so if no heartbeat ever occurs, the automation never arms and never fires.
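To make the "never arms" point concrete, here is a toy model of a proactive trigger. It is a simplified sketch, not Prefect's implementation: the function name proactive_fire_times and the (timestamp, event name) tuple format are assumptions made for illustration.

```python
def proactive_fire_times(events, after, expect, within):
    """Return the times at which a simplified proactive trigger would fire.

    events: list of (timestamp_seconds, event_name), sorted by time.
    The trigger arms when an `after` event arrives and fires if no
    `expect` event is seen within `within` seconds of arming.
    """
    fired = []
    deadline = None  # None means the trigger is not armed
    for ts, name in events:
        if deadline is not None and ts > deadline:
            fired.append(deadline)  # window closed before this event
            deadline = None
        if deadline is not None and name in expect:
            deadline = None  # expectation met, disarm
        if name in after:
            deadline = ts + within  # arm (or re-arm) the trigger
    if deadline is not None:
        fired.append(deadline)  # no further events arrived in the window
    return fired


HEARTBEAT = "prefect.flow-run.heartbeat"
RUNNING = "prefect.flow-run.Running"

# Anchored on heartbeat, but the run only ever emits Running: never arms.
print(proactive_fire_times([(0, RUNNING)], {HEARTBEAT}, {HEARTBEAT}, 10))
# Anchored on Running: arms at t=0 and fires once the window elapses.
print(proactive_fire_times([(0, RUNNING)], {RUNNING}, {HEARTBEAT}, 60))
```

In this model the first call returns an empty list, which matches the behavior described above: with no heartbeat event there is nothing to start the clock.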
What to check/try:
- Check the run’s Events tab in the UI. Do you see any prefect.flow-run.heartbeat events? If not, that explains it.
- Anchor on Running instead of heartbeat. This arms the automation as soon as the run starts.
Example trigger that works for script runs:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "prefect.flow-run.*"
    ]
  },
  "after": [
    "prefect.flow-run.Running"
  ],
  "expect": [
    "prefect.flow-run.heartbeat",
    "prefect.flow-run.Cancelled",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Completed"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 60
}
And make sure your automation has an action to actually crash the run, e.g. “Change state to Crashed.” Without an action, it will detect but won’t do anything.
Faster testing tips:
- Set within to something like 20–30s (60s is safer if heartbeats exist and are ~30s apart).
- Your SIGSTOP approach is fine; it suspends the process and halts heartbeats if they exist.
- If you want guaranteed heartbeats, run the flow via a deployment/worker; those runs emit heartbeats reliably. But it’s not strictly required if you anchor on Running.
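If you expect to try several anchor events and window lengths while testing, it may be easier to generate the trigger JSON than to edit it by hand. This is a small sketch using only the standard library; build_zombie_trigger is a hypothetical helper, and the field names are copied from the example trigger above.

```python
import json

# Events that count as "the run is still alive or has finished",
# copied from the example trigger above.
EXPECTED_EVENTS = [
    "prefect.flow-run.heartbeat",
    "prefect.flow-run.Cancelled",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Completed",
]


def build_zombie_trigger(anchor="prefect.flow-run.Running", within=60):
    """Build the trigger dict with a configurable anchor event and window."""
    return {
        "type": "event",
        "match": {"prefect.resource.id": ["prefect.flow-run.*"]},
        "after": [anchor],
        "expect": list(EXPECTED_EVENTS),
        "for_each": ["prefect.resource.id"],
        "posture": "Proactive",
        "threshold": 1,
        "within": within,  # seconds
    }


if __name__ == "__main__":
    # A shorter window for quick local testing.
    print(json.dumps(build_zombie_trigger(within=30), indent=2))
```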
Docs for background: Automations overview
If you share a screenshot of the run’s Events feed (or confirm whether any heartbeat events exist), I can tailor the trigger exactly.
Kate
08/12/2025, 2:24 PM
Marvin
08/12/2025, 2:24 PM
explore_module_offerings
📊 Tools used:
• check_cli_command (24x)
• explore_module_offerings (48x)
• research_prefect_topic (12x)
• search_prefect_3x_docs (12x)
Marvin
08/12/2025, 2:25 PM
Nate
08/12/2025, 2:27 PM
Marvin
08/12/2025, 2:29 PM
import anyio
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import AutomationCore, EventTrigger, Posture

async def main():
    async with get_client() as client:
        # AutomationCore is the create-time schema; Automation also requires an id
        automation = AutomationCore(
            name="Crash zombie runs",
            enabled=True,
            trigger=EventTrigger(
                match={"prefect.resource.id": ["prefect.flow-run.*"]},
                after=["prefect.flow-run.Running"],
                expect=[
                    "prefect.flow-run.heartbeat",
                    "prefect.flow-run.Cancelled",
                    "prefect.flow-run.Crashed",
                    "prefect.flow-run.Failed",
                    "prefect.flow-run.Completed",
                ],
                for_each=["prefect.resource.id"],
                posture=Posture.PROACTIVE,
                threshold=1,
                within=30,  # seconds
            ),
            # StateType values are uppercase, so use the enum rather than "crashed"
            actions=[ChangeFlowRunState(state=StateType.CRASHED)],
        )
        await client.create_automation(automation)

anyio.run(main)
The shape mirrors the JSON you posted: same match/after/expect/for_each/posture/threshold/within, plus an action.
What you may not be able to do, depending on your Prefect version: create automations via the CLI. These commands manage existing ones:
prefect automation ls
prefect automation inspect <id>
prefect automation enable|disable <id>
prefect automation delete <id>
Helpful docs:
- Automations overview
Kate
08/12/2025, 2:45 PM
prefect automation create -f automation.yaml
I get the following error:
Failed to create 1 automation(s):
- Cancel Long Running Flows: Client error '422 Unprocessable Entity' for url '<http://127.0.0.1:4200/api/automations/>'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'type': 'extra_forbidden', 'loc': ['body', 'tags'], 'msg': 'Extra inputs are not permitted', 'input': []}], 'request_body': {'name':
'Cancel Long Running Flows', 'description': 'Cancels flows running longer than 1 minutes', 'enabled': True, 'tags': [], 'trigger': {'type': 'event', 'match': {}, 'match_related': {}, 'after': [], 'expect': [],
'for_each': [], 'posture': 'Reactive', 'threshold': 1, 'within': 0.0}, 'actions': [{'type': 'cancel-flow-run'}], 'actions_on_trigger': [], 'actions_on_resolve': [], 'owner_resource': None}}
For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422>
Here is my automation.yaml:
name: Cancel Long Running Flows
description: Cancels flows running longer than 1 minutes
enabled: true
trigger:
  type: event
  posture: Reactive
  match_state_name: RUNNING
  match_state_duration_s: 60
actions:
  - type: cancel-flow-run
What's wrong?
Marvin
08/12/2025, 2:46 PM
Marvin
08/12/2025, 2:47 PM
tags field).
What to do instead:
- Use the UI: Automations → New Automation → pick a template or build your own → add an action → Save.
- Or use the Python API.
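One more thing worth checking in the YAML: the request body echoed in the 422 error above contains no match_state_name or match_state_duration_s, which suggests those keys were dropped before the request was sent. A rough sketch of how you might spot such keys follows. The set of known keys is taken only from that echoed request body, so treat it as an assumption rather than the full server schema, and unknown_trigger_keys is a hypothetical helper.

```python
# Trigger keys as they appear in the request body echoed by the 422 error.
# Assumption: this list may not be exhaustive.
KNOWN_TRIGGER_KEYS = {
    "type",
    "match",
    "match_related",
    "after",
    "expect",
    "for_each",
    "posture",
    "threshold",
    "within",
}


def unknown_trigger_keys(trigger):
    """Return the trigger keys that are not in the known set, sorted."""
    return sorted(set(trigger) - KNOWN_TRIGGER_KEYS)


# The trigger section of the automation.yaml above, as a dict.
trigger_from_yaml = {
    "type": "event",
    "posture": "Reactive",
    "match_state_name": "RUNNING",
    "match_state_duration_s": 60,
}

print(unknown_trigger_keys(trigger_from_yaml))
# prints ['match_state_duration_s', 'match_state_name']
```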
Example: “Cancel long running flows” after 60s
This fires if a flow run stays Running longer than 60s and cancels it.
import anyio
from prefect.client.orchestration import get_client
from prefect.events.actions import CancelFlowRun
from prefect.events.schemas.automations import AutomationCore, EventTrigger, Posture

async def main():
    async with get_client() as client:
        # AutomationCore is the create-time schema; Automation also requires an id
        auto = AutomationCore(
            name="Cancel Long Running Flows",
            enabled=True,
            trigger=EventTrigger(
                match={"prefect.resource.id": ["prefect.flow-run.*"]},
                after=["prefect.flow-run.Running"],
                expect=[
                    "prefect.flow-run.Completed",
                    "prefect.flow-run.Failed",
                    "prefect.flow-run.Cancelled",
                ],
                for_each=["prefect.resource.id"],
                posture=Posture.PROACTIVE,
                threshold=1,
                within=60,  # seconds
            ),
            actions=[CancelFlowRun()],
        )
        await client.create_automation(auto)

anyio.run(main)
To verify via CLI (the things that do exist):
prefect automation ls
prefect automation inspect <id>
prefect automation enable <id>
prefect automation disable <id>
prefect automation delete <id>
Docs:
- Automations overview
If you want to scope this to specific flows/tags, we can refine the `match`/`match_related` filter accordingly.
Nate
08/12/2025, 2:48 PM
Nate
08/12/2025, 2:48 PM
Kate
08/12/2025, 2:49 PM
Nate
08/12/2025, 2:51 PM
Kate
08/12/2025, 2:52 PM
Nate
08/12/2025, 2:59 PM
Kate
08/12/2025, 3:00 PM
Nate
08/12/2025, 3:00 PM