<@ULVA73B9P> how can I set a flow run to CANCELLED...
# marvin-in-the-wild
g
@Marvin how can I set a flow run to CANCELLED state using Prefect api?
m
To set a flow run to a "CANCELLED" state using the Prefect API, you can use the
get_client
method provided by the Prefect library. This method allows you to interact with the API asynchronously. Here is a general approach you can follow based on Prefect 3.x documentation: 1. Import the necessary modules:
Copy code
python
import anyio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import StateType
2. Define a function to list flow runs with specified states:
Copy code
python
async def list_flow_runs_with_states(states: list[str]):
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )
    return flow_runs
3. Define a function to cancel specified flow runs:
Copy code
python
async def cancel_flow_runs(flow_runs):
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            state_updates = {}
            state_updates.setdefault("name", "Cancelled")
            state_updates.setdefault("type", StateType.CANCELLED)
            state = flow_run.state.copy(update=state_updates)
            await client.set_flow_run_state(flow_run.id, state, force=True)
4. Implement a main function to handle the cancellation process:
Copy code
python
async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)
    while len(flow_runs) > 0:
        print(f"Cancelling {len(flow_runs)} flow runs\n")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)
    print("Done!")
5. Run the cancellation process:
Copy code
python
if __name__ == "__main__":
    anyio.run(bulk_cancel_flow_runs)
This code will cancel all flow runs that are in "Pending", "Running", "Scheduled", or "Late" states when executed. For more details, you can refer to the Prefect documentation: - Prefect 2.x Guide on Using Client: https://docs-2.prefect.io/latest/guides/using-the-client/ - Prefect 3.x Guide on Interacting with API: https://docs-3.prefect.io/3.0/manage/interact-with-api If you're using version 2.x, the process is quite similar; however, ensure you're adapting any version-specific changes if needed.