Scott
03/27/2024, 4:17 PM
Marvin
03/27/2024, 4:17 PM
```python
import asyncio

from prefect import get_client


async def get_most_recent_flow_runs():
    async with get_client() as client:
        # Note: read_flow_run expects a flow run ID and has no `where` parameter
        return await client.read_flow_run(where={"state": {"_eq": "Completed"}})


last_3_flow_runs = asyncio.run(get_most_recent_flow_runs())
print(last_3_flow_runs)

# Every returned run should be completed, ordered by end time, most recent first
assert all(run.state.is_completed() for run in last_3_flow_runs)
assert (end_times := [run.end_time for run in last_3_flow_runs]) == sorted(end_times, reverse=True)
```
This code snippet is intended to retrieve the last three completed flow runs. It then asserts that every run is completed and that the runs are ordered by end time, most recent first.
For more details and examples of using filters and querying run data, refer to the guide on using the client and the filters API reference:
- Using the Client
- Filters API Reference (not directly linked but mentioned in the excerpt)
Remember, if you want to narrow the results down to the runs of a specific flow or deployment, you can adjust the query passed to client.read_flow_runs using appropriate filters such as DeploymentFilter.
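As a rough sketch of what such a narrowed query might look like, the example below builds the filter as a plain dictionary so that it runs without a Prefect installation. The nested keys (`flow_runs`, `deployments`, `any_`, `sort`, `limit`) are assumed to mirror the fields of the client's filter classes such as FlowRunFilter and DeploymentFilter, and should be checked against the filters API reference for your Prefect version. The function name `build_flow_run_query` and the deployment name `my-deployment` are hypothetical.

```python
import json
from typing import Optional


def build_flow_run_query(
    deployment_name: Optional[str] = None,
    state_types: tuple = ("COMPLETED",),
    limit: int = 3,
) -> dict:
    """Build a filter body for a flow-run query (field names are assumptions)."""
    body = {
        # Keep only runs whose state type is in the given list
        "flow_runs": {"state": {"type": {"any_": list(state_types)}}},
        # Most recently finished runs first
        "sort": "END_TIME_DESC",
        "limit": limit,
    }
    if deployment_name is not None:
        # Narrow the results to runs that belong to one named deployment
        body["deployments"] = {"name": {"any_": [deployment_name]}}
    return body


query = build_flow_run_query(deployment_name="my-deployment")
print(json.dumps(query, indent=2))
```

Keeping the deployment filter optional means the same helper covers both the general query and the narrowed one.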
Scott
03/27/2024, 8:59 PM
Marvin
03/27/2024, 8:59 PM
where clause in the client's read_flow_run method. It's possible that the code snippet I provided may not be exactly appropriate for your version of Prefect or there may be a missing detail.
If you're having trouble with the where clause, ensure you're using it inside an async function and with the client from the correct version of Prefect. Remember that your query structure needs to match the filter schema expected by the Prefect API.
To resolve the issue, you could either refer to the Prefect documentation on API queries or post a GitHub issue for the Prefect community if the problem persists.
Here's the adjusted version of the code snippet using the Prefect client, without a where clause:
```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort


async def get_latest_successful_flow_run():
    async with get_client() as client:
        # Adjust the filter below according to your needs
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(state={"type": {"any_": ["COMPLETED"]}}),
            sort=FlowRunSort.END_TIME_DESC,
            limit=1,
        )
    # The list holds at most one run: the most recently completed one
    return flow_runs[0] if flow_runs else None


# asyncio.run provides the async context when calling from synchronous code
latest_successful_run = asyncio.run(get_latest_successful_flow_run())
print(latest_successful_run)
```
Please note that you need to ensure that the filtering for the "successful" runs matches the schema and data structure expected by Prefect. If there are details about the error you're experiencing, I could potentially give you more targeted assistance.
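If you would rather pick out the latest successful run on the client side, the selection step can be sketched in plain Python. This is only an illustration under stated assumptions: `RunRecord` is a hypothetical stand-in for a flow run object, not a Prefect class, and its `state_type` and `end_time` fields are assumed names chosen for the example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class RunRecord:
    """Hypothetical stand-in for a flow run; not a Prefect class."""
    name: str
    state_type: str
    end_time: Optional[datetime]


def latest_completed(runs):
    """Return the completed run with the latest end_time, or None if there is none."""
    completed = [
        r for r in runs
        if r.state_type == "COMPLETED" and r.end_time is not None
    ]
    return max(completed, key=lambda r: r.end_time, default=None)


runs = [
    RunRecord("run-a", "COMPLETED", datetime(2024, 3, 26, 9, 0)),
    RunRecord("run-b", "FAILED", datetime(2024, 3, 27, 12, 0)),
    RunRecord("run-c", "COMPLETED", datetime(2024, 3, 27, 8, 30)),
    RunRecord("run-d", "RUNNING", None),
]
print(latest_completed(runs))
```

Filtering on the server is usually preferable when there are many runs, since it avoids fetching records that will be discarded.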
For more information on the Prefect Client and handling flow runs, please refer to Prefect Docs - Using the Client.