<@ULVA73B9P> (using prefect 3) could you give me a...
# ask-marvin
p
@Marvin (using prefect 3) could you give me a script that pulls all flow runs in the state cancelling and deletes them?
m
It seems my attempt to check GitHub issues hit a little snag, likely because of a permissions hitch. But fear not! I did find useful information in the Prefect 3.x documentation regarding managing and cancelling flow runs. To delete flow runs in the "Cancelling" state, you'd typically use Prefect's REST API or Python client. Here's a simple Python script illustrating how you might achieve this with the Prefect client:
Copy code
python
from prefect import Client
from prefect.server.schemas.states import StateType

# Initialize the Prefect client
client = Client()

# Query all flow runs
flow_runs = client.graphql(
    """
    query {
        flow_run(where: { state: { _eq: "Cancelling" } }) {
            id
        }
    }
    """
)

# Iterate over the flow runs and delete them
for flow_run in flow_runs.data.flow_run:
    flow_run_id = flow_run.id
    client.delete_flow_run(flow_run_id)
    print(f"Deleted flow run with ID: {flow_run_id}")
Please refer to the Prefect documentation for more details on managing states and cancelling flow runs: - Managing States in Prefect - Cancelling Flow Runs If you run into any doldrums with permissions or further intricacies, feel free to reach out!
p
@Marvin can I get a solution that uses the API not the graphql?
m
I didn't find a specific example for deleting flow runs using the API, but you can achieve this with the Prefect REST API. Here's a Python script using the requests library to delete flow runs in the "Cancelling" state:
Copy code
python
import requests

# Your Prefect Cloud or server URL
BASE_URL = "<https://api.prefect.io>"

# Your Prefect API token
AUTH_TOKEN = "YOUR_PREFECT_API_TOKEN"

# Headers for the API requests
headers = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Content-Type": "application/json",
}

# Fetch flow runs in the "Cancelling" state
response = requests.get(f"{BASE_URL}/flow_runs", headers=headers, params={"state": "CANCELLING"})
flow_runs = response.json()

# Iterate over the flow runs and delete them
for flow_run in flow_runs:
    flow_run_id = flow_run["id"]
    delete_response = requests.delete(f"{BASE_URL}/flow_runs/{flow_run_id}", headers=headers)
    if delete_response.status_code == 204:
        print(f"Deleted flow run with ID: {flow_run_id}")
    else:
        print(f"Failed to delete flow run with ID: {flow_run_id}, Status: {delete_response.status_code}")
You'll need to replace
"YOUR_PREFECT_API_TOKEN"
with your actual Prefect API token and ensure that
BASE_URL
points to your Prefect instance. Keep in mind the API might have different endpoints depending on how your Prefect server is configured, so adjust the base URL accordingly. For more information on using the REST API, check the Prefect API documentation. If the run is in a cancelling state, it's not a full cancellation yet, it's in the process of stopping, so make sure it fits your need to handle these runs.
p
@Marvin no, I mean using
FlowRunFilter
,
read_flow_runs
,
delete_flow_run
, etc.
m
Using
FlowRunFilter
,
read_flow_runs
, and
delete_flow_run
to manage and delete flow runs in Prefect 3.x is a more programmatic approach. Below is a sample script to achieve this using Prefect's Python client:
Copy code
python
from prefect.client import get_client
from prefect.server.schemas.filters import FlowRunFilter
from prefect.server.schemas.states import StateType

async def delete_cancelling_flow_runs():
    async with get_client() as client:
        # Define the filter to fetch flow runs in "Cancelling" state
        flow_run_filter = FlowRunFilter(state={"type": StateType.CANCELLING})

        # Retrieve all flow runs in the "Cancelling" state
        flow_runs = await client.read_flow_runs(flow_run_filter)

        for flow_run in flow_runs:
            # Delete each flow run
            await client.delete_flow_run(flow_run.id)
            print(f"Deleted flow run with ID: {flow_run.id}")

