Caleb
11/06/2024, 12:12 AMMarvin
11/06/2024, 12:12 AMCaleb
11/06/2024, 12:13 AMMarvin
11/06/2024, 12:13 AMpython
from prefect.client import get_client
import anyio
async def list_flow_runs_with_states(states):
async with get_client() as client:
return [flow_run async for flow_run in client.flow_runs.list(where={"state.type": {"any_": states}})]
async def cancel_flow_runs(flow_runs):
async with get_client() as client:
for idx, flow_run in enumerate(flow_runs):
print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
state_updates = {"name": "Cancelled", "type": "CANCELLED"}
state = flow_run.state.copy(update=state_updates)
await client.set_flow_run_state(flow_run.id, state, force=True)
async def bulk_cancel_flow_runs():
states = ["Pending", "Running", "Scheduled", "Late"]
flow_runs = await list_flow_runs_with_states(states)
while len(flow_runs) > 0:
print(f"Cancelling {len(flow_runs)} flow runs\n")
await cancel_flow_runs(flow_runs)
flow_runs = await list_flow_runs_with_states(states)
print("Done!")
if __name__ == "__main__":
anyio.run(bulk_cancel_flow_runs)
Please be sure to replace any placeholders with your actual deployment IDs or other required parameters.
I hope you find this efficient and not too terribly tedious, though I must admit, even machines get a bit weary of repetitive tasks sometimes. If you want more details, you can check this Prefect resource link.