# ask-marvin
t
@Marvin I am trying to set up an automation that should trigger a separate flow run whenever a flow run enters the Crashed state. But I need the parameters that were passed to the crashed flow run. How do I do that? My automation looks like: if `inference/inference` enters "Crashed", trigger `db/update_inference_status` with the parameters passed to `inference/inference`.
m
thought for 113.5 seconds
Great use case. You can do this with a Prefect 3 automation by templating the action’s parameters from the crashed flow run’s parameters. High-level:
- Trigger: “When a flow run enters state” = Crashed, and scope it to your `inference/inference` deployment (or the `inference` flow if you prefer).
- Action: “Run deployment” = `db/update_inference_status`
- Parameters: use Jinja templates to pull values from the crashed run, e.g. `{{ flow_run.parameters.<param_name> }}`
Example parameters JSON for the action (adjust keys to match your flow’s schema):
{
  "inference_id": "{{ flow_run.parameters.inference_id }}",
  "crashed_run_id": "{{ flow_run.id }}",
  "flow_name": "{{ flow_run.flow_name }}"
}
Notes:
- You can reference any parameter of the crashed run with `{{ flow_run.parameters.<name> }}` or `{{ flow_run.parameters['<name>'] }}`.
- Make sure `db/update_inference_status` is a deployment (the “Run deployment” action can only start deployments, not bare flows).
- In the automation builder, limit the trigger to your specific flow/deployment to avoid firing on others.
Docs with details and more examples:
- Accessing parameters in automation templates: Access parameters in templates
- Automations overview: Automations how-to
If you share the exact parameter name(s) you need from `inference/inference`, I can give you the precise JSON to paste into the action.
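If you need to forward several parameters, a small helper can generate that JSON. This is a minimal sketch: `parameter_template` and `build_action_parameters` are hypothetical helpers (not part of Prefect), and it assumes `flow_run.parameters` reaches the template as a plain dict. It uses dot access where that is safe and bracket access otherwise, because Jinja's dot syntax looks up attributes before keys, so a parameter named e.g. `items` would resolve to the dict method instead of your value.

```python
import json
import keyword


def parameter_template(name: str) -> str:
    """Return a Jinja expression that reads one parameter of the triggering flow run."""
    # Jinja's dot syntax tries attributes before keys, so names such as "items"
    # or "keys" would resolve to dict methods; bracket syntax looks up keys first.
    if name.isidentifier() and not keyword.iskeyword(name) and not hasattr(dict, name):
        return "{{ flow_run.parameters." + name + " }}"
    return "{{ flow_run.parameters['" + name + "'] }}"


def build_action_parameters(param_names: list[str]) -> dict[str, str]:
    """Build the "Run deployment" parameters: one template per forwarded parameter."""
    params = {name: parameter_template(name) for name in param_names}
    # Also pass the crashed run's ID, as in the example JSON above.
    params["crashed_run_id"] = "{{ flow_run.id }}"
    return params


if __name__ == "__main__":
    # "inference_id" matches the example above; "items" is a hypothetical
    # parameter name that shows the bracket fallback.
    print(json.dumps(build_action_parameters(["inference_id", "items"]), indent=2))
```

Paste the printed JSON into the action’s parameters, renaming keys to whatever `db/update_inference_status` expects.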
t
@Marvin When does a flow run move from Cancelling to Cancelled? The worker that the deployment was running on has already been killed, and I just moved the flow run from Running to Cancelling by clicking the Cancel button in the UI.
m
thought for 212.6 seconds
Short answer: “Cancelling” means “we asked it to stop”; it becomes “Cancelled” when either the flow engine or the worker actually acknowledges the stop (or the server confirms the underlying job/infrastructure is gone). If the worker was already killed, there’s nothing running to acknowledge the cancel, so you’ll typically see the run sit in “Cancelling” until something else reconciles it, such as zombie detection if you have it configured. How long that takes depends on your heartbeat/grace windows.
What moves Cancelling -> Cancelled:
- The running process exits cleanly and reports Cancelled; or
- The worker successfully terminates the job/infrastructure and reports back; or
- The server determines the job/infrastructure no longer exists and enforces the transition.
If the worker is already gone, there’s no worker to respond. In that case you have a couple of options:
- Force it in the UI: on the flow run page, use “Set state” and choose “Cancelled” (this bypasses waiting for the engine/worker).
- Use the CLI to request cancellation by ID (note that this only sets Cancelling, like the UI Cancel button, so on its own it won’t finish the job if the worker is gone):
prefect flow-run cancel <flow-run-id>
- Or wait: if heartbeat-based zombie detection is set up, Prefect can reconcile the state after heartbeats stop and the grace period passes (in some setups the run may flip to Cancelled; in others, if the cancel request was not handled in time, it may become Crashed).
Helpful docs:
- Cancel workflows
- Detect zombie flows
If you share which work pool/worker type you’re using (Kubernetes, Docker, ECS, Process, etc.), I can be more specific about how and when Prefect enforces the Cancelling -> Cancelled transition for that infrastructure.
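If you’d rather give the engine/worker a bounded chance to finish cancelling before forcing it, the wait-then-force logic can be scripted. A minimal sketch, assuming the Prefect 3 async client and that cancellation was already requested (UI Cancel button or `prefect flow-run cancel`): `wait_for_state` and `wait_then_force_cancel` are hypothetical helpers, and the 300-second timeout and 10-second poll interval are arbitrary choices, not Prefect settings. The polling function takes the state lookup as a callable so it can be exercised without a Prefect server.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def wait_for_state(
    fetch_state_type: Callable[[], Awaitable[Optional[str]]],
    target: str = "CANCELLED",
    timeout: float = 300.0,
    interval: float = 10.0,
) -> bool:
    """Poll fetch_state_type() until it returns `target`; False if `timeout` seconds pass first."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        if await fetch_state_type() == target:
            return True
        remaining = deadline - loop.time()
        if remaining <= 0:
            return False
        await asyncio.sleep(min(interval, remaining))


async def wait_then_force_cancel(flow_run_id: str, timeout: float = 300.0) -> None:
    """Give the run `timeout` seconds to reach Cancelled on its own, then force it."""
    # Imported here so wait_for_state can be used (and tested) without Prefect installed.
    from prefect.client.orchestration import get_client
    from prefect.states import Cancelled

    async with get_client() as client:

        async def fetch_state_type() -> Optional[str]:
            run = await client.read_flow_run(flow_run_id)
            # StateType values are upper-case strings such as "CANCELLING".
            return run.state.type.value if run.state else None

        if not await wait_for_state(fetch_state_type, timeout=timeout):
            # Nothing acknowledged the cancel in time (e.g. the worker is dead).
            await client.set_flow_run_state(flow_run_id, state=Cancelled(), force=True)


# Usage:
# asyncio.run(wait_then_force_cancel("<your-flow-run-id>"))
```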
t
@Marvin Can I set the state to Cancelled? I've been waiting for 5 minutes and Prefect did not reap those zombie flows.
m
thought for 68.0 seconds
Yes, you can force a run to Cancelled. Options:
- UI (fastest): open the flow run page > Actions > Set state > choose “Cancelled” and confirm. This immediately marks it Cancelled, even if the worker is gone.
- CLI: `prefect flow-run cancel <flow-run-id>` sets the run to Cancelling. If the worker is already dead, it may not progress to Cancelled on its own. In that case, use the UI “Set state” option above or the Python snippet below.
- Python API (force set state):
import asyncio
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def force_cancel(flow_run_id: str):
    async with get_client() as client:
        await client.set_flow_run_state(flow_run_id, state=Cancelled())

# Usage:
# asyncio.run(force_cancel("<your-flow-run-id>"))
A few notes:
- If the worker/infrastructure is already gone, Prefect won’t get an acknowledgment, so “Cancelling” can linger until backend reconciliation. Manually setting the state is the right move in that scenario.
- After forcing Cancelled, double-check any external resources (pods, containers, etc.), since your worker was killed and may not have cleaned them up.
Docs:
- Cancel workflows
- Detect zombie flows
If you’d like, tell me the run ID and I’ll share the exact command/script to run, or help verify whether your setup is Cloud or self-hosted and what heartbeat timing to expect.
t
@Marvin
In [2]: from prefect.client.orchestration import get_client
...: from prefect.states import Cancelled
...:
...: async def force_cancel(flow_run_id: str):
...:     async with get_client() as client:
...:         await client.set_flow_run_state(flow_run_id, state=Cancelled())
...:
In [3]: asyncio.run(force_cancel("e975b675-78e6-4e50-b0fd-f180b836e324"))
PydanticUserError Traceback (most recent call last)
Cell In[3], line 1
----> 1 asyncio.run(force_cancel("e975b675-78e6-4e50-b0fd-f180b836e324"))
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/runners.py:195, in run(main, debug, loop_factory)
191 raise RuntimeError(
192 "asyncio.run() cannot be called from a running event loop")
194 with Runner(debug=debug, loop_factory=loop_factory) as runner:
--> 195 return runner.run(main)
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/runners.py:118, in Runner.run(self, coro, context)
116 self._interrupt_count = 0
117 try:
--> 118 return self._loop.run_until_complete(task)
119 except exceptions.CancelledError:
120 if self._interrupt_count > 0:
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/base_events.py:691, in BaseEventLoop.run_until_complete(self, future)
688 if not future.done():
689 raise RuntimeError('Event loop stopped before Future completed.')
--> 691 return future.result()
Cell In[2], line 6, in force_cancel(flow_run_id)
4 async def force_cancel(flow_run_id: str):
5 async with get_client() as client:
----> 6 await client.set_flow_run_state(flow_run_id, state=Cancelled())
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py:891, in FlowRunAsyncClient.set_flow_run_state(self, flow_run_id, state, force)
886 from prefect.states import to_state_create
888 flow_run_id = (
889 flow_run_id if isinstance(flow_run_id, UUID) else UUID(flow_run_id)
890 )
--> 891 state_create = to_state_create(state)
892 state_create.state_details.flow_run_id = flow_run_id
893 state_create.state_details.transition_id = uuid4()
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/site-packages/prefect/states.py:65, in to_state_create(state)
62 else:
63 data = None
---> 65 return StateCreate(
66 type=state.type,
67 name=state.name,
68 message=state.message,
69 data=data,
70 state_details=state.state_details,
71 )
[... skipping hidden 1 frame]
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/site-packages/pydantic/_internal/_mock_val_ser.py:100, in MockValSer.__getattr__(self, item)
98 # raise an AttributeError if `item` doesn't exist
99 getattr(self._val_or_ser, item)
--> 100 raise PydanticUserError(self._error_message, code=self._code)
PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`. For further infor
m
thought for 137.6 seconds
Thanks for the traceback, that helps. Yes, you can set a run to Cancelled. Your snippet is correct in principle, but the PydanticUserError indicates your environment hit a model import/rebuild edge case (common in notebooks/IPython after reloads, or with a pydantic/prefect version mismatch). Try these, in order:
Fastest options
- UI: open the flow run > Actions > Set state > Cancelled. This immediately marks it Cancelled.
- CLI (sets to Cancelling):
prefect flow-run cancel e975b675-78e6-4e50-b0fd-f180b836e324
If the worker is gone, it may not progress; use the UI “Set state” to force Cancelled.
Fix your Python approach
- Add `force=True` and run it in a fresh Python process (not a long-lived notebook kernel):
import asyncio
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def force_cancel(flow_run_id: str):
    async with get_client() as client:
        await client.set_flow_run_state(flow_run_id, state=Cancelled(), force=True)

asyncio.run(force_cancel("e975b675-78e6-4e50-b0fd-f180b836e324"))
- If you must stay in the same notebook/kernel, run the following once before calling set_flow_run_state to work around the pydantic “not fully defined” error:
from prefect.client.schemas.actions import StateCreate
StateCreate.model_rebuild()
Then call the `force_cancel` function above.
REST fallback (bypasses the client models entirely)
- Works with Prefect Cloud or self-hosted; set `PREFECT_API_URL` (and `PREFECT_API_KEY` on Cloud) in your environment:
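import os

import httpx

# e.g. https://api.prefect.cloud/api/accounts/.../workspaces/... (Cloud) or http://127.0.0.1:4200/api (self-hosted)
API_URL = os.getenv("PREFECT_API_URL", "").rstrip("/")
API_KEY = os.getenv("PREFECT_API_KEY")  # Cloud only; leave unset for an open self-hosted server
if not API_URL:
    raise RuntimeError("Set PREFECT_API_URL before running this script")
FLOW_RUN_ID = "e975b675-78e6-4e50-b0fd-f180b836e324"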

payload = {
  "state": {"type": "CANCELLED", "name": "Cancelled", "state_details": {}},
  "force": True
}
headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}

with httpx.Client(timeout=30) as c:
    r = c.post(f"{API_URL}/flow_runs/{FLOW_RUN_ID}/set_state", json=payload, headers=headers)
    r.raise_for_status()
    print(r.json())
If you’re seeing a lot of zombie runs linger in Cancelling, that’s expected when the worker is dead; forcing Cancelled as above is the right move. If possible, share:
- Output of `prefect version`
- Whether you’re on Cloud or self-hosted
- Whether you’re executing from a notebook with autoreload
Docs:
- Cancel workflows
- Detect zombie flows
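For many runs lingering in Cancelling at once, the force-set can be applied in bulk. A sketch under these assumptions: `is_stuck` and `force_cancel_stuck_runs` are hypothetical helpers, the 10-minute threshold is arbitrary, and it relies on the Prefect 3 client filter models (`FlowRunFilter`, `FlowRunFilterState`, `FlowRunFilterStateType`). `read_flow_runs` may return only one page of results (the server applies a default limit), so rerun it if you have more stuck runs than that. Like the snippet above, run it in a fresh Python process.

```python
from datetime import datetime, timedelta, timezone


def is_stuck(entered_at: datetime, now: datetime, threshold: timedelta) -> bool:
    """True if a run has been in its current state for longer than `threshold`."""
    return now - entered_at > threshold


async def force_cancel_stuck_runs(threshold: timedelta = timedelta(minutes=10)) -> list[str]:
    """Force every run that has sat in CANCELLING longer than `threshold` to Cancelled."""
    # Prefect imports are local so is_stuck() can be used without Prefect installed.
    from prefect.client.orchestration import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.client.schemas.objects import StateType
    from prefect.states import Cancelled

    only_cancelling = FlowRunFilter(
        state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.CANCELLING]))
    )
    now = datetime.now(timezone.utc)
    forced: list[str] = []
    async with get_client() as client:
        runs = await client.read_flow_runs(flow_run_filter=only_cancelling)
        for run in runs:
            # state.timestamp is when the run entered its current (Cancelling) state.
            if run.state and is_stuck(run.state.timestamp, now, threshold):
                await client.set_flow_run_state(run.id, state=Cancelled(), force=True)
                forced.append(str(run.id))
    return forced


# Usage:
# import asyncio
# print(asyncio.run(force_cancel_stuck_runs(timedelta(minutes=10))))
```

As with a single forced run, check afterwards for orphaned pods/containers left behind by the dead worker.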