# Trigger the delete operation
import asyncio
asyncio.run(delete_cancelling_flow_runs())
Make sure to replace any necessary configurations, such as the connection settings to your Prefect API if needed. This script uses an asynchronous approach which is typical in Prefect's I/O operations. For more detailed information, you might want to check Prefect's development or API documentation here. If you have the right level of pessimistic charm, you might remember to double-check that each cancelling flow run you didn't want is actually gone!
p
@Marvin why am i getting
Copy code
pydantic.errors.PydanticUserError: `TypeAdapter[typing.List[prefect.client.schemas.objects.FlowRun]]` is not fully defined; you should define `typing.List[prefect.client.schemas.objects.FlowRun]` and all referenced types, then call `.rebuild()` on the instance.
?
m
The error you're encountering,
PydanticUserError
, indicates that there's an issue with how the type is defined with Pydantic's
TypeAdapter
. Prefect 3.x utilizes Pydantic 2.0, which has some changes from previous versions. Here's what you can do to address this issue: 1. Ensure Compatible Pydantic Types: Prefect 3.0 uses Pydantic 2.0, introducing some breaking changes. If you've defined custom Pydantic models, ensure they are fully defined and compatible. Refer to Pydantic’s migration guide. 2. Rebuild Type Definitions: When you encounter a
TypeAdapter
issue, it's often because the type hasn't been fully defined or built. Invoke
.rebuild()
on the type instance if that's part of how the objects are used (this is more of a conceptual reminder, as usage specifics might vary). 3. Update Your Environment: Ensure your Prefect and Pydantic libraries are fully up to date, which might involve upgrading or even downgrading if versions are mismatched in your environment. Here's a basic attempt to handle this with Prefect:
Copy code
python
from pydantic import BaseModel, create_model, validate_model

# Define your FlowRun or other models properly in conjunction with Prefect's updates
class FlowRun(BaseModel):
    # Define your fields here
    pass

# Use this if applicable to ensure the definition is complete
FlowRun = create_model('FlowRun').rebuild()  # Example of attempting to adjust or rebuild the type

# Proceed with Prefect's operations ensuring the models are correct
If you do not have custom models that need adjusting, ensure you are using Prefect's provided classes and functions correctly as updated for their current version. For more tailored fixes, I recommend jumping into the Prefect forums or looking for recent detailed guides on Prefect with Pydantic 2.0 as it's a nuanced upgrade path with some gotchas. For general Prefect's upgrade notes, you might want to read more here.
p
@Marvin i'm using prefect 3.1.5 for both, client and server. What's the recommended way to use python to get the flow runs the?
m
With Prefect 3.1.5, you can retrieve flow runs using the
PrefectClient
class, which allows you to interact with Prefect's API in Python. Here's a basic guide on how to do this: 1. Set up your Prefect environment: Make sure your Prefect client is configured properly to communicate with your Prefect server or cloud environment. 2. Use the PrefectClient to read flow runs: Here's a sample script demonstrating how to fetch flow runs using the `PrefectClient`:
Copy code
python
import asyncio
from prefect.client import get_client
from prefect.server.schemas.filters import FlowRunFilter
from prefect.server.schemas.states import StateType

async def fetch_flow_runs():
    # Initialize the Prefect client
    async with get_client() as client:
        # You can define filters as needed, here for illustration with no filters
        flow_run_filter = FlowRunFilter()
        
        # Use read_flow_runs method to retrieve the flow runs
        flow_runs = await client.read_flow_runs(flow_run_filter)
        
        # Process the retrieved flow runs
        for flow_run in flow_runs:
            # Example: Print each flow run's ID
            print(f"Found flow run with ID: {flow_run.id}")

# Execute the async function
asyncio.run(fetch_flow_runs())
### Key Points: -
PrefectClient
is an async context manager:
It needs to be used within an
async
function and properly awaited. - Flexible filtering with `FlowRunFilter`: Customize the filter to retrieve flow runs in specific states or fulfilling various criteria. - Asynchronous execution: Ensure your environment supports asyncio for handling the asynchronous nature of
PrefectClient
. ### Additional Resources: You can dive deeper into different options and interact with more API endpoints by checking the Prefect API documentation. These improvements in Prefect 3.1.5 should make it feel like you're less in a giant bureaucracy trying to get flow runs and more in a well-oiled machine!
👎 1