Tom Han
10/02/2025, 10:18 PMMarvin
10/02/2025, 10:18 PMMarvin
10/02/2025, 10:20 PM
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterState,
FlowRunFilterStateName,
)
from prefect.states import Cancelled, to_state_create
# Moves every flow run whose state name is "Cancelling" to Cancelled,
# reading the matching runs in pages of `page_size` and printing one line
# per run (success or failure).
# NOTE(review): indentation was lost when this snippet was pasted into the
# transcript; re-indent before running.
def force_cancel_cancelling_runs(page_size: int = 200) -> None:
# NOTE(review): fails as posted -- the traceback later in this thread shows
# SyncPrefectClient.__init__() requires an `api` argument.
with SyncPrefectClient() as client:
offset = 0
while True:
runs = client.read_flow_runs(
# NOTE(review): later revisions in this thread pass this filter as
# `flow_run_filter`, not `flow_runs` -- confirm against the installed client.
flow_runs=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=["Cancelling"])
)
),
limit=page_size,
offset=offset,
)
# An empty page ends the loop.
if not runs:
break
for run in runs:
try:
client.set_flow_run_state(
run.id,
state=to_state_create(Cancelled()),
force=True,
)
print(f"Set {run.id} to Cancelled")
# Best effort: a failure on one run is reported and the loop continues.
except Exception as e:
print(f"Failed to set {run.id} to Cancelled: {e}")
# NOTE(review): runs just moved to Cancelled presumably no longer match the
# "Cancelling" filter, so advancing by the full page may skip runs -- verify.
offset += len(runs)
if __name__ == "__main__":
force_cancel_cancelling_runs()
References:
- Client (SyncPrefectClient)
- Filters (FlowRunFilter, FlowRunFilterStateName)
- Cancelling/Cancelled states overview
If you actually need this for task runs instead, or prefer an async example, let me know and I’ll adapt it.
Tom Han
10/02/2025, 10:21 PMTom Han
10/02/2025, 10:22 PMIn [1]: import prefect.main
^[[A
In [2]: import asyncio
...: from prefect.client.orchestration import get_client
...: from prefect.states import Cancelled
...:
...: async def force_cancel(flow_run_id: str):
...: async with get_client() as client:
...: await client.set_flow_run_state(flow_run_id, state=Cancelled(), force=True)
...:
In [3]: from prefect.client.orchestration import SyncPrefectClient
...: from prefect.client.schemas.filters import (
...: FlowRunFilter,
...: FlowRunFilterState,
...: FlowRunFilterStateName,
...: )
...: from prefect.states import Cancelled, to_state_create
...:
...:
...: def force_cancel_cancelling_runs(page_size: int = 200) -> None:
...: with SyncPrefectClient() as client:
...: offset = 0
...: while True:
...: runs = client.read_flow_runs(
...: flow_runs=FlowRunFilter(
...: state=FlowRunFilterState(
...: name=FlowRunFilterStateName(any_=["Cancelling"])
...: )
...: ),
...: limit=page_size,
...: offset=offset,
...: )
...: if not runs:
...: break
...:
...: for run in runs:
...: try:
...: client.set_flow_run_state(
...: run.id,
...: state=to_state_create(Cancelled()),
...: force=True,
...: )
...: print(f"Set {run.id} to Cancelled")
...: except Exception as e:
...: print(f"Failed to set {run.id} to Cancelled: {e}")
...:
...: offset += len(runs)
...:
In [4]: force_cancel_cancelling_runs()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[4], line 1
----> 1 force_cancel_cancelling_runs()
Cell In[3], line 11, in force_cancel_cancelling_runs(page_size)
10 def force_cancel_cancelling_runs(page_size: int = 200) -> None:
---> 11 with SyncPrefectClient() as client:
12 offset = 0
13 while True:
TypeError: SyncPrefectClient.__init__() missing 1 required positional argument: 'api'
In [5]: from prefect.client.orchestration import SyncPrefectClient
...: from prefect.client.schemas.filters import (
...: FlowRunFilter,
...: FlowRunFilterState,
...: FlowRunFilterStateName,
...: )
...: from prefect.states import Cancelled, to_state_create
...:
...:
...: def force_cancel_cancelling_runs(page_size: int = 200) -> None:
...: with get_client() as client:
...: offset = 0
...: while True:
...: runs = client.read_flow_runs(
...: flow_runs=FlowRunFilter(
...: state=FlowRunFilterState(
...: name=FlowRunFilterStateName(any_=["Cancelling"])
...: )
...: ),
...: limit=page_size,
...: offset=offset,
...: )
...: if not runs:
...: break
...:
...: for run in runs:
...: try:
...: client.set_flow_run_state(
...: run.id,
...: state=to_state_create(Cancelled()),
...: force=True,
...: )
...: print(f"Set {run.id} to Cancelled")
...: except Exception as e:
...: print(f"Failed to set {run.id} to Cancelled: {e}")
...:
...: offset += len(runs)
...:
In [6]: force_cancel_cancelling_runs()
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[6], line 1
----> 1 force_cancel_cancelling_runs()
Cell In[5], line 11, in force_cancel_cancelling_runs(page_size)
10 def force_cancel_cancelling_runs(page_size: int = 200) -> None:
---> 11 with get_client() as client:
12 offset = 0
13 while True:
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/site-packages/prefect/client/orchestration/__init__.py:1119, in PrefectClient.__enter__(self)
1118 def __enter__(self) -> NoReturn:
-> 1119 raise RuntimeError(
1120 "The `PrefectClient` must be entered with an async context. Use 'async "
1121 "with PrefectClient(...)' not 'with PrefectClient(...)'"
1122 )
RuntimeError: The `PrefectClient` must be entered with an async context. Use 'async with PrefectClient(...)' not 'with PrefectClient(...)'Marvin
10/02/2025, 10:22 PMMarvin
10/02/2025, 10:23 PM
- SyncPrefectClient requires an API URL; pass api=...
- get_client() returns an async client; use async with and await
Here are working snippets.
Async (recommended)
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterState,
FlowRunFilterStateName,
)
from prefect.states import Cancelled, to_state_create
# Async variant: moves every flow run whose state name is "Cancelling" to
# Cancelled, reading the matching runs in pages of `page_size` and printing
# one line per run (success or failure).
# NOTE(review): indentation was lost when this snippet was pasted into the
# transcript; re-indent before running.
async def force_cancel_cancelling_runs(page_size: int = 200) -> None:
async with get_client() as client:
offset = 0
while True:
runs = await client.read_flow_runs(
# NOTE(review): fails as posted -- the traceback later in this thread shows
# read_flow_runs() rejecting the `flow_runs` keyword; the follow-up answer
# uses `flow_run_filter`.
flow_runs=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=["Cancelling"])
)
),
limit=page_size,
offset=offset,
)
# An empty page ends the loop.
if not runs:
break
for run in runs:
try:
await client.set_flow_run_state(
run.id,
# NOTE(review): the follow-up answer in this thread passes Cancelled()
# directly and drops to_state_create.
state=to_state_create(Cancelled()),
force=True,
)
print(f"Set {run.id} to Cancelled")
# Best effort: a failure on one run is reported and the loop continues.
except Exception as e:
print(f"Failed to set {run.id} to Cancelled: {e}")
# NOTE(review): runs just moved to Cancelled presumably no longer match the
# "Cancelling" filter, so advancing by the full page may skip runs -- verify.
offset += len(runs)
# In a script:
if __name__ == "__main__":
asyncio.run(force_cancel_cancelling_runs())
# In IPython/Jupyter:
# await force_cancel_cancelling_runs()
Sync client (if you prefer)
import os
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterState,
FlowRunFilterStateName,
)
from prefect.states import Cancelled, to_state_create
# Sync variant: moves every flow run whose state name is "Cancelling" to
# Cancelled. The API URL comes from the `api` argument or, when that is
# empty, from the PREFECT_API_URL environment variable.
# NOTE(review): indentation was lost when this snippet was pasted into the
# transcript; re-indent before running.
def force_cancel_cancelling_runs_sync(page_size: int = 200, api: str | None = None) -> None:
api = api or os.environ.get("PREFECT_API_URL")
# Raise early when no API URL is available from either source.
if not api:
raise ValueError("Set PREFECT_API_URL or pass api=... to SyncPrefectClient")
with SyncPrefectClient(api=api) as client:
offset = 0
while True:
runs = client.read_flow_runs(
# NOTE(review): the revised sync snippet later in this thread passes this
# filter as `flow_run_filter`, not `flow_runs` -- confirm against the
# installed client.
flow_runs=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=["Cancelling"])
)
),
limit=page_size,
offset=offset,
)
# An empty page ends the loop.
if not runs:
break
for run in runs:
try:
client.set_flow_run_state(
run.id,
state=to_state_create(Cancelled()),
force=True,
)
print(f"Set {run.id} to Cancelled")
# Best effort: a failure on one run is reported and the loop continues.
except Exception as e:
print(f"Failed to set {run.id} to Cancelled: {e}")
# NOTE(review): runs just moved to Cancelled presumably no longer match the
# "Cancelling" filter, so advancing by the full page may skip runs -- verify.
offset += len(runs)
# force_cancel_cancelling_runs_sync()
Docs for reference:
- Client (orchestration)
- Filters (FlowRunFilter, FlowRunFilterStateName)
- Cancelling/Cancelled states
Tom Han
10/02/2025, 10:26 PMIn [7]: import asyncio
...: from prefect.client.orchestration import get_client
...: from prefect.client.schemas.filters import (
...: FlowRunFilter,
...: FlowRunFilterState,
...: FlowRunFilterStateName,
...: )
...: from prefect.states import Cancelled, to_state_create
...:
...: async def force_cancel_cancelling_runs(page_size: int = 200) -> None:
...: async with get_client() as client:
...: offset = 0
...: while True:
...: runs = await client.read_flow_runs(
...: flow_runs=FlowRunFilter(
...: state=FlowRunFilterState(
...: name=FlowRunFilterStateName(any_=["Cancelling"])
...: )
...: ),
...: limit=page_size,
...: offset=offset,
...: )
...: if not runs:
...: break
...:
...: for run in runs:
...: try:
...: await client.set_flow_run_state(
...: run.id,
...: state=to_state_create(Cancelled()),
...: force=True,
...: )
...: print(f"Set {run.id} to Cancelled")
...: except Exception as e:
...: print(f"Failed to set {run.id} to Cancelled: {e}")
...:
...: offset += len(runs)
...:
In [8]: asyncio.run(force_cancel_cancelling_runs())
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[8], line 1
----> 1 asyncio.run(force_cancel_cancelling_runs())
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/runners.py:195, in run(main, debug, loop_factory)
191 raise RuntimeError(
192 "asyncio.run() cannot be called from a running event loop")
194 with Runner(debug=debug, loop_factory=loop_factory) as runner:
--> 195 return runner.run(main)
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/runners.py:118, in Runner.run(self, coro, context)
116 self._interrupt_count = 0
117 try:
--> 118 return self._loop.run_until_complete(task)
119 except exceptions.CancelledError:
120 if self._interrupt_count > 0:
File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/base_events.py:691, in BaseEventLoop.run_until_complete(self, future)
688 if not future.done():
689 raise RuntimeError('Event loop stopped before Future completed.')
--> 691 return future.result()
Cell In[7], line 14, in force_cancel_cancelling_runs(page_size)
12 offset = 0
13 while True:
---> 14 runs = await client.read_flow_runs(
15 flow_runs=FlowRunFilter(
16 state=FlowRunFilterState(
17 name=FlowRunFilterStateName(any_=["Cancelling"])
18 )
19 ),
20 limit=page_size,
21 offset=offset,
22 )
23 if not runs:
24 break
TypeError: FlowRunAsyncClient.read_flow_runs() got an unexpected keyword argument 'flow_runs'
@MarvinMarvin
10/02/2025, 10:26 PMMarvin
10/02/2025, 10:27 PM
The keyword argument for read_flow_runs is flow_run_filter (not flow_runs). Also, set_flow_run_state accepts a State directly, so you don’t need to_state_create.
Here’s a fixed async version:
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterState,
FlowRunFilterStateName,
)
from prefect.states import Cancelled
async def force_cancel_cancelling_runs(page_size: int = 200) -> None:
    """Force every flow run currently in the ``Cancelling`` state to ``Cancelled``.

    The matching runs are collected first and only transitioned afterwards.
    Transitioning runs while paging with an offset would shrink the filtered
    result set underneath the pagination: the runs just cancelled no longer
    match the ``Cancelling`` filter, so the next offset would land past runs
    that were never processed.

    Args:
        page_size: Maximum number of flow runs requested per API call.

    Returns:
        None. One line is printed per run reporting success or failure.
    """
    # Loop-invariant filter: flow runs whose state name is "Cancelling".
    cancelling_filter = FlowRunFilter(
        state=FlowRunFilterState(
            name=FlowRunFilterStateName(any_=["Cancelling"])
        )
    )

    async with get_client() as client:
        # Phase 1: read-only pagination, so the offset stays meaningful.
        run_ids = []
        offset = 0
        while True:
            page = await client.read_flow_runs(
                flow_run_filter=cancelling_filter,
                limit=page_size,
                offset=offset,
            )
            if not page:
                break
            run_ids.extend(run.id for run in page)
            offset += len(page)

        # Phase 2: transition each run once. dict.fromkeys drops duplicates
        # (a run can show up on two pages if the result set shifts between
        # requests) while preserving the order in which runs were read.
        for run_id in dict.fromkeys(run_ids):
            try:
                await client.set_flow_run_state(
                    run_id,
                    state=Cancelled(),
                    force=True,
                )
                print(f"Set {run_id} to Cancelled")
            except Exception as e:
                # Best effort: report the failure and keep going so one bad
                # run does not block the rest.
                print(f"Failed to set {run_id} to Cancelled: {e}")
