Netanel Malka
03/20/2024, 2:28 PM
Marvin
03/20/2024, 2:29 PM
Netanel Malka
03/20/2024, 2:40 PM
Marvin
03/20/2024, 2:40 PM
`read_flow_runs` with `FlowRunFilterStartTime`. Currently, I don’t have access to examples tailored specifically to this combination, and since Prefect 2.x is new and I have no experience with it, I am unable to provide you with an accurately crafted code snippet.
However, in general, to use `read_flow_runs` with filtering by `FlowRunFilterStartTime`, you would typically import the needed Prefect objects and call `read_flow_runs` with an appropriate filter. Below is a conceptual example of how you might do this:
```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime

# Only include flow runs that started within the last day (timezone-aware UTC)
start_time = datetime.now(timezone.utc) - timedelta(days=1)

# Prefect 2.x time filters use after_/before_ rather than operators like "ge"
flow_run_filter = FlowRunFilter(start_time=FlowRunFilterStartTime(after_=start_time))


async def main():
    # Use the Prefect client to read flow runs matching the filter
    async with get_client() as client:
        # Flow-run filters are passed as flow_run_filter (flow_filter targets flows)
        flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
    # Iterate over the flow runs and process them as needed
    for flow_run in flow_runs:
        print(flow_run)


asyncio.run(main())
```
This example assumes an asynchronous context and a Prefect 2.x setup. Adapt it to your specific needs and the version of Prefect you're using. This should give you a starting point for using `read_flow_runs` with a `FlowRunFilterStartTime`. Please consult the Prefect documentation or the community for more detailed examples and guidelines.
Netanel Malka
03/20/2024, 2:43 PM
```
client.graphql(
    {
        'query': {
            'flow_run(where: { end_time: {_neq: null}, created: {_lte:' + f'"{created_initial_date}"' + ' }} )': {
                'id'
            }
        }
    }
)
```
Marvin
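03/20/2024, 2:43 PM
```python
import asyncio
from datetime import datetime

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort

# Assuming 'created_initial_date' is an ISO 8601 string for the date you want to filter on.
# An explicit "+00:00" offset is used because fromisoformat() only accepts "Z" on Python 3.11+.
created_initial_date = "2023-01-01T00:00:00+00:00"
created_initial_date_obj = datetime.fromisoformat(created_initial_date)

# Prefect 2.x filter objects use fields such as before_, after_, and any_
# instead of GraphQL-style operators.
# Assumption: terminal state types approximate the GraphQL "end_time is not null"
# check, and start_time is filtered in place of "created". Check which filters
# your Prefect version provides.
flow_run_filter = FlowRunFilter(
    state=FlowRunFilterState(
        type=FlowRunFilterStateType(
            any_=[
                StateType.COMPLETED,
                StateType.FAILED,
                StateType.CANCELLED,
                StateType.CRASHED,
            ]
        )
    ),
    start_time=FlowRunFilterStartTime(before_=created_initial_date_obj),
)


async def main():
    async with get_client() as client:
        # Use the client to get filtered flow runs
        flow_runs = await client.read_flow_runs(
            flow_run_filter=flow_run_filter,
            sort=FlowRunSort.START_TIME_DESC,  # Optional sorting, newest first
            limit=50,  # Optional limit on the number of results returned
        )
    # Process the flow runs as needed
    for flow_run in flow_runs:
        print(flow_run.id)


asyncio.run(main())
```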
This code will asynchronously fetch the flow runs that match your filter criteria. Make sure you adapt the `created_initial_date` variable to the specific initial date you are interested in, and handle timezones explicitly: `fromisoformat` returns a naive datetime when the string carries no UTC offset, and Python versions before 3.11 reject a trailing `Z`.
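As a minimal sketch of the timezone point above, using only the standard library: the helper name `parse_utc` is hypothetical, and treating offset-less strings as UTC is an assumption you may need to change for your data.

```python
from datetime import datetime, timezone


def parse_utc(value: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware UTC datetime (hypothetical helper)."""
    # Python versions before 3.11 reject a trailing "Z", so swap it for an explicit offset
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Assumption: strings without an offset are meant as UTC
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Normalize any other offset to UTC
    return parsed.astimezone(timezone.utc)


created_initial_date_obj = parse_utc("2023-01-01T00:00:00Z")
print(created_initial_date_obj.isoformat())
```

Passing an aware datetime like this to a filter avoids ambiguity about which timezone the cutoff refers to.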
Please note that this code needs to run in an asynchronous context (e.g., within an `async def` function driven by an event loop) due to the use of `async with` and `await`. The Prefect 2.x client is asynchronous, which suits I/O-bound operations such as API requests.
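To illustrate the asynchronous context in isolation, here is a small sketch that runs without a Prefect server. `fetch_flow_run_ids` is a hypothetical stand-in for an awaited client call, and the IDs it returns are made up for illustration.

```python
import asyncio


async def fetch_flow_run_ids() -> list[str]:
    """Hypothetical stand-in for an awaited client call such as read_flow_runs."""
    await asyncio.sleep(0)  # yields to the event loop, as a real network call would
    return ["run-1", "run-2"]  # made-up IDs, for illustration only


async def main() -> list[str]:
    # await is only valid inside a coroutine, so the query lives in an async def
    return await fetch_flow_run_ids()


# asyncio.run creates an event loop, runs main() to completion, then closes the loop
flow_run_ids = asyncio.run(main())
print(flow_run_ids)
```

In a script, `asyncio.run(main())` is usually the only synchronous entry point needed; inside an already running loop (for example a notebook), you would `await main()` directly instead.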
For the exact details and additional examples of how to use the Prefect 2.x API for such queries, consult the Prefect 2.x documentation.