Paweł Biernat
06/19/2024, 12:03 PM
Marvin
06/19/2024, 12:03 PM
```python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName


async def cancel_running_flow_runs():
    async with get_client() as client:
        # Get all flow runs with state 'Running'
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            )
        )
        # Cancel each running flow run
        # (state_type= is not accepted here; see the corrected version later in this thread)
        for flow_run in flow_runs:
            await client.set_flow_run_state(
                flow_run.id, state_type="Cancelled"
            )


anyio.run(cancel_running_flow_runs)
```
### Using the CLI
To cancel all running flow runs from the CLI, loop over them in a shell script and cancel each one individually; there is no single command that cancels all runs at once.
Here’s a bash script example:
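```bash
# Get all running flow runs
# (flags as given in this reply; confirm them with `prefect flow-run ls --help` for your version)
flow_run_ids=$(prefect flow-run ls -q --state Running --json | jq -r '.[].id')
# Cancel each running flow run
for id in $flow_run_ids; do
  prefect flow-run cancel "$id"
done
```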
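
The `jq -r '.[].id'` filter in the script pulls the `id` field out of every element of a JSON array. As a rough illustration of that step only, here is a minimal Python sketch. It assumes the CLI output is a JSON array of objects that each carry an `id` key; the function name `extract_flow_run_ids` and the sample payload are hypothetical and not part of Prefect.

```python
import json


def extract_flow_run_ids(cli_output: str) -> list[str]:
    """Return the `id` of every object in a JSON array, like jq's `.[].id`."""
    runs = json.loads(cli_output)
    return [str(run["id"]) for run in runs]


# Hypothetical sample payload; real CLI output may contain more fields.
sample = '[{"id": "a1", "name": "x"}, {"id": "b2", "name": "y"}]'
print(extract_flow_run_ids(sample))
```

Each returned ID can then be passed to `prefect flow-run cancel`, one call per run, as the shell loop does.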
### Useful Links:
- Prefect Python SDK Guide - Detailed guide on using the client.
- Prefect Flow Concepts - Information on flow execution and states.
This should help you efficiently cancel all running flow runs.
If you encounter any issues, please report them on GitHub.
Marvin
06/19/2024, 12:05 PM
SetState to correctly change the state of the flow run. Here's the corrected code:
### Corrected Python SDK Code
```python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.states import Cancelled


async def cancel_running_flow_runs():
    async with get_client() as client:
        # Get all flow runs with state 'Running'
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            )
        )
        # Cancel each running flow run by passing a State object
        for flow_run in flow_runs:
            await client.set_flow_run_state(
                flow_run.id, state=Cancelled()
            )


anyio.run(cancel_running_flow_runs)
```
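
When cancelling many runs in a loop, one failed state change would otherwise stop the whole loop. The sketch below shows one possible way to keep going and report which runs failed. It is not taken from the thread: `StubClient` is a hypothetical stand-in for the Prefect client so the example runs without a server, `cancel_all` is an illustrative helper name, and the string `"Cancelled"` is a placeholder for the `Cancelled()` state object you would pass to the real client.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class StubClient:
    """Hypothetical stand-in for the Prefect client; not part of Prefect."""

    failing_ids: set = field(default_factory=set)
    cancelled: list = field(default_factory=list)

    async def set_flow_run_state(self, flow_run_id, state):
        # Simulate a failed state change for selected run IDs.
        if flow_run_id in self.failing_ids:
            raise RuntimeError(f"could not update {flow_run_id}")
        self.cancelled.append(flow_run_id)


async def cancel_all(client, flow_run_ids, state="Cancelled"):
    """Try to cancel every run; return (succeeded, failed) lists of IDs."""
    succeeded, failed = [], []
    for flow_run_id in flow_run_ids:
        try:
            await client.set_flow_run_state(flow_run_id, state)
            succeeded.append(flow_run_id)
        except Exception:
            # Record the failure and continue with the remaining runs.
            failed.append(flow_run_id)
    return succeeded, failed


client = StubClient(failing_ids={"run-2"})
ok, bad = asyncio.run(cancel_all(client, ["run-1", "run-2", "run-3"]))
print(ok, bad)
```

With the real client, the same loop body would sit inside `async with get_client() as client:` and iterate over the `flow_run.id` values returned by `read_flow_runs`.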
### Useful Links:
- Prefect Python SDK Guide - Detailed guide on using the client.
- Prefect Flow Concepts - Information on flow execution and states.
This should resolve the issue you're encountering.
If you encounter any issues, please report them on GitHub.