<@ULVA73B9P> is there a way to execute a hook when...
# ask-marvin
s
@Marvin is there a way to execute a hook when a deployed flow run gets cancelled ?
m
thought for 137.0 seconds
Yes. In Prefect 3.x you can attach a flow state hook that runs when the flow is cancelled. - Use the
on_cancellation
hook on your flow. Hooks can be async or sync and receive
(flow, flow_run, state)
so you have full context. Example:
Copy code
from prefect import flow, get_run_logger

async def on_cancel(flow, flow_run, state):
    logger = get_run_logger()
    logger.warning(f"Flow {flow_run.name} was cancelled (state={state.name})")

    # Example: call a webhook or clean up external resources
    import httpx
    async with httpx.AsyncClient(timeout=10) as client:
        await <http://client.post|client.post>(
            "<https://example.com/my-cancel-hook>",
            json={"flow_run_id": str(flow_run.id), "name": flow_run.name, "state": state.name},
        )

@flow(name="my-flow", on_cancellation=[on_cancel])
def my_flow():
    ...
Notes: - Cancellation generally moves through “Cancelling” to “Cancelled”; your hook runs when the flow enters the Cancelled state. - Hooks run in the same process as your flow. If the process is terminated abruptly (e.g., infrastructure kills the container), your hook may not run. If you need a guaranteed external action, use an Automation. Alternative: Automations for external actions - Create an Automation that triggers on flow runs entering the Cancelled state and performs a Call Webhook or Send Notification action. This is orchestrator-driven and does not depend on the flow process being alive. - You can manage automations via the UI or CLI (inspect/list/pause/resume). See: Automations overview blog Related docs: - Cancel workflows in Prefect 3 Would you like the hook to run cleanup code inside the flow runtime, or do you prefer an external, guaranteed webhook via an Automation?
s
@Marvin what is python syntax to cancel a flow run ?
m
thought for 231.1 seconds
In Prefect 3.x, cancel a flow run via the orchestration client’s
cancel_flow_run(flow_run_id)
. Async (recommended):
Copy code
from prefect import get_client

async def cancel_run(flow_run_id: str):
    async with get_client() as client:
        await client.cancel_flow_run(flow_run_id)
Cancel the current run from inside a flow:
Copy code
from prefect import flow, get_client
from prefect.context import get_run_context

@flow
async def my_flow():
    ctx = get_run_context()
    async with get_client() as client:
        await client.cancel_flow_run(ctx.flow_run.id)
Sync variant:
Copy code
from prefect.client.orchestration import SyncPrefectClient

def cancel_run_sync(flow_run_id: str):
    with SyncPrefectClient() as client:
        client.cancel_flow_run(flow_run_id)
Notes: - This issues a cancellation request (moves the run to “Cancelling” then “Cancelled”). - If the process is already finished, the request is rejected. Docs: Cancel workflows in Prefect 3
s
@Marvin The function
cancel_flow_run
does not exist on prefect client. Should I use the
set_flow_run_state
function instead ?
m
thought for 67.6 seconds
You’re right—there isn’t a
cancel_flow_run
method on the Prefect 3.x client. The supported way is to set the flow run’s state to “Cancelling” via
set_flow_run_state
. The orchestrator will then transition it to “Cancelled”. Async (recommended):
Copy code
from prefect import get_client
from prefect.client.schemas.actions import StateCreate
from prefect.client.schemas.objects import StateType

async def cancel_run(flow_run_id: str, reason: str = "Requested cancel"):
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id,
            StateCreate(type=StateType.CANCELLING, name="Cancelling", message=reason),
        )
Cancel the current run from inside a flow:
Copy code
from prefect import flow, get_client
from prefect.context import get_run_context
from prefect.client.schemas.actions import StateCreate
from prefect.client.schemas.objects import StateType

@flow
async def my_flow():
    ctx = get_run_context()
    async with get_client() as client:
        await client.set_flow_run_state(
            ctx.flow_run.id,
            StateCreate(type=StateType.CANCELLING, name="Cancelling", message="Manual cancel"),
        )
Sync variant:
Copy code
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.actions import StateCreate
from prefect.client.schemas.objects import StateType

def cancel_run_sync(flow_run_id: str):
    with SyncPrefectClient() as client:
        client.set_flow_run_state(
            flow_run_id,
            StateCreate(type=StateType.CANCELLING, name="Cancelling", message="Requested cancel"),
        )
Notes: - Prefer setting
CANCELLING
instead of `CANCELLED`; the engine will handle the transition to “Cancelled” safely. -
force=True
exists on
set_flow_run_state
but should be avoided unless you know you need to bypass orchestration. - CLI alternative:
prefect flow-run cancel <FLOW_RUN_ID>
does the same thing. Docs: Cancel workflows in Prefect 3