<@ULVA73B9P> I'm trying to test the automation to ...
# ask-marvin
k
@Marvin I'm trying to test the automation to crash zombie flows locally. Do I need to deploy my test flow on the local server or can I just run it as a script using
if __name__ == "__main__":
    simulate_zombie()
m
Short answer: you don’t need a deployment. You can run the flow as a script, as long as that run is registered with the same Prefect server where your “Crash Zombie Runs” automation lives.
Key requirements:
- Your script must be connected to your Prefect server/Cloud (i.e., `PREFECT_API_URL` or an active profile pointing to your server). If you run in ephemeral/offline mode, automations won’t see the run.
- The flow must start and reach Running so it’s sending heartbeats. If the process dies and heartbeats stop, the automation will mark it Crashed after your configured threshold.
Quick way to simulate a zombie locally:
from prefect import flow
import time, os

@flow
def simulate_zombie():
    # long-running work so you can kill it
    time.sleep(600)

if __name__ == "__main__":
    # ensure your environment points at your local server before running this
    simulate_zombie()
Steps:
1) Confirm your script is pointed at your local server (you should see the run appear in the UI and show as Running).
2) Start the flow.
3) Kill the Python process hard (e.g., close the terminal or `kill -9 <pid>`).
4) Wait for the automation’s heartbeat/miss threshold; the run should move to Crashed.
Tip: For faster testing, shorten the heartbeat/grace thresholds in your automation. If you also want worker slots released, then run via a worker (deployment or `flow.from_source(...).deploy(...)`), but for simply crashing zombie runs, a script run is enough.
Docs for background: Automations overview
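The hard-kill step can also be scripted. The following is a minimal sketch, assuming a POSIX system; `start_and_hard_kill` is a hypothetical helper, and the child process here is only a placeholder that sleeps, standing in for the flow script.

```python
import signal
import subprocess
import sys
import time


def start_and_hard_kill(cmd, run_for_s=1.0):
    """Start `cmd`, let it run briefly, then SIGKILL it (like `kill -9 <pid>`)."""
    proc = subprocess.Popen(cmd)
    time.sleep(run_for_s)  # give the process time to start
    proc.send_signal(signal.SIGKILL)  # cannot be caught, so no cleanup code runs
    proc.wait()
    return proc.returncode  # on POSIX, a negative value is the killing signal


if __name__ == "__main__":
    # Placeholder child; swap in [sys.executable, "<your flow script>"].
    placeholder = [sys.executable, "-c", "import time; time.sleep(600)"]
    print(start_and_hard_kill(placeholder))
```

Because SIGKILL gives the process no chance to report a final state, the run should stay Running on the server until the automation acts on it.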
k
@Marvin My flow has been showing as "running" in the UI for more than 20 minutes already. Here is the code:
from prefect import flow
import time
import signal
import os

@flow(name="zombie-test-flow")
def simulate_zombie():
    print("Starting flow...")
    try:
        # Simulate work
        time.sleep(30)
        # Simulate a crash by sending SIGSTOP to ourselves
        os.kill(os.getpid(), signal.SIGSTOP)
    except Exception as e:
        print(f"Flow encountered an error: {e}")
        raise

if __name__ == "__main__":
    simulate_zombie()
Here is my automation json:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "prefect.flow-run.*"
    ]
  },
  "match_related": {},
  "after": [
    "prefect.flow-run.heartbeat"
  ],
  "expect": [
    "prefect.flow-run.heartbeat",
    "prefect.flow-run.Cancelled",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Completed"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 10
}
As I can see in the UI, the automation is on (green toggle). Why hasn't my flow crashed yet?
m
Likely cause: your automation is waiting for a heartbeat that never arrives. In Prefect 3.x, ad‑hoc script runs don’t always emit `prefect.flow-run.heartbeat` events. Your trigger says “after a heartbeat, expect another within 10s,” so if no heartbeat ever occurs, the automation never arms and never fires.
What to check/try:
- Check the run’s Events tab in the UI. Do you see any `prefect.flow-run.heartbeat` events? If not, that explains it.
- Anchor on Running instead of heartbeat. This arms the automation as soon as the run starts.
Example trigger that works for script runs:
{
  "type": "event",
  "match": {
    "prefect.resource.id": [
      "prefect.flow-run.*"
    ]
  },
  "after": [
    "prefect.flow-run.Running"
  ],
  "expect": [
    "prefect.flow-run.heartbeat",
    "prefect.flow-run.Cancelled",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Completed"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 60
}
And make sure your automation has an action to actually crash the run, e.g. “Change state to Crashed.” Without an action, it will detect but won’t do anything.
Faster testing tips:
- Set `within` to something like 20–30s (60s is safer if heartbeats exist and are ~30s apart).
- Your SIGSTOP approach is fine; it suspends the process and halts heartbeats if they exist.
- If you want guaranteed heartbeats, run the flow via a deployment/worker; those runs emit heartbeats reliably. But it’s not strictly required if you anchor on Running.
Docs for background: Automations overview
If you share a screenshot of the run’s Events feed (or confirm whether any heartbeat events exist), I can tailor the trigger exactly.
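To make the "never arms" reasoning concrete, here is a rough sketch of how a proactive trigger could be modeled. This is a simplified illustration, not Prefect's actual implementation; `proactive_fire_time` and the event timeline are hypothetical.

```python
def proactive_fire_time(events, after, expect, within):
    """Return the time a simplified proactive trigger would fire, or None.

    events: list of (timestamp_seconds, event_name), sorted by time.
    The trigger arms on any event in `after`; once armed, it fires if
    `within` seconds pass with no event from `expect`.
    """
    deadline = None
    for ts, name in events:
        if deadline is not None and ts > deadline:
            return deadline  # window elapsed before this event arrived
        if name in expect:
            deadline = None  # expected event seen: disarm
        if name in after:
            deadline = ts + within  # (re)arm the window
    return deadline  # still armed at the end: fires at the deadline


# Hypothetical timeline for a script run that never emits a heartbeat
events = [(0.0, "prefect.flow-run.Pending"), (1.0, "prefect.flow-run.Running")]
expect = {
    "prefect.flow-run.heartbeat",
    "prefect.flow-run.Cancelled",
    "prefect.flow-run.Crashed",
    "prefect.flow-run.Failed",
    "prefect.flow-run.Completed",
}

# Anchored on heartbeat: nothing ever arms the trigger
print(proactive_fire_time(events, {"prefect.flow-run.heartbeat"}, expect, 10))  # None
# Anchored on Running: armed at t=1, fires 60s later
print(proactive_fire_time(events, {"prefect.flow-run.Running"}, expect, 60))  # 61.0
```

In this model the heartbeat-anchored trigger stays silent for a run with no heartbeat events, which matches a run sitting in Running indefinitely.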
k
@Marvin what are the ways to create an automation?
m
Short version: two ways.
- UI (recommended): Go to Automations in the Prefect UI, click “New Automation,” pick a template (e.g., Crash Zombie Runs) or build your own, then add an action (e.g., Change flow run state → Crashed) and save.
- Python API: Create automations programmatically with the orchestration client and the events schemas/actions.
import anyio
from prefect.client.orchestration import get_client
from prefect.events.schemas.automations import AutomationCore, EventTrigger
from prefect.events.actions import ChangeFlowRunState

async def main():
    async with get_client() as client:
        # AutomationCore is the schema that create_automation() accepts
        automation = AutomationCore(
            name="Crash zombie runs",
            enabled=True,
            trigger=EventTrigger(
                match={"prefect.resource.id": ["prefect.flow-run.*"]},
                after=["prefect.flow-run.Running"],
                expect=[
                    "prefect.flow-run.heartbeat",
                    "prefect.flow-run.Cancelled",
                    "prefect.flow-run.Crashed",
                    "prefect.flow-run.Failed",
                    "prefect.flow-run.Completed",
                ],
                for_each=["prefect.resource.id"],
                posture="Proactive",  # same value as in the JSON trigger
                threshold=1,
                within=30,  # seconds
            ),
            # state type values are uppercase
            actions=[ChangeFlowRunState(state="CRASHED")],
        )
        await client.create_automation(automation)

anyio.run(main)
The shape mirrors the JSON you posted: same `match`/`after`/`expect`/`for_each`/`posture`/`threshold`/`within`, plus an action.
What you can’t do: create automations via CLI. The CLI only manages existing ones:
prefect automation ls
prefect automation inspect <id>
prefect automation enable|disable <id>
prefect automation delete <id>
Helpful docs: - Automations overview
k
@Marvin I'm on prefect==3.4.8. I'm trying to create an automation using a CLI command:
prefect automation create -f automation.yaml
I get the following error:
Failed to create 1 automation(s):
  - Cancel Long Running Flows: Client error '422 Unprocessable Entity' for url 'http://127.0.0.1:4200/api/automations/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'type': 'extra_forbidden', 'loc': ['body', 'tags'], 'msg': 'Extra inputs are not permitted', 'input': []}], 'request_body': {'name':
'Cancel Long Running Flows', 'description': 'Cancels flows running longer than 1 minutes', 'enabled': True, 'tags': [], 'trigger': {'type': 'event', 'match': {}, 'match_related': {}, 'after': [], 'expect': [],
'for_each': [], 'posture': 'Reactive', 'threshold': 1, 'within': 0.0}, 'actions': [{'type': 'cancel-flow-run'}], 'actions_on_trigger': [], 'actions_on_resolve': [], 'owner_resource': None}}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
Here is my automation.yaml:
name: Cancel Long Running Flows
description: Cancels flows running longer than 1 minutes
enabled: true
trigger:
  type: event
  posture: Reactive
  match_state_name: RUNNING
  match_state_duration_s: 60
actions:
  - type: cancel-flow-run
What's wrong?
m
You’re on Prefect 3.4.8, and there is no CLI command to create automations. That command is from an older era. In 3.x, the CLI only manages existing automations (ls/inspect/enable/disable/delete). Your 422 came from the CLI trying to POST a schema that 3.x doesn’t accept (e.g., the stray `tags` field).
What to do instead:
- Use the UI: Automations → New Automation → pick a template or build your own → add an action → Save.
- Or use the Python API.
Example: “Cancel long running flows” after 60s
This fires if a flow run stays Running longer than 60s and cancels it.
import anyio
from prefect.client.orchestration import get_client
from prefect.events.schemas.automations import AutomationCore, EventTrigger
from prefect.events.actions import CancelFlowRun

async def main():
    async with get_client() as client:
        # AutomationCore is the schema that create_automation() accepts
        auto = AutomationCore(
            name="Cancel Long Running Flows",
            enabled=True,
            trigger=EventTrigger(
                match={"prefect.resource.id": ["prefect.flow-run.*"]},
                after=["prefect.flow-run.Running"],
                expect=[
                    "prefect.flow-run.Completed",
                    "prefect.flow-run.Failed",
                    "prefect.flow-run.Cancelled",
                ],
                for_each=["prefect.resource.id"],
                posture="Proactive",  # same value as in the JSON trigger
                threshold=1,
                within=60,  # seconds
            ),
            actions=[CancelFlowRun()],
        )
        await client.create_automation(auto)

anyio.run(main)
To verify via CLI (the things that do exist):
prefect automation ls
prefect automation inspect <id>
prefect automation enable <id>
prefect automation disable <id>
prefect automation delete <id>
Docs: Automations overview
If you want to scope this to specific flows/tags, we can refine the `match`/`match_related` filter accordingly.
n
doh! i think marvin is mistaken here
the 422 looks like a client/server mismatch or an incorrect inclusion of an extra field. are you on OSS server or cloud?
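For reference, the rejected field can be read straight out of the 422 body. A small sketch, using an abbreviated copy of the response from the error above; `extra_fields` is a hypothetical helper, not part of Prefect.

```python
def extra_fields(response_body):
    """List request-body fields the server rejected as unknown."""
    rejected = []
    for item in response_body.get("exception_detail", []):
        if item.get("type") == "extra_forbidden":
            # loc looks like ["body", "tags"]; drop the leading "body"
            rejected.append(".".join(str(part) for part in item["loc"][1:]))
    return rejected


# Abbreviated copy of the response from the error message above
response = {
    "exception_message": "Invalid request received.",
    "exception_detail": [
        {
            "type": "extra_forbidden",
            "loc": ["body", "tags"],
            "msg": "Extra inputs are not permitted",
            "input": [],
        }
    ],
}
print(extra_fields(response))  # ['tags']
```

Here the client sent a `tags` field that the server's schema does not know about. That would fit a client that is newer than the server it talks to.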
k
Thanks for helping! I'm on OSS
n
yea marvin is not right about 3.4.8, it has the automations CLI. what version is your prefect server running?
k
Oh, it's my mistake, I forgot to restart the server after updating
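A quick sanity check for this kind of mismatch is to compare the two version strings once you have them. A minimal sketch, with hypothetical helper names; the server version below is made up for illustration, since the thread does not say what the stale server was running.

```python
import re


def release_tuple(version):
    """Parse the leading 'X.Y.Z' of a version string into a tuple of ints."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def server_is_behind(client_version, server_version):
    """True when the server release is older than the client release."""
    return release_tuple(server_version) < release_tuple(client_version)


# "3.4.8" is the client version from this thread; "3.4.0" is a made-up
# server version for illustration only.
print(server_is_behind("3.4.8", "3.4.0"))  # True
print(server_is_behind("3.4.8", "3.4.8"))  # False
```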
n
oh ok - all good now?
k
Yes, the automation is created 👍
n
catjam