Charles Liu
09/26/2024, 4:26 AMMarvin
09/26/2024, 4:26 AMpython
from prefect.client import OrionClient
from prefect.orion.schemas.states import StateType
async def delete_running_or_retrying_runs():
async with OrionClient() as client:
# Querying for task runs in 'RUNNING' or 'RETRYING' state
task_runs = await client.read_task_runs(
state_types=[StateType.RUNNING, StateType.RETRYING]
)
for task_run in task_runs:
await client._client.delete(f"/task_runs/{task_run.id}")
# Querying for flow runs in 'RUNNING' or 'RETRYING' state
flow_runs = await client.read_flow_runs(
state_types=[StateType.RUNNING, StateType.RETRYING]
)
for flow_run in flow_runs:
await client._client.delete(f"/flow_runs/{flow_run.id}")
import asyncio
asyncio.run(delete_running_or_retrying_runs())
Steps Explained:
1. Import necessary classes and enums from prefect.client
and prefect.orion.schemas.states
.
2. Create an asynchronous function delete_running_or_retrying_runs
.
3. Use OrionClient
to connect to the Prefect API.
4. Query for task runs and flow runs that are in RUNNING
or RETRYING
state.
5. Delete each of the retrieved runs using the client's delete method.
6. Run the asynchronous function using asyncio.run
.
Make sure to replace "/task_runs/{task_run.id}"
and "/flow_runs/{flow_run.id}"
with the appropriate endpoint if they differ in your Prefect setup.
You can refer to Prefect's state management documentation for more details on handling states. For more on using Prefect's client, you can check the latest API documentation.
Feel free to ask if you need further assistance!