<@ULVA73B9P> I am trying the script from this page...
# marvin-ai
m
@Marvin I am trying the script from this page https://docs.prefect.io/v3/advanced/api-client
```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import FlowRun, StateType

async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )


async def cancel_flow_runs(flow_runs: list[FlowRun]):
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            state_updates: dict[str, str] = {}
            state_updates.setdefault("name", "Cancelled")
            state_updates.setdefault("type", StateType.CANCELLED)
            state = flow_run.state.copy(update=state_updates)
            await client.set_flow_run_state(flow_run.id, state, force=True)


async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)

    while len(flow_runs) > 0:
        print(f"Cancelling {len(flow_runs)} flow runs\n")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)
    print("Done!")


if __name__ == "__main__":
    asyncio.run(bulk_cancel_flow_runs())
```
I'm using it to cancel a bunch of flow runs, but I get a Pydantic error: `TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'`. Which version of Pydantic do I need for this to work?
m
thought for 417.7 seconds
Short answer: Prefect 3.x requires Pydantic 2.x. That error usually means your installed `pydantic` and `pydantic-core` are mismatched. You don't need a specific sub-version, but you do need a compatible pair.

What to do:

1) Upgrade Pydantic (and let it pull the matching `pydantic-core`). Avoid pinning `pydantic-core` separately.
```
pip install -U "prefect>=3" "pydantic>=2,<3"
```
2) Verify versions are aligned:
```
python -c "import pydantic, pydantic_core; print('pydantic:', pydantic.__version__, 'pydantic-core:', pydantic_core.__version__)"
```
3) If you previously pinned `pydantic-core`, unpin it and reinstall Pydantic so it can install the correct core version:
```
pip uninstall -y pydantic-core
pip install -U "pydantic>=2,<3"
```
If you still see the error after that, please share:
- The output of `prefect version`
- The output of the version check above
- Your Python version
- The full stack trace

For reference, the example you're following is here: Prefect 3.x API client
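If it helps, here is a rough sketch for gathering most of those details in one go. It uses only the Python standard library (`importlib.metadata` and `platform`). The helper names `collect_versions` and `declared_core_pin` are made up for illustration and are not part of Prefect or Pydantic. It also assumes that `pydantic` lists its `pydantic-core` requirement in its package metadata, so treat the second function as a best-effort check rather than a definitive one.

```python
import platform
from importlib import metadata


def collect_versions(packages=("prefect", "pydantic", "pydantic-core")):
    """Return the Python version plus the installed version of each package (None if missing)."""
    report = {"python": platform.python_version()}
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = None  # package is not installed in this environment
    return report


def declared_core_pin():
    """Return the pydantic-core requirement string declared by pydantic, or None if not found."""
    try:
        requirements = metadata.requires("pydantic") or []
    except metadata.PackageNotFoundError:
        return None  # pydantic itself is not installed
    for req in requirements:
        # Normalise underscores so both 'pydantic_core' and 'pydantic-core' spellings match.
        if req.replace("_", "-").lower().startswith("pydantic-core"):
            return req
    return None


if __name__ == "__main__":
    for key, value in collect_versions().items():
        print(f"{key}: {value}")
    print("pydantic declares:", declared_core_pin())
```

If the installed `pydantic-core` version differs from the one that `pydantic` declares, that would point to the mismatch described above. Run it with the same interpreter and virtual environment that runs your flow script, otherwise the versions it reports may not be the ones your script sees.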