Alan
10/16/2024, 6:23 PMfrom prefect import get_client
from prefect.server.schemas.states import State, StateType
from prefect.server.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName, FlowRunFilterStateType
import asyncio
async def get_cancelling_flows():
async with get_client() as client:
state_filter = FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=['CANCELLED'])))
# Query all flow runs
flow_runs = await client.read_flow_runs(flow_run_filter=state_filter)
cancelled_state = State(type=StateType.CANCELLED)
# Loop through each flow run and change its state to "CANCELLED"
for flow in flow_runs:
try:
await client.set_flow_run_state(
flow_run_id=flow.id,
state=cancelled_state,
force=True
)
print(f"Flow run {flow.id} has been set to CANCELLED.")
except Exception as e:
print(e)
# Run the async function to get the cancelling flows
asyncio.run(get_cancelling_flows())
Marvin
10/16/2024, 6:23 PMpython
import prefect
print(prefect.__version__)
Once I have that information, I can provide the most accurate guidance and search for relevant documentation and issues that might help resolve the problem with flow runs getting stuck in a "cancelling" state.Alan
10/16/2024, 6:23 PMMarvin
10/16/2024, 6:24 PM