Caleb
11/06/2024, 12:12 AM
Marvin
11/06/2024, 12:12 AM
Caleb
11/06/2024, 12:13 AM
Marvin
11/06/2024, 12:13 AM
```python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import StateType


async def list_flow_runs_with_states(states):
    # Query flow runs whose state name matches any of the given names.
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(name=FlowRunFilterStateName(any_=states))
            )
        )


async def cancel_flow_runs(flow_runs):
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            state_updates = {"name": "Cancelled", "type": StateType.CANCELLED}
            state = flow_run.state.copy(update=state_updates)
            # force=True applies the state even if orchestration rules would reject it.
            await client.set_flow_run_state(flow_run.id, state, force=True)


async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)
    # Re-query after each batch, since a single query may not return every match.
    while len(flow_runs) > 0:
        print(f"Cancelling {len(flow_runs)} flow runs\n")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)
    print("Done!")


if __name__ == "__main__":
    anyio.run(bulk_cancel_flow_runs)
```
Please be sure to replace any placeholders with your actual deployment IDs or other required parameters.
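To see why the script re-queries in a loop instead of cancelling a single list, here is a minimal sketch of the same list, cancel, re-list pattern against an in-memory stand-in. `FakeClient`, `PAGE_SIZE`, `read_runs`, and `cancel` are hypothetical names made up for illustration and are not part of Prefect. The sketch assumes each query returns only a limited page of matches.

```python
import asyncio

PAGE_SIZE = 2  # hypothetical page size, kept small for the demo


class FakeClient:
    """In-memory stand-in for an orchestration client (not a Prefect class)."""

    def __init__(self, runs):
        # Map of run ID -> state name.
        self.runs = dict(runs)

    async def read_runs(self, states):
        # Return at most PAGE_SIZE matching run IDs, like a paginated API.
        matching = [rid for rid, state in self.runs.items() if state in states]
        return matching[:PAGE_SIZE]

    async def cancel(self, run_id):
        self.runs[run_id] = "Cancelled"


async def bulk_cancel(client, states):
    """Cancel in batches until a query returns no matching runs."""
    batches = 0
    batch = await client.read_runs(states)
    while batch:
        for run_id in batch:
            await client.cancel(run_id)
        batches += 1
        # Cancelled runs no longer match, so the next query returns the next page.
        batch = await client.read_runs(states)
    return batches


def demo():
    client = FakeClient(
        {"a": "Running", "b": "Pending", "c": "Late", "d": "Completed", "e": "Scheduled"}
    )
    states = ["Pending", "Running", "Scheduled", "Late"]
    batches = asyncio.run(bulk_cancel(client, states))
    return batches, client.runs


if __name__ == "__main__":
    print(demo())
```

With four matching runs and a page size of two, the loop needs two batches, and the run that was already `Completed` is left alone.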
I hope you find this efficient and not too terribly tedious, though I must admit, even machines get a bit weary of repetitive tasks sometimes. If you want more details, you can check this Prefect resource link.