Mitch
06/07/2024, 11:20 PM
Marvin
06/07/2024, 11:21 PM
`run_deployment` and the parent flow reaches a terminal state. Here are some steps and options that might help in managing and cancelling subflows effectively:
1. Enhanced Cancellation: Prefect provides an enhanced cancellation mechanism, which you can enable for more robust cancellation handling. Set this configuration on your Prefect worker:
```bash
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
```
This can make cancelling subflows more reliable when the parent flow is cancelled.
2. Monitoring Process: For flows and subflows started via `run_deployment`, make sure each run comes from a deployment that a worker is polling, since the worker is what enforces cancellation on the run's infrastructure.
3. Cascading Cancellation for Deployments:
- Make sure to handle subflows as separate deployments if you want to manage subflow cancellations more predictably.
- The Prefect community has proposed an optional `run_deployment` parameter such as `cancel_on_final_parent`, which would cancel subflows automatically when the parent flow enters a terminal state. Treat it as a proposal to simplify cascading cancellation, not as an option you can rely on, unless your Prefect version documents it.
4. Manual Cancellation:
- You can manually enforce subflow cancellations by polling the status of the parent flow and issuing cancellation requests for subflows that are still running.
5. Proposed Feature Request: There's a feature request for adding a "Cancel subflows" button or functionality within the Prefect UI, which could streamline cancelling all subflows at once.
You can refer to these links for more detailed information:
- Concept of Flows in Prefect
- Prefect Deployment
- GitHub Issues Discussing This Feature:
  - Support subflow cancellation if parent flow is in terminal state
  - Cascading cancellation for sub-deployments
Would you like specific code examples or additional clarification on any of the steps above?
Nate
06/07/2024, 11:31 PM
`on_cancellation` hook
• get flow runs of interest
  ◦ get current flow run id
  ◦ `client.read_flow_runs(flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[current_flow_run_id])))`
• set those flow runs to Cancelled
  ◦ `client.set_flow_run_state(flow_run.id, Cancelled(), force=True)` for each of the resulting flow runs
Mitch
06/08/2024, 12:49 AM
`on_cancellation` in the decorator:
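```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.context import FlowRunContext
from prefect.states import Cancelled

async def cancel_subflows(flow, flow_run, state):
    async with get_client() as client:
        context = FlowRunContext.get()  # returns None when the hook runs outside a flow run context
        current_flow_run_id = context.flow_run.id
        client.read_flow_runs(flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[current_flow_run_id])))  # not awaited, so no request is made
        client.set_flow_run_state(Cancelled(), force=True)  # not awaited; also missing the flow_run_id argument
```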
but it exhibited the exact same behavior, i.e., it doesn't work.
I found this related issue, though: https://github.com/PrefectHQ/prefect/issues/12864
Nate
06/08/2024, 1:07 AM
```python
from prefect import flow
from prefect.context import get_run_context
from prefect.exceptions import MissingContextError
from prefect.states import Cancelling


def hook(flow, flow_run, state):
    # No run context is available inside the hook...
    errored = False
    try:
        get_run_context()
    except MissingContextError:
        print("because we're outside of a flow run")
        errored = True
    assert errored
    breakpoint()  # pause here to inspect flow, flow_run, and state
    # ...but the flow_run argument still identifies the run the hook is attached to
    print(f"the flow run this hook is attached to is cancelling: {flow_run.id}")


@flow(on_cancellation=[hook])
def too_edgy():
    # Returning a Cancelling state moves this flow run to Cancelling, which fires the hook
    return Cancelling()


if __name__ == "__main__":
    try:
        too_edgy()
    except Exception as exc:
        print(exc)
```
Nate
06/08/2024, 1:07 AM
Mitch
06/08/2024, 1:07 AM
Mitch
06/10/2024, 8:18 PM
```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.logging import get_logger
from prefect.states import Cancelled


async def cancel_subflows(flow, flow_run, state):
    async with get_client() as client:
        logger = get_logger()
        # Use the hook's flow_run argument; no run context is available here
        logger.info(f"Flow run context is {flow_run.id}")
        runs = await client.read_flow_runs(flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id])))
        logger.info("Cancelling runs")
        for run in runs:
            logger.info(f"Cancelling run {run.id}")
            # force=True skips orchestration rules and sets Cancelled directly
            await client.set_flow_run_state(flow_run_id=run.id, state=Cancelled(), force=True)
        logger.info("Completed cancelling runs")
```
And all the flows enter the Cancelled state, but the pods keep running, so the cluster doesn't scale down. Tasks within the pods continue to run even though the UI shows the entire flow as cancelled...