Mitch
06/10/2024, 8:26 PM
Mitch
06/10/2024, 10:07 PM
An on_cancellation hook that should probably be added to the examples or documentation somewhere, for cancelling all subflows that run in parallel when the cancel button in the UI is pressed:
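# Imports assume Prefect 2.x module paths.
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.logging import get_logger
from prefect.states import Cancelling

# Register on the parent flow with @flow(on_cancellation=[cancel_subflows]).
async def cancel_subflows(flow, flow_run, state):
    logger = get_logger()
    async with get_client() as client:
        logger.info(f"Flow run context is {flow_run.id}")
        # Find every flow run whose parent is the cancelled flow run.
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id]))
        )
        logger.info("Cancelling runs")
        for run in runs:
            # Log the child run's own ID (the original logged run.flow_id).
            logger.info(f"Cancelling run {run.id}")
            # force=False keeps orchestration rules in effect, so finished runs keep their terminal state.
            await client.set_flow_run_state(
                flow_run_id=run.id, state=Cancelling(), force=False
            )
        logger.info("Completed cancelling runs")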
When a parent flow is cancelled, this moves all children (subflows) to a Cancelling state while retaining the terminal status of flows that have already finished running. Jobs that are pending or about to be run on infrastructure will still start but then cancel immediately (as should be expected).
Do not set force=True together with state=Cancelled(), as it will cause discrepancies between the dashboard UI and infrastructure... I hope this saves someone else lots of time and compute costs.
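To make the force=False versus force=True point concrete, here is a rough toy model in plain Python. It does not call Prefect at all: ToyRun, request_state, cancel_children and the TERMINAL set are hypothetical names made up for illustration, and the rule they encode is only my reading of the behaviour described above, not Prefect's actual orchestration logic.

```python
from dataclasses import dataclass

# Toy model only: every name here is hypothetical and none of it is Prefect's API.
TERMINAL = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


@dataclass
class ToyRun:
    name: str
    state: str


def request_state(run: ToyRun, new_state: str, force: bool = False) -> bool:
    """Try to move a run to new_state; return True if the state changed.

    With force=False a run that is already in a terminal state is left alone.
    With force=True the new state is written unconditionally.
    """
    if not force and run.state in TERMINAL:
        return False
    run.state = new_state
    return True


def cancel_children(children, force=False, target="CANCELLING"):
    """Request the target state for every child and report the resulting states."""
    for run in children:
        request_state(run, target, force=force)
    return {run.name: run.state for run in children}


children = [ToyRun("a", "COMPLETED"), ToyRun("b", "RUNNING"), ToyRun("c", "PENDING")]
print(cancel_children(children))
# {'a': 'COMPLETED', 'b': 'CANCELLING', 'c': 'CANCELLING'}
```

In this model, calling cancel_children(children, force=True, target="CANCELLED") would overwrite the completed run as well and label the running one as cancelled before anything has actually stopped it, which is the kind of UI versus infrastructure mismatch the warning above is about.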