<@ULVA73B9P> Is there any configuration to manage ...
# ask-marvin
n
@Marvin Is there any configuration to manage retention for flow runs?
m
The management of retention for flow runs in Prefect is currently only applicable within Prefect Cloud, not within Prefect Core's open-source platform. In Prefect Cloud, flow runs are retained according to the Flow Run Retention Policy set by your account tier. This policy determines the number of days each flow run is available in the Prefect Cloud UI, and via the Prefect CLI and API after it ends. Once a flow run reaches a terminal state, it will be retained until the end of the flow run retention period specified by your policy. You can learn more about this in the Prefect Cloud documentation.
n
@Marvin Give me an example of how to use read_flow_runs with FlowRunFilterStartTime
m
It seems the provided code example isn't directly related to the usage of
read_flow_runs
with
FlowRunFilterStartTime
. Currently, I don’t have access to examples tailored specifically to this combination, and since Prefect 2.x is new and I have no experience with it, I am unable to provide you with an accurately crafted code snippet. However, in general, to use
read_flow_runs
with filtering by
FlowRunFilterStartTime
, you would typically import the needed Prefect objects and utilize the
read_flow_runs
function with an appropriate filter. Below is a conceptual example of how you might do this:
Copy code
python
from datetime import datetime, timedelta
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter

# Set the start_time to filter flow runs that started after a certain date and time
start_time = datetime.utcnow() - timedelta(days=1)

# Create a FlowRunFilter with start time greater than the given start_time
flow_run_filter = FlowRunFilter(start_time={"ge": start_time})

# Use Prefect client to read flow runs with the filter
async with get_client() as client:
    flow_runs = await client.read_flow_runs(flow_filter=flow_run_filter)

    # Iterate over the flow_runs and process them as needed
    for flow_run in flow_runs:
        print(flow_run)
This example assumes asynchronous context and an Orion-based Prefect setup. Make sure to adapt the example to your specific needs and the version of Prefect you're using. This should give you a starting point for using
read_flow_runs
with a
FlowRunFilterStartTime
. Please consult the Prefect documentation or their community for more detailed examples and guidelines.
n
@Marvin Convert this prefect 1 query to prefect 2 api:
Copy code
client.graphql(
        {
            'query': {
                'flow_run(where: { end_time: {_neq: null}, created: {_lte:' + f'"{created_initial_date}"' + ' }} )': {
                    'id'
                }
            }
        }
    )
m
In Prefect 2.x (Orion), the GraphQL API has been replaced by a more standard REST API, so queries no longer use GraphQL syntax. Instead, you interact with the API using regular function calls. To retrieve flow runs that have ended and were created on or before a specific date (mimicking the GraphQL query you were using with Prefect 1.x), you would use Orion's querying functionality. Here's the equivalent functionality using the Prefect 2.x API:
Copy code
python
from datetime import datetime
from prefect import get_client
from prefect.orion.schemas.filters import FlowRunFilter
from prefect.orion.schemas.sorting import FlowRunSort

# Assuming 'created_initial_date' is a string representing the date you want to filter on
created_initial_date = "2023-01-01T00:00:00Z"

created_initial_date_obj = datetime.fromisoformat(created_initial_date)

# Define filters for the flow runs
flow_run_filter = FlowRunFilter(
    end_time={"neq": None},  # Filter for flow runs that have ended (non-null end_time)
    created={"lte": created_initial_date_obj}  # Filter for flow runs created on or before the specified date
)

# You could also sort the results if needed
sort = FlowRunSort.CREATED.desc()

async with get_client() as client:
    # Use the client to get filtered flow runs
    flow_runs = await client.read_flow_runs(
        flow_run_filter=flow_run_filter,
        sort=sort,  # Optional sorting
        limit=50  # Optional limit to the number of results returned
    )

    # Process the flow runs as needed
    for flow_run in flow_runs:
        print(flow_run.id)
This code will asynchronously fetch the details of the flow runs that match your filter criteria. Make sure you adapt the
created_initial_date
variable to the specific initial date you are interested in and adjust the filter according to your actual timezone handling as
fromisoformat
assumes a naive datetime if no timezone is provided. Please note that this code needs to run in an asynchronous context (e.g., within an
async def
function and an appropriate event loop) due to the use of
async with
and
await
. The Prefect 2.x API is designed to be used asynchronously to improve performance for I/O bound operations. For the exact details and additional examples of how to use the Prefect 2.x API for such queries, consult the Prefect 2.0 documentation.