Markella Skempri
10/21/2025, 4:07 PMimport asyncio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import FlowRun, StateType
async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )
async def cancel_flow_runs(flow_runs: list[FlowRun]):
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            state_updates: dict[str, str] = {}
            state_updates.setdefault("name", "Cancelled")
            state_updates.setdefault("type", StateType.CANCELLED)
            state = flow_run.state.copy(update=state_updates)
            await client.set_flow_run_state(flow_run.id, state, force=True)
async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)
    while len(flow_runs) > 0:
        print(f"Cancelling {len(flow_runs)} flow runs\n")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)
    print("Done!")
if __name__ == "__main__":
    asyncio.run(bulk_cancel_flow_runs())
To cancel a bunch of flows, but I get a pydantic error TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
Which version of pydantic do I need for this to work?Marvin
10/21/2025, 4:07 PMMarvin
10/21/2025, 4:14 PMpydantic and pydantic-core are mismatched. You don’t need a specific sub-version, but you do need a compatible pair.
What to do:
1) Upgrade Pydantic (and let it pull the matching pydantic-core). Avoid pinning pydantic-core separately.
pip install -U "prefect>=3" "pydantic>=2,<3"
2) Verify versions are aligned:
python -c "import pydantic, pydantic_core; print('pydantic:', pydantic.__version__, 'pydantic-core:', pydantic_core.__version__)"
3) If you previously pinned pydantic-core, unpin it and reinstall Pydantic so it can install the correct core version:
pip uninstall -y pydantic-core
pip install -U "pydantic>=2,<3"
If you still see the error after that, please share:
- prefect version
- The output of the version check above
- Your Python version
- The full stack trace
For reference, the example you’re following is here: Prefect 3.x API client