Robert Banick
11/08/2024, 3:41 PM
import asyncio
from prefect.states import State
from prefect.client import get_client
MAX_RUNS_TO_DELETE = 5000
PREFECT_OFFSET_MAX = 200
async def remove_all_flows():
    client = get_client()
    for i in range(0, MAX_RUNS_TO_DELETE, PREFECT_OFFSET_MAX):
        flow_runs = await client.read_flow_runs(offset=i)
        for flow_run in flow_runs:
            if flow_run.state_name == "Late":
                flow_id = flow_run.id
                print("deleting", flow_id)
                await client.set_flow_run_state(flow_run_id=flow_id, state=State(type="CANCELLED"))
asyncio.run(remove_all_flows())
But it now returns an error:
pydantic.errors.PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
No amount of defining State parameters (or passing a StateCreate object directly) fixes the issue. I'm increasingly convinced the old code is now a dead end.
Could the Prefect team advise on the idiomatic way to cancel a Late flow run under Prefect 3.1+?
Chris White
Robert Banick
11/08/2024, 5:59 PM
Robert Banick
11/08/2024, 6:00 PM
await client.create_flow_run_from_deployment(deployment_id=deployment_id, parameters=parameters)
Where deployment_id is a string ID and parameters is a dict of parameters in various formats
Robert Banick
11/08/2024, 6:00 PM
Chris White
from prefect import flow
It doesn't really matter what you import from here, but importing from the main module ensures that whatever is happening is avoided. It'll take some more time to find a proper solution, but hopefully this unblocks you for now!
Chris White
State object directly from here and it works:
from prefect import State
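For anyone landing on this thread later, here is a minimal sketch of the original script with the workaround above applied. It is not an official Prefect recipe. The names cancel_late_runs, make_cancelled_state, PAGE_SIZE and MAX_RUNS are hypothetical and only for illustration. It assumes that get_client can be imported from the top-level prefect package alongside State, that read_flow_runs accepts limit and offset, and that State(type="CANCELLED") is accepted once the import comes from the main module, as reported above.

```python
import asyncio

PAGE_SIZE = 200   # same value as PREFECT_OFFSET_MAX in the original script
MAX_RUNS = 5000   # same cap as MAX_RUNS_TO_DELETE in the original script


async def cancel_late_runs(client, make_cancelled_state,
                           page_size=PAGE_SIZE, max_runs=MAX_RUNS):
    """Page through flow runs and set every Late run to a cancelled state.

    `client` is anything exposing the async methods read_flow_runs and
    set_flow_run_state; `make_cancelled_state` is a hypothetical factory
    that returns the state object to apply.
    """
    cancelled_ids = []
    for offset in range(0, max_runs, page_size):
        flow_runs = await client.read_flow_runs(limit=page_size, offset=offset)
        if not flow_runs:
            break  # an empty page means there is nothing left to read
        for flow_run in flow_runs:
            if flow_run.state_name == "Late":
                await client.set_flow_run_state(
                    flow_run_id=flow_run.id, state=make_cancelled_state()
                )
                cancelled_ids.append(flow_run.id)
    return cancelled_ids


async def main():
    # Import from the top-level package, following the workaround in this
    # thread, instead of prefect.states / prefect.client.
    from prefect import State, get_client

    async with get_client() as client:
        ids = await cancel_late_runs(client, lambda: State(type="CANCELLED"))
        print("cancelled", len(ids), "late flow runs")
```

Run it with asyncio.run(main()) against your own API. Passing the client and the state factory in as arguments keeps the paging loop free of Prefect imports, so it can be tried against a stub client before touching real flow runs.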
Robert Banick
11/08/2024, 8:25 PM
Yaar Hahn
11/20/2024, 10:11 AM
Caleb
12/16/2024, 10:54 PM
Chris White