<@ULVA73B9P> How can i cancel deployment runs trig...
# ask-marvin
j
@Marvin How can I cancel deployment runs triggered from one deployment if the original deployment run is cancelled? For example, if I have 10 triggered runs from one run, I'd like to be able to cancel them all at once. Also, what is the difference between a subflow and a flow? Are they handled differently, or is it only that subflows show up in the original run?
m
Great questions — here’s how to handle both in Prefect 3.x. First, a quick clarification: are your “triggered runs” created by calling
run_deployment(...)
from a parent flow, or are they created by directly calling another
@flow
function inside your flow (i.e., an inline subflow)? The approach differs slightly, so I’ll cover both below.

How to cancel all child runs when the original is cancelled
- There isn’t an automatic “cascade cancel all children” for separate flow runs. You’ll want to add a cancellation hook to the parent flow that finds and cancels its children.
- If you use
run_deployment(..., as_subflow=True)
(the default), those runs are recorded as subflows linked to the parent, so the
parent_flow_run_id
filter matches them. You can query by that relationship and cancel them. - If you use
run_deployment(..., as_subflow=False)
or create runs in another way, propagate a tag (e.g.,
root:<parent_id>
) to the children so you can find them and cancel by tag. Example: cancel all children on parent cancellation
from prefect import flow
from prefect.deployments import run_deployment
from prefect.runtime import flow_run
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterParentFlowRunId
from prefect.states import Cancelling

# Hook called when the parent is cancelled.
# Prefect passes (flow, flow_run, state) positionally; `fr` is the parent's flow run.
async def cancel_children_on_parent_cancel(flow, fr, state):
    parent_id = fr.id  # use the hook argument rather than the runtime context
    async with get_client() as client:
        children = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id=FlowRunFilterParentFlowRunId(any_=[parent_id])
            )
        )
        for child in children:
            # Request cancellation: the child moves to 'CANCELLING' and is
            # expected to end up 'CANCELLED' once its infrastructure stops
            await client.set_flow_run_state(flow_run_id=child.id, state=Cancelling())

@flow(
    name="parent",
    on_cancellation=[cancel_children_on_parent_cancel],  # attach the hook
)
async def parent():
    # Launch children as subflows so they’re linked to the parent
    # as_subflow=True is the default, but shown here for clarity
    for i in range(10):
        await run_deployment(
            name="my-child-deployment",
            parameters={"i": i},
            as_subflow=True,
        )
If you need to cancel by tag instead (e.g., because you used
as_subflow=False
), add a tag when launching children:
tag = f"root:{flow_run.id}"
await run_deployment(
    name="my-child-deployment",
    tags=[tag],
    as_subflow=False,
)
Then, in the hook, rebuild the same tag from the parent run’s id and change the filter to
FlowRunFilter(tags=FlowRunFilterTags(any_=[tag]))
. Notes: - The example above cancels immediate children. If you have deeper trees, reuse the same
on_cancellation
hook on the children so cancellation cascades down the tree. - The Prefect CLI can cancel a single run (
prefect flow-run cancel <id>
), but there isn’t a bulk-cancel command today; a short Python utility like the one above is the most reliable way to cancel many runs at once.
- Docs:
  - Cancel workflows
  - State change hooks

Subflow vs flow: what’s the difference?
- “Flow” is any function decorated with
@flow
.
- “Subflow” just means a flow that is invoked by another flow. There are two common ways:
1) Inline subflow: Call another
@flow
function directly in Python. It executes in the same process as the parent and is recorded as a nested subflow run in the UI. Cancellation of the parent naturally stops it because it’s running in the same process.
2) Deployment-triggered subflow: Use
run_deployment(...)
. This creates a separate flow run picked up by a worker. If you pass
as_subflow=True
(default), it is linked as a child in the UI, but it still runs in its own infrastructure. Parent cancellation does not automatically cancel it; use the hook above to do that.

So, they’re primarily “handled differently” with respect to execution environment:
- Inline subflows share the parent’s process/infrastructure; cancellation/crash tends to propagate naturally.
- Deployment-triggered subflows are separate flow runs; they appear as children in the UI if
as_subflow=True
, but you must explicitly cancel them. If you can share which pattern you’re using, I can tailor the snippet exactly for your setup (e.g., by tags, by parent relationship, or across multiple levels).
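For deeper trees, an alternative to attaching the hook to every child is to walk the whole tree from the root and cancel everything found. Here is a minimal sketch of just that traversal in plain Python, with no Prefect calls. The (run_id, parent_id) pairs and the collect_descendants helper are hypothetical stand-ins for what the parent/child query in the hook above would return.

```python
from collections import defaultdict


def collect_descendants(parent_id, runs):
    """Return the ids of every run below parent_id, breadth-first.

    `runs` is a hypothetical list of (run_id, parent_id) pairs; in a real
    hook these would come from querying child flow runs level by level.
    """
    # Index runs by their parent so each level is a single lookup
    children = defaultdict(list)
    for run_id, pid in runs:
        children[pid].append(run_id)

    found = []
    queue = [parent_id]
    while queue:
        current = queue.pop(0)
        for child in children[current]:
            found.append(child)   # this run should be cancelled
            queue.append(child)   # and its own children visited next
    return found


# Illustrative data only: two children, one grandchild, one unrelated run
runs = [
    ("child-1", "root"),
    ("child-2", "root"),
    ("grandchild-1", "child-1"),
    ("other", "unrelated"),
]
print(collect_descendants("root", runs))
```

The returned list is what you would loop over to request cancellation; runs that belong to another parent are never touched.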
j
@Marvin what is fr, state etc. in the function. Not called
m
Short answer: fr and state are just the names the example gave to the second and third arguments Prefect passes to a hook. If your hook isn’t being called, first check that its signature matches what Prefect 3.x passes. In Prefect 3.x, hooks are called with exactly three positional arguments:
- Flow hooks (on_completion, on_failure, on_running, on_cancellation, on_crashed): 1) flow 2) flow_run 3) state
- Task hooks (on_completion, on_failure): 1) task 2) task_run 3) state
Notes:
- They’re passed positionally, not by keyword. Your parameter names can be anything (e.g.,
_, fr, state
) but you must accept three args. - Aliases like
fr
or
tr
aren’t special; Prefect does not recognize them as keywords. The arity (3 args) is what matters. - Prefect does not inject
result
,
parameters
, or
exception
directly. Use the
state
to derive what you need: -
prefect.states.get_state_result(state, raise_on_failure=False)
for the result -
prefect.states.get_state_exception(state)
for the exception (if failed)
Authoritative references:
- State change hooks (Prefect 3.x docs)
- Flow decorator: shows that hooks accept FlowStateHook, which is called as
(flow, flow_run, state)
Working examples
Flow failure hook
from prefect import flow, get_run_logger, states

def on_flow_failure(flow, flow_run, state):
    logger = get_run_logger()
    exc = states.get_state_exception(state)
    logger.error(f"Flow {flow_run.name} failed: {exc}")

@flow(on_failure=[on_flow_failure])  # hooks must be provided as a list
def my_flow():
    raise ValueError("boom")

if __name__ == "__main__":
    my_flow()
If you don’t need the first arg, you can ignore it:
def on_flow_failure(_, flow_run, state):
    ...
Task completion hook
from prefect import task, states

def on_task_completion(task, task_run, state):
    result = states.get_state_result(state, raise_on_failure=False)
    print(f"Task {task_run.name} completed with result={result}")

@task(on_completion=[on_task_completion])
def add(x, y):
    return x + y
Common pitfalls that prevent hooks from running
- Defining the hook with only two parameters (e.g.,
def hook(fr, state): ...
): Prefect calls hooks with three args, causing a TypeError.
- Passing a single function instead of a list to the decorator (use
on_failure=[my_hook]
, not
on_failure=my_hook
).
- Raising in the hook; exceptions are logged but won’t stop the run.
If you share your hook function signature and where you attach it (flow/task decorator), I can spot-check why it wasn’t called.
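If you want to rule out the arity pitfall before attaching a hook, a quick local check is possible with the standard library alone. This is only a sketch: accepts_three_positionals is a hypothetical helper (not part of Prefect), and it only verifies that a function can be called with three positional arguments, which is the calling convention described above.

```python
import inspect


def accepts_three_positionals(hook):
    """Return True if `hook` can be called as hook(a, b, c)."""
    try:
        # bind() raises TypeError when the arguments do not fit the signature
        inspect.signature(hook).bind(object(), object(), object())
    except TypeError:
        return False
    return True


def good_hook(flow, fr, state):
    """Three parameters; the names themselves do not matter."""


def bad_hook(fr, state):
    """Only two parameters, so a three-argument call would fail."""


print(accepts_three_positionals(good_hook))
print(accepts_three_positionals(bad_hook))
```

The first call reports that the hook is compatible and the second that it is not; the parameter names play no role, only the number of positional arguments accepted.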