Giacomo Chiarella
10/07/2024, 11:20 AMMarvin
10/07/2024, 11:20 AMget_client
method provided by the Prefect library. This method allows you to interact with the API asynchronously. Here is a general approach you can follow based on Prefect 3.x documentation:
1. Import the necessary modules:
python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import StateType
2. Define a function to list flow runs with specified states:
python
async def list_flow_runs_with_states(states: list[str]):
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=states)
)
)
)
return flow_runs
3. Define a function to cancel specified flow runs:
python
async def cancel_flow_runs(flow_runs):
async with get_client() as client:
for idx, flow_run in enumerate(flow_runs):
print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
state_updates = {}
state_updates.setdefault("name", "Cancelled")
state_updates.setdefault("type", StateType.CANCELLED)
state = flow_run.state.copy(update=state_updates)
await client.set_flow_run_state(flow_run.id, state, force=True)
4. Implement a main function to handle the cancellation process:
python
async def bulk_cancel_flow_runs():
states = ["Pending", "Running", "Scheduled", "Late"]
flow_runs = await list_flow_runs_with_states(states)
while len(flow_runs) > 0:
print(f"Cancelling {len(flow_runs)} flow runs\n")
await cancel_flow_runs(flow_runs)
flow_runs = await list_flow_runs_with_states(states)
print("Done!")
5. Run the cancellation process:
python
if __name__ == "__main__":
anyio.run(bulk_cancel_flow_runs)
This code will cancel all flow runs that are in "Pending", "Running", "Scheduled", or "Late" states when executed.
For more details, you can refer to the Prefect documentation:
- Prefect 2.x Guide on Using Client: https://docs-2.prefect.io/latest/guides/using-the-client/
- Prefect 3.x Guide on Interacting with API: https://docs-3.prefect.io/3.0/manage/interact-with-api
If you're using version 2.x, the process is quite similar; however, ensure you're adapting any version-specific changes if needed.