Hi all, I'm overhauling our current Prefect 2.19.1 to Prefect 3.1.0. Because our codebase dynamica...
r

Robert Banick

8 months ago
Hi all, I'm overhauling our current Prefect 2.19.1 to Prefect 3.1.0. Because our codebase dynamically generates a lot of deployments we use Prefect Python and not Prefect YAML to manage everything. Surprisingly the Python code has really stayed mostly the same between versions, which is great. Unsurprisingly some of our side scripts we use to manage the Cloud are now broken. One in particular we use to clear backlogs of Late runs is giving me fits. This code used to work
import asyncio

from prefect.states import State
from prefect.client import get_client

MAX_RUNS_TO_DELETE = 5000
PREFECT_OFFSET_MAX = 200


async def remove_all_flows():
    client = get_client()
    for i in range(0, MAX_RUNS_TO_DELETE, PREFECT_OFFSET_MAX):
        flow_runs = await client.read_flow_runs(offset=i)
        for flow_run in flow_runs:
            if flow_run.state_name == "Late":
                flow_id = flow_run.id
                print("deleting", flow_id)
                await client.set_flow_run_state(flow_run_id=flow_id, state=State(type="CANCELLED"))


asyncio.run(remove_all_flows())
But now returns an error
pydantic.errors.PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
No amount of defining State parameters (or passing directly a StateCreate object) fixes the issue. I'm increasingly convinced the old code is now a dead end. Could the Prefect team advise on what the idiomatic way to cancel a Late flow run would be under Prefect 3.1+?