<@ULVA73B9P> When I cancel a flows that creates su...
# ask-marvin
m
@Marvin When I cancel a flow that creates subflows using run_deployment, the subflows in Late, Scheduled, Pending and Running statuses continue to run. How do I get all of my subflows to cancel? It is very expensive to have hundreds of GPU nodes continually running jobs that should be cancelled, or to have to cancel them all manually.
m
It appears that cancelling subflows can indeed be challenging, especially when they are started with `run_deployment` and the parent flow reaches a terminal state. Here are some steps and options that might help in managing and cancelling subflows effectively:

1. Enhanced Cancellation: Prefect provides an enhanced cancellation mechanism, which you can enable for more robust cancellation handling. Set this configuration on your Prefect worker:

    prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True

   This can help with more reliably cancelling subflows when the parent flow is cancelled.
2. Monitoring Process: For flows and subflows started via `run_deployment`, ensure they are associated with a deployment and monitored correctly. This is essential for cancellations to be enforced properly.
3. Cascading Cancellation for Deployments:
   - Handle subflows as separate deployments if you want to manage subflow cancellations more predictably.
   - The Prefect community has proposed an optional parameter such as `cancel_on_final_parent` for `run_deployment` calls, which would cancel subflows when the parent flow enters a terminal state and so simplify cascading cancellations.
4. Manual Cancellation: You can manually enforce subflow cancellations by polling the status of the parent flow and issuing cancellation requests for subflows that are still running.
5. Proposed Feature Request: There's a feature request for adding a "Cancel subflows" button or functionality within the Prefect UI, which could streamline cancelling all subflows at once.

You can refer to these for more detailed information:
- Concept of Flows in Prefect
- Prefect Deployment
- GitHub issues discussing this feature:
  - Support subflow cancellation if parent flow is in terminal state
  - Cascading cancellation for sub-deployments

Would you like specific code examples or additional clarification on any of the steps above?
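The manual option (4) could be sketched roughly as below. This is only an illustration of the polling idea, not a Prefect API: `watch_parent`, `get_parent_state`, `cancel_children` and `CANCEL_ON` are hypothetical names, and the two callables stand in for whatever client calls you use to read the parent's state and to cancel its child runs. Leaving a `COMPLETED` parent's children alone is an assumption of the sketch.

```python
import asyncio
from typing import Awaitable, Callable

# State names treated as "the parent has finished" (assumed names).
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}
# Terminal states after which the children should be cancelled too.
CANCEL_ON = {"FAILED", "CANCELLED", "CRASHED"}


async def watch_parent(
    get_parent_state: Callable[[], Awaitable[str]],
    cancel_children: Callable[[], Awaitable[None]],
    poll_seconds: float = 10.0,
) -> str:
    """Poll the parent's state and cancel its children once it ends badly."""
    while True:
        state = await get_parent_state()
        if state in TERMINAL:
            if state in CANCEL_ON:
                await cancel_children()
            return state
        # Parent is still active: wait before checking again.
        await asyncio.sleep(poll_seconds)
```

The watcher has to run somewhere that outlives the parent flow run (for example a small separate process), since the parent itself may already be gone by the time its children need cancelling.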
n
hi @Mitch - i may have mentioned this earlier, but you should be able to do the following in an `on_cancellation` hook
• get flow runs of interest
  ◦ get current flow run id
  ◦ `client.read_flow_runs(flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[current_flow_run_id])))`
• set those flow runs to Cancelled
  ◦ `client.set_flow_run_state(Cancelled(), force=True)` for each of the resulting flow runs
m
@Nate I added this to the parent flow with `on_cancellation` in the decorator:
async def cancel_subflows(flow, flow_run, state):
    async with get_client() as client:
        context = FlowRunContext.get()
        current_flow_run_id = context.flow_run.id
        client.read_flow_runs(flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[current_flow_run_id])))
        client.set_flow_run_state(Cancelled(), force=True)
but it exhibited the exact same behavior, i.e. it doesn't work. I found this related issue, though: https://github.com/PrefectHQ/prefect/issues/12864
n
from prefect import flow
from prefect.context import get_run_context
from prefect.exceptions import MissingContextError
from prefect.states import Cancelling


def hook(flow, flow_run, state):
    errored = False
    try:
        get_run_context()
    except MissingContextError:
        print("because we're outside of a flow run")
        errored = True

    assert errored
    breakpoint()
    print(f"the flow run this hook is attached to is cancelling: {flow_run.id}")


@flow(on_cancellation=[hook])
def too_edgy():
    return Cancelling()


if __name__ == "__main__":
    try:
        too_edgy()
    except Exception as exc:
        print(exc)
in a hook, you're outside of a flow run context
m
Ok I will fix
@Nate I implemented this
async def cancel_subflows(flow, flow_run, state):
    async with get_client() as client:
        logger = get_logger()

        logger.info(f"Flow run context is {flow_run.id}")
        # Find every flow run whose parent is the run being cancelled
        runs = await client.read_flow_runs(flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id])))
        logger.info("Cancelling runs")
        for run in runs:
            # Log the flow run id (run.id), which is the id passed to set_flow_run_state
            logger.info(f"Cancelling run {run.id}")
            await client.set_flow_run_state(flow_run_id=run.id, state=Cancelled(), force=True)
        logger.info("Completed cancelling runs")
And all the flows enter Cancelled status, but the pods continue to run, so the cluster doesn't scale down. Tasks within the pods continue to run even though the UI shows the entire flow as cancelled...
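A possible explanation, not confirmed anywhere in this thread: forcing a run straight to `Cancelled` only changes the state recorded by the API, while tearing down the pod is the worker's job, and the worker acts on runs that are in a `Cancelling` state. Under that assumption, a hook could request `Cancelling` without `force=True` and let the worker finish the job. The sketch below is untested against a real cluster; `request_cancellation` and `TERMINAL_STATES` are hypothetical names, and reading `state_type` from each returned run is an assumption about the client's flow run objects.

```python
import asyncio
from typing import Any, Callable, Iterable, List

# Runs already in one of these states are left untouched (assumed names).
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


async def request_cancellation(
    client: Any, runs: Iterable[Any], make_state: Callable[[], Any]
) -> List[Any]:
    """Propose the state built by make_state for every run that is not finished."""
    requested = []
    for run in runs:
        state_type = getattr(run, "state_type", None)
        # Accept either an enum member or a plain string.
        name = getattr(state_type, "value", state_type)
        if name in TERMINAL_STATES:
            continue
        # No force=True: the state change goes through normal orchestration.
        await client.set_flow_run_state(flow_run_id=run.id, state=make_state())
        requested.append(run.id)
    return requested


async def cancel_subflows(flow, flow_run, state):
    # Prefect is imported here so the helper above stays usable without it.
    from prefect import get_client
    from prefect.client.schemas.filters import FlowRunFilter
    from prefect.states import Cancelling

    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id]))
        )
        await request_cancellation(client, runs, Cancelling)
```

It would be attached the same way as before, with `@flow(on_cancellation=[cancel_subflows])` on the parent flow. Whether the pods then stop still depends on the worker being able to reach and kill the infrastructure it started.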