Paweł Biernat
06/19/2024, 12:03 PM
Marvin
06/19/2024, 12:03 PM
python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName

async def cancel_running_flow_runs():
    async with get_client() as client:
        # Get all flow runs with state 'Running'
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            )
        )
        # Cancel each running flow run
        # NOTE: this call fails as written, because set_flow_run_state()
        # expects a State object rather than state_type; the follow-up
        # message below corrects it.
        for flow_run in flow_runs:
            await client.set_flow_run_state(
                flow_run.id, state_type="Cancelled"
            )

anyio.run(cancel_running_flow_runs)
### Using the CLI
To cancel all running flow runs via the CLI, loop over them in a shell script and cancel each flow run individually, since there is no single command that cancels all runs at once.
Here’s a bash script example:
bash
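# Get all running flow runs
# NOTE: check `prefect flow-run ls --help` first; the -q and --json flags
# may not be available in your Prefect version.
flow_run_ids=$(prefect flow-run ls -q --state Running --json | jq -r '.[].id')
# Cancel each running flow run
for id in $flow_run_ids; do
    prefect flow-run cancel "$id"
done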
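If the flow run IDs are already available (for example, collected through the Python client), the same per-run loop can be driven from Python. The following is only a minimal sketch: `cancel_via_cli` and its `dry_run` parameter are hypothetical names introduced here, and the only CLI call it relies on is `prefect flow-run cancel <id>` from the script above. Each ID is validated as a UUID before it is passed to the shell.

```python
import subprocess
import uuid


def cancel_via_cli(flow_run_ids, dry_run=True):
    """Build (and optionally run) one `prefect flow-run cancel` call per ID.

    Hypothetical helper for illustration; not part of Prefect.
    """
    commands = []
    for raw_id in flow_run_ids:
        # uuid.UUID raises ValueError on malformed IDs, so bad input
        # never reaches the command line.
        run_id = str(uuid.UUID(str(raw_id)))
        commands.append(["prefect", "flow-run", "cancel", run_id])
    if not dry_run:
        for cmd in commands:
            # check=False: keep going if one run can no longer be cancelled.
            subprocess.run(cmd, check=False)
    return commands
```

With the default `dry_run=True` the function only returns the commands it would run, which makes it easy to inspect them before cancelling anything.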
### Useful Links:
- Prefect Python SDK Guide - Detailed guide on using the client.
- Prefect Flow Concepts - Information on flow execution and states.
This should help you efficiently cancel all running flow runs.
If you encounter any issues, please report them on GitHub.
Marvin
06/19/2024, 12:05 PM
SetState
to correctly change the state of the flow run. Here's the corrected code:
### Corrected Python SDK Code
python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.states import Cancelled  # state constructors live in prefect.states

async def cancel_running_flow_runs():
    async with get_client() as client:
        # Get all flow runs with state 'Running'
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            )
        )
        # Cancel each running flow run by passing a State object
        for flow_run in flow_runs:
            await client.set_flow_run_state(
                flow_run.id, Cancelled()
            )

anyio.run(cancel_running_flow_runs)
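One caveat with the code above: setting `Cancelled` directly marks the run as finished on the API side, but it may leave the underlying process or infrastructure running. As far as I know, recent Prefect 2.x releases also provide a `Cancelling` state in `prefect.states`, which asks the worker to stop the run before it is marked `Cancelled`. The sketch below assumes that import is available in your version. `cancel_runs` and `main` are hypothetical names, and the client is passed in so the loop can be tried without a live API.

```python
import asyncio


async def cancel_runs(client, flow_runs, make_state, force=False):
    """Request a state change for each run; return the IDs that were submitted."""
    submitted = []
    for flow_run in flow_runs:
        # set_flow_run_state takes the run ID and a State object.
        await client.set_flow_run_state(flow_run.id, state=make_state(), force=force)
        submitted.append(flow_run.id)
    return submitted


async def main():
    # Imported lazily so cancel_runs can be exercised without Prefect installed.
    from prefect import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateName,
    )
    from prefect.states import Cancelling  # assumption: present in your version

    async with get_client() as client:
        running = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            )
        )
        ids = await cancel_runs(client, running, Cancelling)
        print(f"Requested cancellation of {len(ids)} flow run(s)")


# To run against a live Prefect API: asyncio.run(main())
```

Passing the state constructor as `make_state` keeps the loop independent of which state is requested, so the same helper works with `Cancelled` if `Cancelling` is not available.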
This should resolve the issue you're encountering.
If you encounter any issues, please report them on GitHub.