<@ULVA73B9P> how to automatically cancel sub-flows...
# ask-marvin
e
@Marvin how to automatically cancel sub-flows when the parent flow has timed out? I’m using Prefect 3 with k8s infrastructure.
m
Short answer: not automatically. In Prefect 3.x, subflows are independent flow runs. If the parent times out or is cancelled, Prefect won’t auto-cancel its children unless you add a hook/automation to do it.

Two reliable patterns you can use (both work with Kubernetes):

1) In-code hooks on the parent flow (simple and immediate)
- Use flow state hooks to cancel any child flow runs whose `parent_flow_run_id` equals the parent’s run ID.
- Attach the hook for both:
  - `on_cancellation` → always cancel children
  - `on_failure` → only cancel children if the failure is a `FlowRunTimeoutError`

Example:
```python
from uuid import UUID

from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterParentFlowRunId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.flow_engine import FlowRunTimeoutError
from prefect.states import Cancelling, get_state_exception


async def _cancel_children(parent_id: UUID) -> None:
    """Request cancellation of every still-active child of the given parent run."""
    async with get_client() as client:
        children = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id=FlowRunFilterParentFlowRunId(any_=[parent_id]),
                # State-type filters are nested under `state`.
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=[
                            StateType.RUNNING,
                            StateType.PENDING,
                            StateType.SCHEDULED,
                            StateType.CANCELLING,
                        ]
                    )
                ),
            ),
            limit=200,
        )
        for run in children:
            try:
                # Moving a run to Cancelling is how cancellation is requested via the client.
                await client.set_flow_run_state(run.id, state=Cancelling())
            except Exception as exc:
                # Best effort: report the failure and continue with the next child.
                print(f"Failed to cancel child {run.id}: {exc}")


async def cancel_children_on_cancel(flow, flow_run, state):
    # Use the flow_run passed to the hook; a run context may not be active here.
    await _cancel_children(flow_run.id)


async def cancel_children_on_timeout(flow, flow_run, state):
    # get_state_exception is a coroutine, so it must be awaited.
    exc = await get_state_exception(state)
    if isinstance(exc, FlowRunTimeoutError):
        await _cancel_children(flow_run.id)


@flow(
    timeout_seconds=600,
    on_cancellation=[cancel_children_on_cancel],
    on_failure=[cancel_children_on_timeout],
)
async def parent_flow():
    # Call subflows directly or via run_deployment(as_subflow=True).
    ...
```
Notes:
- This works whether you call subflows directly (`child_flow(...)`) or launch them with `run_deployment(..., as_subflow=True)`; both create child flow runs with the parent’s ID.
- On Kubernetes, each deployment-run subflow typically runs in its own pod. When you cancel such a run, the Kubernetes worker will delete the corresponding job/pod. If your subflows run inline (direct `child_flow()` in the same process/pod), the cancellation will stop them in-process.

2) Automation-driven cleanup flow (centralized and UI-managed)
- Create a small “cleanup” flow (same logic as `_cancel_children` above).
- In the UI, add an Automation:
  - Trigger: parent flow run enters Cancelled OR Failed due to timeout
  - Action: RunDeployment for the cleanup flow, passing the parent flow run ID as a parameter
- The cleanup flow then queries for children by `parent_flow_run_id` and cancels them.
- This is great if you want the policy enforced across many parents without duplicating hook code.

Helpful references:
- Cancel workflows (Prefect 3 docs)
- Automations overview blog

Quick CLI tip (manual/batch ops):
- You can cancel any single run with:

```bash
prefect flow-run cancel <flow-run-id>
```
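For the automation-driven pattern, the cleanup logic could look roughly like the sketch below. It is only an illustration under a few assumptions: the names `ACTIVE_STATE_TYPES`, `active_child_ids`, and `cancel_children` are hypothetical, the set of "active" state types is a choice you may want to adjust, and cancellation is requested by moving each child run to a `Cancelling` state through the client. Prefect is imported inside the coroutine so the selection helper can be exercised without a running Prefect setup.

```python
from uuid import UUID

# State types treated as "still active"; an assumption for this sketch.
ACTIVE_STATE_TYPES = {"SCHEDULED", "PENDING", "RUNNING"}


def active_child_ids(children) -> list:
    """Return the IDs of runs whose state type is still active.

    Each item needs `.id` and `.state_type` (a string or a str-valued enum).
    """
    ids = []
    for run in children:
        # Accept both plain strings and enum members such as StateType.RUNNING.
        state_type = getattr(run.state_type, "value", run.state_type)
        if state_type in ACTIVE_STATE_TYPES:
            ids.append(run.id)
    return ids


async def cancel_children(parent_flow_run_id: UUID) -> list:
    """Request cancellation of the active children of one parent flow run."""
    # Prefect is imported here so the helper above stays importable without it.
    from prefect import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterParentFlowRunId,
    )
    from prefect.states import Cancelling

    async with get_client() as client:
        children = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id=FlowRunFilterParentFlowRunId(
                    any_=[parent_flow_run_id]
                )
            ),
            limit=200,
        )
        targets = active_child_ids(children)
        for run_id in targets:
            # Moving a run to Cancelling asks Prefect to cancel it.
            await client.set_flow_run_state(run_id, state=Cancelling())
    return targets
```

Decorating `cancel_children` with Prefect's `@flow` and deploying it would give the automation's RunDeployment action a target, with the parent's run ID as its single parameter. How that ID is templated into the deployment parameters depends on your automation setup and is not shown here.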
A couple of clarifying questions so I can tailor the best approach:
- How are you launching subflows today: calling other flow functions directly inside the parent, or using `run_deployment(..., as_subflow=True)`?
- Are your child flows each meant to run in their own Kubernetes pods (via a Kubernetes work pool), or do you prefer them to run in the same pod as the parent?