<@ULVA73B9P> How can i write a python script to ca...
# ask-marvin
m
@Marvin How can i write a python script to cancel all flow runs that are in some given list of states
m
To cancel flow runs that are in specific states using Prefect 3.x, you can use the
PrefectClient
from the
prefect.client.orchestration
module. Here's a concise example of how you might write a script to achieve this:
Copy code
python
from prefect.client.orchestration import PrefectClient
from prefect.states import State

async def cancel_flow_runs_in_states(states_to_cancel):
    async with PrefectClient() as client:
        # Fetch all flow runs
        flow_runs = await client.read_flow_runs()
        
        for flow_run in flow_runs:
            if flow_run.state.name in states_to_cancel:
                await client.set_flow_run_state(flow_run.id, state=State.Cancelling)

# Usage
import asyncio
states_to_cancel = ["Running", "Scheduled"]  # Example states
asyncio.run(cancel_flow_runs_in_states(states_to_cancel))
This script connects to the Prefect API, retrieves all flow runs, and cancels those that are in the specified states. You can modify the
states_to_cancel
list to include the states you are interested in. For more details, you can refer to the Prefect documentation.
m
@Marvin
Copy code
Traceback (most recent call last):
  File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/cancel_all_scheduled.py", line 20, in <module>
    asyncio.run(cancel_flow_runs_in_states(states_to_cancel))
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/cancel_all_scheduled.py", line 8, in cancel_flow_runs_in_states
    flow_runs = await client.read_flow_runs()
  File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2120, in read_flow_runs
    return pydantic.TypeAdapter(List[FlowRun]).validate_python(response.json())
  File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/.venv/lib/python3.10/site-packages/pydantic/type_adapter.py", line 412, in validate_python
    return self.validator.validate_python(
  File "/Users/matthewscanlon/Documents/Exos_Financial/Github/exos-chronos-flows/.venv/lib/python3.10/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
    raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `TypeAdapter[typing.List[prefect.client.schemas.objects.FlowRun]]` is not fully defined; you should define `typing.List[prefect.client.schemas.objects.FlowRun]` and all referenced types, then call `.rebuild()` on the instance.
m
The error you're encountering with
PydanticUserError
and
TypeAdapter
appears to be related to a known issue with import order dependencies in Prefect. The issue can cause problems with type validation using Pydantic when certain imports are not in place. Here's a potential solution based on the issue details: 1. Ensure Proper Import Order: Try importing something directly from the main
prefect
module before you perform operations with the
PrefectClient
. This can help resolve any circular dependency issues. 2. Rebuild Type Definitions: If you encounter a
TypeAdapter
error, ensure that all referenced types are fully defined and rebuilt using the
.rebuild()
method. However, this might not be directly applicable in your script since it involves internal Prefect schema handling. Here's an updated version of your script with an additional import from the main
prefect
module:
Copy code
python
from prefect import flow  # Import something from the main module
from prefect.client.orchestration import PrefectClient
from prefect.states import State

async def cancel_flow_runs_in_states(states_to_cancel):
    async with PrefectClient() as client:
        # Fetch all flow runs
        flow_runs = await client.read_flow_runs()
        
        for flow_run in flow_runs:
            if flow_run.state.name in states_to_cancel:
                await client.set_flow_run_state(flow_run.id, state=State.Cancelling)

# Usage
import asyncio
states_to_cancel = ["Running", "Scheduled"]  # Example states
asyncio.run(cancel_flow_runs_in_states(states_to_cancel))
If the issue persists, consider checking the GitHub issue thread for any updates or additional workarounds.
m
@Marvin one minor change here await client.set_flow_run_state(flow_run.id,state=State(type=StateType.CANCELLING)) Why would this result in most scheudled flows being cancelled, but not all of them?
m
The inconsistency in cancelling all scheduled flow runs might be due to several factors related to infrastructure and state transitions. Here are some potential reasons: 1. Infrastructure Support: Not all infrastructures support cancellation. If a flow run's infrastructure does not support cancellation, the cancellation attempt will not succeed. 2. Identifier Scope Mismatch: If the identifier scope does not match when attempting to cancel a flow run, the worker cannot cancel it. Another worker might need to attempt the cancellation. 3. Missing Infrastructure PID: If the
infrastructure_pid
is missing, the flow run may be marked as cancelled, but the cancellation cannot be enforced. 4. Concurrent State Changes: There might be concurrent operations or transitions affecting the flow runs, causing some to remain in their original state. 5. Network or API Issues: Temporary network issues or API inconsistencies might prevent some requests from being processed. For further insights, you might want to check the specific logs or error messages associated with the flow runs that were not cancelled. Additionally, you can refer to the Prefect cancellation documentation for more details on the cancellation process and potential pitfalls.