Etienne Callies
09/18/2025, 1:16 PMMarvin
09/18/2025, 1:16 PMMarvin
09/18/2025, 1:24 PMparent_flow_run_id
equals the parent’s run ID.
- Attach the hook for both:
- on_cancellation → always cancel children
- on_failure → only cancel children if the failure is a FlowRunTimeoutError
Example:
from prefect import flow, get_client
from prefect.context import get_run_context
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterParentFlowRunId,
FlowRunFilterStateType,
)
from prefect.states import get_state_exception
from prefect.flow_engine import FlowRunTimeoutError
async def _cancel_children():
parent_id = get_run_context().flow_run.id
async with get_client() as client:
children = await client.read_flow_runs(
flow_runs=FlowRunFilter(
parent_flow_run_id=FlowRunFilterParentFlowRunId(any_=[parent_id]),
state_type=FlowRunFilterStateType(any_=["RUNNING", "PENDING", "SCHEDULED", "CANCELLING"]),
),
limit=200,
)
for r in children:
try:
await client.cancel_flow_run(r.id)
except Exception as e:
# best-effort; log and continue
print(f"Failed to cancel child {r.id}: {e}")
async def cancel_children_on_cancel(flow, flow_run, state):
await _cancel_children()
async def cancel_children_on_timeout(flow, flow_run, state):
exc = get_state_exception(state)
if isinstance(exc, FlowRunTimeoutError):
await _cancel_children()
@flow(
timeout_seconds=600,
on_cancellation=[cancel_children_on_cancel],
on_failure=[cancel_children_on_timeout],
)
async def parent_flow():
# call subflows directly or via run_deployment(as_subflow=True)
...
Notes:
- This works whether you call subflows directly (child_flow(...)
) or launch them with `run_deployment(..., as_subflow=True)`; both create child flow runs with the parent’s ID.
- On Kubernetes, each deployment-run subflow typically runs in its own pod. When you call cancel_flow_run
, the Kubernetes worker will delete the corresponding job/pod. If your subflows run inline (direct child_flow()
in the same process/pod), the cancellation will stop them in-process.
2) Automation-driven cleanup flow (centralized and UI-managed)
- Create a small “cleanup” flow (same logic as _cancel_children
above).
- In the UI, add an Automation:
- Trigger: parent flow run enters Cancelled OR Failed due to timeout
- Action: RunDeployment for the cleanup flow, passing the parent flow run ID as a parameter
- The cleanup flow then queries for children by parent_flow_run_id
and cancels them.
- This is great if you want the policy enforced across many parents without duplicating hook code.
Helpful references:
- Cancel workflows (Prefect 3 docs)
- Automations overview blog
Quick CLI tip (manual/batch ops):
- You can cancel any single run with:
prefect flow-run cancel <flow-run-id>
A couple of clarifying questions so I can tailor the best approach:
- How are you launching subflows today: calling other flow functions directly inside the parent, or using run_deployment(..., as_subflow=True)
?
- Are your child flows each meant to run in their own Kubernetes pods (via a Kubernetes work pool), or do you prefer them to run in the same pod as the parent?