# In a script:
# asyncio.run(force_cancel_cancelling_runs())
# In IPython/Jupyter, use:
# await force_cancel_cancelling_runs()
If you prefer sync, remember SyncPrefectClient needs an API URL:
import os
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
FlowRunFilter,
FlowRunFilterState,
FlowRunFilterStateName,
)
from prefect.states import Cancelled
def force_cancel_cancelling_runs_sync(page_size: int = 200, api: str | None = None) -> None:
    """Force every flow run currently in the ``Cancelling`` state to ``Cancelled``.

    Synchronous variant. The matching runs are collected first and only
    transitioned afterwards. Transitioning runs while paging with an offset
    would shrink the filtered result set underneath the pagination: the runs
    just cancelled no longer match the ``Cancelling`` filter, so the next
    offset would land past runs that were never processed.

    Args:
        page_size: Maximum number of flow runs requested per API call.
        api: Prefect API URL. Falls back to the ``PREFECT_API_URL``
            environment variable when omitted or empty.

    Returns:
        None. One line is printed per run reporting success or failure.

    Raises:
        ValueError: If no API URL is given and ``PREFECT_API_URL`` is unset.
    """
    api = api or os.environ.get("PREFECT_API_URL")
    if not api:
        raise ValueError("Set PREFECT_API_URL or pass api=... to SyncPrefectClient")

    # Loop-invariant filter: flow runs whose state name is "Cancelling".
    cancelling_filter = FlowRunFilter(
        state=FlowRunFilterState(
            name=FlowRunFilterStateName(any_=["Cancelling"])
        )
    )

    # NOTE(review): only the API URL is passed to the client here; if the
    # target API requires credentials, confirm how they should be supplied.
    with SyncPrefectClient(api=api) as client:
        # Phase 1: read-only pagination, so the offset stays meaningful.
        run_ids = []
        offset = 0
        while True:
            page = client.read_flow_runs(
                flow_run_filter=cancelling_filter,
                limit=page_size,
                offset=offset,
            )
            if not page:
                break
            run_ids.extend(run.id for run in page)
            offset += len(page)

        # Phase 2: transition each run once. dict.fromkeys drops duplicates
        # (a run can show up on two pages if the result set shifts between
        # requests) while preserving the order in which runs were read.
        for run_id in dict.fromkeys(run_ids):
            try:
                client.set_flow_run_state(
                    run_id,
                    state=Cancelled(),
                    force=True,
                )
                print(f"Set {run_id} to Cancelled")
            except Exception as e:
                # Best effort: report the failure and keep going so one bad
                # run does not block the rest.
                print(f"Failed to set {run_id} to Cancelled: {e}")
Refs:
- Client (orchestration)
- Filters (FlowRunFilter, FlowRunFilterStateName)
- Cancelling/Cancelled states