<@ULVA73B9P> Give me a code snippet that get all f...
# ask-marvin
t
@Marvin Give me a code snippet that get all flows currently in cancelling state and set it to Cancelled
m
thought for 164.4 seconds
Assuming you mean flow runs (flows themselves don’t have states). Here’s a complete Prefect 3.x snippet that finds all flow runs in the Cancelling state and force-sets them to Cancelled:
Copy code
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.states import Cancelled, to_state_create


def force_cancel_cancelling_runs(page_size: int = 200) -> None:
    with SyncPrefectClient() as client:
        offset = 0
        while True:
            runs = client.read_flow_runs(
                flow_runs=FlowRunFilter(
                    state=FlowRunFilterState(
                        name=FlowRunFilterStateName(any_=["Cancelling"])
                    )
                ),
                limit=page_size,
                offset=offset,
            )
            if not runs:
                break

            for run in runs:
                try:
                    client.set_flow_run_state(
                        run.id,
                        state=to_state_create(Cancelled()),
                        force=True,
                    )
                    print(f"Set {run.id} to Cancelled")
                except Exception as e:
                    print(f"Failed to set {run.id} to Cancelled: {e}")

            offset += len(runs)


if __name__ == "__main__":
    force_cancel_cancelling_runs()
References: - Client (SyncPrefectClient) - Filters (FlowRunFilter, FlowRunFilterStateName) - Cancelling/Cancelled states overview If you actually need this for task runs instead, or prefer an async example, let me know and I’ll adapt it.
t
orce_cancel_cancelling_runs() --------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[4], line 1 ----> 1 force_cancel_cancelling_runs() Cell In[3], line 11, in force_cancel_cancelling_runs(page_size) 10 def force_cancel_cancelling_runs(page_size: int = 200) -> None: ---> 11 with SyncPrefectClient() as client: 12 offset = 0 13 while True: TypeError: SyncPrefectClient.__init__() missing 1 required positional argument: 'api'
@Marvin
Copy code
In [1]: import prefect.main
^[[A
In [2]: import asyncio
   ...: from prefect.client.orchestration import get_client
   ...: from prefect.states import Cancelled
   ...:
   ...: async def force_cancel(flow_run_id: str):
   ...:     async with get_client() as client:
   ...:         await client.set_flow_run_state(flow_run_id, state=Cancelled(), force=True)
   ...:

In [3]: from prefect.client.orchestration import SyncPrefectClient
   ...: from prefect.client.schemas.filters import (
   ...:     FlowRunFilter,
   ...:     FlowRunFilterState,
   ...:     FlowRunFilterStateName,
   ...: )
   ...: from prefect.states import Cancelled, to_state_create
   ...:
   ...:
   ...: def force_cancel_cancelling_runs(page_size: int = 200) -> None:
   ...:     with SyncPrefectClient() as client:
   ...:         offset = 0
   ...:         while True:
   ...:             runs = client.read_flow_runs(
   ...:                 flow_runs=FlowRunFilter(
   ...:                     state=FlowRunFilterState(
   ...:                         name=FlowRunFilterStateName(any_=["Cancelling"])
   ...:                     )
   ...:                 ),
   ...:                 limit=page_size,
   ...:                 offset=offset,
   ...:             )
   ...:             if not runs:
   ...:                 break
   ...:
   ...:             for run in runs:
   ...:                 try:
   ...:                     client.set_flow_run_state(
   ...:                         run.id,
   ...:                         state=to_state_create(Cancelled()),
   ...:                         force=True,
   ...:                     )
   ...:                     print(f"Set {run.id} to Cancelled")
   ...:                 except Exception as e:
   ...:                     print(f"Failed to set {run.id} to Cancelled: {e}")
   ...:
   ...:             offset += len(runs)
   ...:

In [4]: force_cancel_cancelling_runs()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[4], line 1
----> 1 force_cancel_cancelling_runs()

Cell In[3], line 11, in force_cancel_cancelling_runs(page_size)
     10 def force_cancel_cancelling_runs(page_size: int = 200) -> None:
---> 11     with SyncPrefectClient() as client:
     12         offset = 0
     13         while True:

TypeError: SyncPrefectClient.__init__() missing 1 required positional argument: 'api'

In [5]: from prefect.client.orchestration import SyncPrefectClient
   ...: from prefect.client.schemas.filters import (
   ...:     FlowRunFilter,
   ...:     FlowRunFilterState,
   ...:     FlowRunFilterStateName,
   ...: )
   ...: from prefect.states import Cancelled, to_state_create
   ...:
   ...:
   ...: def force_cancel_cancelling_runs(page_size: int = 200) -> None:
   ...:     with get_client() as client:
   ...:         offset = 0
   ...:         while True:
   ...:             runs = client.read_flow_runs(
   ...:                 flow_runs=FlowRunFilter(
   ...:                     state=FlowRunFilterState(
   ...:                         name=FlowRunFilterStateName(any_=["Cancelling"])
   ...:                     )
   ...:                 ),
   ...:                 limit=page_size,
   ...:                 offset=offset,
   ...:             )
   ...:             if not runs:
   ...:                 break
   ...:
   ...:             for run in runs:
   ...:                 try:
   ...:                     client.set_flow_run_state(
   ...:                         run.id,
   ...:                         state=to_state_create(Cancelled()),
   ...:                         force=True,
   ...:                     )
   ...:                     print(f"Set {run.id} to Cancelled")
   ...:                 except Exception as e:
   ...:                     print(f"Failed to set {run.id} to Cancelled: {e}")
   ...:
   ...:             offset += len(runs)
   ...:

In [6]: force_cancel_cancelling_runs()
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[6], line 1
----> 1 force_cancel_cancelling_runs()

Cell In[5], line 11, in force_cancel_cancelling_runs(page_size)
     10 def force_cancel_cancelling_runs(page_size: int = 200) -> None:
---> 11     with get_client() as client:
     12         offset = 0
     13         while True:

File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/site-packages/prefect/client/orchestration/__init__.py:1119, in PrefectClient.__enter__(self)
   1118 def __enter__(self) -> NoReturn:
-> 1119     raise RuntimeError(
   1120         "The `PrefectClient` must be entered with an async context. Use 'async "
   1121         "with PrefectClient(...)' not 'with PrefectClient(...)'"
   1122     )

RuntimeError: The `PrefectClient` must be entered with an async context. Use 'async with PrefectClient(...)' not 'with PrefectClient(...)'
m
thought for 52.6 seconds
Thanks for the traceback — two fixes: -
SyncPrefectClient
requires an API URL; pass
api=...
-
get_client()
returns an async client; use
async with
and
await
Here are working snippets. Async (recommended)
Copy code
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.states import Cancelled, to_state_create

async def force_cancel_cancelling_runs(page_size: int = 200) -> None:
    async with get_client() as client:
        offset = 0
        while True:
            runs = await client.read_flow_runs(
                flow_runs=FlowRunFilter(
                    state=FlowRunFilterState(
                        name=FlowRunFilterStateName(any_=["Cancelling"])
                    )
                ),
                limit=page_size,
                offset=offset,
            )
            if not runs:
                break

            for run in runs:
                try:
                    await client.set_flow_run_state(
                        run.id,
                        state=to_state_create(Cancelled()),
                        force=True,
                    )
                    print(f"Set {run.id} to Cancelled")
                except Exception as e:
                    print(f"Failed to set {run.id} to Cancelled: {e}")

            offset += len(runs)

# In a script:
if __name__ == "__main__":
    asyncio.run(force_cancel_cancelling_runs())

# In IPython/Jupyter:
# await force_cancel_cancelling_runs()
Sync client (if you prefer)
Copy code
import os
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.states import Cancelled, to_state_create

def force_cancel_cancelling_runs_sync(page_size: int = 200, api: str | None = None) -> None:
    api = api or os.environ.get("PREFECT_API_URL")
    if not api:
        raise ValueError("Set PREFECT_API_URL or pass api=... to SyncPrefectClient")
    with SyncPrefectClient(api=api) as client:
        offset = 0
        while True:
            runs = client.read_flow_runs(
                flow_runs=FlowRunFilter(
                    state=FlowRunFilterState(
                        name=FlowRunFilterStateName(any_=["Cancelling"])
                    )
                ),
                limit=page_size,
                offset=offset,
            )
            if not runs:
                break

            for run in runs:
                try:
                    client.set_flow_run_state(
                        run.id,
                        state=to_state_create(Cancelled()),
                        force=True,
                    )
                    print(f"Set {run.id} to Cancelled")
                except Exception as e:
                    print(f"Failed to set {run.id} to Cancelled: {e}")

            offset += len(runs)

# force_cancel_cancelling_runs_sync()
Docs for reference: - Client (orchestration) - Filters (FlowRunFilter, FlowRunFilterStateName) - Cancelling/Cancelled states
t
Copy code
In [7]: import asyncio
   ...: from prefect.client.orchestration import get_client
   ...: from prefect.client.schemas.filters import (
   ...:     FlowRunFilter,
   ...:     FlowRunFilterState,
   ...:     FlowRunFilterStateName,
   ...: )
   ...: from prefect.states import Cancelled, to_state_create
   ...:
   ...: async def force_cancel_cancelling_runs(page_size: int = 200) -> None:
   ...:     async with get_client() as client:
   ...:         offset = 0
   ...:         while True:
   ...:             runs = await client.read_flow_runs(
   ...:                 flow_runs=FlowRunFilter(
   ...:                     state=FlowRunFilterState(
   ...:                         name=FlowRunFilterStateName(any_=["Cancelling"])
   ...:                     )
   ...:                 ),
   ...:                 limit=page_size,
   ...:                 offset=offset,
   ...:             )
   ...:             if not runs:
   ...:                 break
   ...:
   ...:             for run in runs:
   ...:                 try:
   ...:                     await client.set_flow_run_state(
   ...:                         run.id,
   ...:                         state=to_state_create(Cancelled()),
   ...:                         force=True,
   ...:                     )
   ...:                     print(f"Set {run.id} to Cancelled")
   ...:                 except Exception as e:
   ...:                     print(f"Failed to set {run.id} to Cancelled: {e}")
   ...:
   ...:             offset += len(runs)
   ...:

In [8]: asyncio.run(force_cancel_cancelling_runs())
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 asyncio.run(force_cancel_cancelling_runs())

File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/runners.py:195, in run(main, debug, loop_factory)
    191     raise RuntimeError(
    192         "asyncio.run() cannot be called from a running event loop")
    194 with Runner(debug=debug, loop_factory=loop_factory) as runner:
--> 195     return runner.run(main)

File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/runners.py:118, in Runner.run(self, coro, context)
    116 self._interrupt_count = 0
    117 try:
--> 118     return self._loop.run_until_complete(task)
    119 except exceptions.CancelledError:
    120     if self._interrupt_count > 0:

File /opt/homebrew/Caskroom/miniforge/base/envs/prefect-test/lib/python3.12/asyncio/base_events.py:691, in BaseEventLoop.run_until_complete(self, future)
    688 if not future.done():
    689     raise RuntimeError('Event loop stopped before Future completed.')
--> 691 return future.result()

Cell In[7], line 14, in force_cancel_cancelling_runs(page_size)
     12 offset = 0
     13 while True:
---> 14     runs = await client.read_flow_runs(
     15         flow_runs=FlowRunFilter(
     16             state=FlowRunFilterState(
     17                 name=FlowRunFilterStateName(any_=["Cancelling"])
     18             )
     19         ),
     20         limit=page_size,
     21         offset=offset,
     22     )
     23     if not runs:
     24         break

TypeError: FlowRunAsyncClient.read_flow_runs() got an unexpected keyword argument 'flow_runs'
@Marvin
m
thought for 60.7 seconds
Good catch — that error is because the parameter name changed in Prefect 3.x. It’s
flow_run_filter
(not
flow_runs
). Also,
set_flow_run_state
accepts a
State
directly, so you don’t need
to_state_create
. Here’s a fixed async version:
Copy code
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.states import Cancelled

async def force_cancel_cancelling_runs(page_size: int = 200) -> None:
    async with get_client() as client:
        offset = 0
        while True:
            runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        name=FlowRunFilterStateName(any_=["Cancelling"])
                    )
                ),
                limit=page_size,
                offset=offset,
            )
            if not runs:
                break

            for run in runs:
                try:
                    await client.set_flow_run_state(
                        run.id,
                        state=Cancelled(),
                        force=True,
                    )
                    print(f"Set {run.id} to Cancelled")
                except Exception as e:
                    print(f"Failed to set {run.id} to Cancelled: {e}")

            offset += len(runs)

# In a script:
# asyncio.run(force_cancel_cancelling_runs())

# In IPython/Jupyter, use:
# await force_cancel_cancelling_runs()
If you prefer sync, remember
SyncPrefectClient
needs an API URL:
Copy code
import os
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.states import Cancelled

def force_cancel_cancelling_runs_sync(page_size: int = 200, api: str | None = None) -> None:
    api = api or os.environ.get("PREFECT_API_URL")
    if not api:
        raise ValueError("Set PREFECT_API_URL or pass api=... to SyncPrefectClient")

    with SyncPrefectClient(api=api) as client:
        offset = 0
        while True:
            runs = client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        name=FlowRunFilterStateName(any_=["Cancelling"])
                    )
                ),
                limit=page_size,
                offset=offset,
            )
            if not runs:
                break

            for run in runs:
                try:
                    client.set_flow_run_state(
                        run.id,
                        state=Cancelled(),
                        force=True,
                    )
                    print(f"Set {run.id} to Cancelled")
                except Exception as e:
                    print(f"Failed to set {run.id} to Cancelled: {e}")

            offset += len(runs)
Refs: - Client (orchestration) - Filters (FlowRunFilter, FlowRunFilterStateName) - Cancelling/Cancelled states