Jean Morris
09/15/2025, 2:19 PMMarvin
09/15/2025, 2:19 PMMarvin
09/15/2025, 2:32 PMrun_deployment(...)
from a parent flow, or are they created by directly calling another @flow
function inside your flow (i.e., an inline subflow)? The approach differs slightly. I’ll cover both below.
How to cancel all child runs when the original is cancelled
- There isn’t an automatic “cascade cancel all children” for separate flow runs. You’ll want to add a cancellation hook to the parent flow that finds and cancels its children.
- If you use run_deployment(..., as_subflow=True)
(the default), those runs are recorded as subflows with parent_flow_run_id
set to the parent. You can query by that relationship and cancel them.
- If you use run_deployment(..., as_subflow=False)
or create runs in another way, propagate a tag (e.g., root:<parent_id>
) to the children so you can find them and cancel by tag.
Example: cancel all children on parent cancellation
from prefect import flow
from prefect.deployments import run_deployment
from prefect.runtime import flow_run
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterParentFlowRunId
# Hook called when the parent is cancelled
async def cancel_children_on_parent_cancel(flow, fr, state):
parent_id = flow_run.id
async with get_client() as client:
children = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
parent_flow_run_id=FlowRunFilterParentFlowRunId(any_=[parent_id])
)
)
for child in children:
# Transitions the child into 'CANCELLING' then 'CANCELLED'
await client.cancel_flow_run(child.id)
@flow(
name="parent",
on_cancellation=[cancel_children_on_parent_cancel], # attach the hook
)
async def parent():
# Launch children as subflows so they’re linked to the parent
# as_subflow=True is the default, but shown here for clarity
for i in range(10):
await run_deployment(
name="my-child-deployment",
parameters={"i": i},
as_subflow=True,
)
If you need to cancel by tag instead (e.g., because you used as_subflow=False
), add a tag when launching children:
tag = f"root:{flow_run.id}"
await run_deployment(
name="my-child-deployment",
tags=[tag],
as_subflow=False,
)
Then change the filter in the hook to FlowRunFilterTags(any_=[tag])
.
Notes:
- The example above cancels immediate children. If you have deeper trees, reuse the same on_cancellation
hook on the children so cancellation cascades down the tree.
- The Prefect CLI can cancel a single run (prefect flow-run cancel <id>
), but there isn’t a bulk-cancel command today; a short Python utility like the one above is the most reliable way to cancel many runs at once.
- Docs:
- Cancel workflows
- State change hooks
Subflow vs flow: what’s the difference?
- “Flow” is any function decorated with @flow
.
- “Subflow” just means a flow that is invoked by another flow. There are two common ways:
1) Inline subflow: Call another @flow
function directly in Python. It executes in the same process as the parent and is recorded as a nested subflow run in the UI. Cancellation of the parent naturally stops it because it’s running in the same process.
2) Deployment-triggered subflow: Use run_deployment(...)
. This creates a separate flow run picked up by a worker. If you pass as_subflow=True
(default), it is linked as a child in the UI, but it still runs in its own infrastructure. Parent cancellation does not automatically cancel it — use the hook above to do that.
So, they’re primarily “handled differently” with respect to execution environment:Marvin
09/15/2025, 2:32 PMas_subflow=True
, but you must explicitly cancel them.
If you can share which pattern you’re using, I can tailor the snippet exactly for your setup (e.g., by tags, by parent relationship, or across multiple levels).Jean Morris
09/15/2025, 4:11 PMJean Morris
09/16/2025, 8:41 AMMarvin
09/16/2025, 8:41 AMMarvin
09/16/2025, 8:49 AM_, fr, state
) but you must accept three args.
- Aliases like fr
or tr
aren’t special; Prefect does not recognize them as keywords. The arity (3 args) is what matters.
- Prefect does not inject result
, parameters
, or exception
directly. Use the state
to derive what you need:
- prefect.states.get_state_result(state, raise_on_failure=False)
for the result
- prefect.states.get_state_exception(state)
for the exception (if failed)
Authoritative references:
- State change hooks (Prefect 3.x docs)
- Flow decorator shows hooks accept FlowStateHook which is called as (flow, flow_run, state)
Working examples
Flow failure hook
from prefect import flow, get_run_logger, states
def on_flow_failure(flow, flow_run, state):
logger = get_run_logger()
exc = states.get_state_exception(state)
logger.error(f"Flow {flow_run.name} failed: {exc}")
@flow(on_failure=[on_flow_failure]) # hooks must be provided as a list
def my_flow():
raise ValueError("boom")
if __name__ == "__main__":
my_flow()
If you don’t need the first arg, you can ignore it:
def on_flow_failure(_, flow_run, state):
...
Task completion hook
from prefect import task, states
def on_task_completion(task, task_run, state):
result = states.get_state_result(state, raise_on_failure=False)
print(f"Task {task_run.name} completed with result={result}")
@task(on_completion=[on_task_completion])
def add(x, y):
return x + y
Common pitfalls that prevent hooks from running
- Defining the hook with only two parameters (e.g., def hook(fr, state): ...
) — Prefect calls hooks with three args, causing a TypeError.
- Passing a single function instead of a list to the decorator (use on_failure=[my_hook]
, not on_failure=my_hook
).
- Raising in the hook; exceptions are logged but won’t stop the run.
If you share your hook function signature and where you attach it (flow/task decorator), I can spot-check why it wasn’t called.