Matthew Scanlon
03/10/2025, 5:43 PMMarvin
03/10/2025, 5:44 PMPrefectClient
from the prefect.client.orchestration
module. Here's a concise example of how you might write a script to achieve this:
python
from prefect.client.orchestration import PrefectClient
from prefect.states import State
async def cancel_flow_runs_in_states(states_to_cancel):
async with PrefectClient() as client:
# Fetch all flow runs
flow_runs = await client.read_flow_runs()
for flow_run in flow_runs:
if flow_run.state.name in states_to_cancel:
await client.set_flow_run_state(flow_run.id, state=State.Cancelling)
# Usage
import asyncio
states_to_cancel = ["Running", "Scheduled"] # Example states
asyncio.run(cancel_flow_runs_in_states(states_to_cancel))
This script connects to the Prefect API, retrieves all flow runs, and cancels those that are in the specified states. You can modify the states_to_cancel
list to include the states you are interested in.
For more details, you can refer to the Prefect documentation.Matthew Scanlon
03/10/2025, 5:48 PMTraceback (most recent call last):
File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/cancel_all_scheduled.py", line 20, in <module>
asyncio.run(cancel_flow_runs_in_states(states_to_cancel))
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/cancel_all_scheduled.py", line 8, in cancel_flow_runs_in_states
flow_runs = await client.read_flow_runs()
File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2120, in read_flow_runs
return pydantic.TypeAdapter(List[FlowRun]).validate_python(response.json())
File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/.venv/lib/python3.10/site-packages/pydantic/type_adapter.py", line 412, in validate_python
return self.validator.validate_python(
File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/.venv/lib/python3.10/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `TypeAdapter[typing.List[prefect.client.schemas.objects.FlowRun]]` is not fully defined; you should define `typing.List[prefect.client.schemas.objects.FlowRun]` and all referenced types, then call `.rebuild()` on the instance.
Marvin
03/10/2025, 5:48 PMPydanticUserError
and TypeAdapter
appears to be related to a known issue with import order dependencies in Prefect. The issue can cause problems with type validation using Pydantic when certain imports are not in place.
Here's a potential solution based on the issue details:
1. Ensure Proper Import Order: Try importing something directly from the main prefect
module before you perform operations with the PrefectClient
. This can help resolve any circular dependency issues.
2. Rebuild Type Definitions: If you encounter a TypeAdapter
error, ensure that all referenced types are fully defined and rebuilt using the .rebuild()
method. However, this might not be directly applicable in your script since it involves internal Prefect schema handling.
Here's an updated version of your script with an additional import from the main prefect
module:
python
from prefect import flow # Import something from the main module
from prefect.client.orchestration import PrefectClient
from prefect.states import State
async def cancel_flow_runs_in_states(states_to_cancel):
async with PrefectClient() as client:
# Fetch all flow runs
flow_runs = await client.read_flow_runs()
for flow_run in flow_runs:
if flow_run.state.name in states_to_cancel:
await client.set_flow_run_state(flow_run.id, state=State.Cancelling)
# Usage
import asyncio
states_to_cancel = ["Running", "Scheduled"] # Example states
asyncio.run(cancel_flow_runs_in_states(states_to_cancel))
If the issue persists, consider checking the GitHub issue thread for any updates or additional workarounds.Matthew Scanlon
03/10/2025, 5:59 PMMarvin
03/10/2025, 6:00 PMinfrastructure_pid
is missing, the flow run may be marked as cancelled, but the cancellation cannot be enforced.
4. Concurrent State Changes: There might be concurrent operations or transitions affecting the flow runs, causing some to remain in their original state.
5. Network or API Issues: Temporary network issues or API inconsistencies might prevent some requests from being processed.
For further insights, you might want to check the specific logs or error messages associated with the flow runs that were not cancelled. Additionally, you can refer to the Prefect cancellation documentation for more details on the cancellation process and potential pitfalls.