Tomas Moreno
04/09/2024, 5:52 PMMarvin
04/09/2024, 5:53 PMpython
import asyncio
from prefect.client import get_client
from prefect import FlowRun
async def get_most_recent_successful_flow_run():
async with get_client() as client:
# Get the last 3 flow runs and filter by state to get successful runs
last_flow_runs = await client.read_flow_runs(
limit=3,
state_filter=StateFilter(scheduled_after=..., scheduled_before=..., states=...)
)
# Filter out only successful states
successful_runs = [run for run in last_flow_runs if run.state.is_successful()]
# Assuming runs are returned in reverse chronological order, take the first one
return successful_runs[0] if successful_runs else None
# To execute this asynchronous function outside an async environment:
last_successful_flow_run = asyncio.run(get_most_recent_successful_flow_run())
if last_successful_flow_run:
print(f"The last successful flow run was: {last_successful_flow_run}")
else:
print("No successful runs found.")
In this code sample, you need to adapt the state_filter
arguments within the client.read_flow_runs()
call to match the specific states you consider to be 'successful' (e.g., Completed
). You may also need to adjust the limit
and `scheduled_after`/`scheduled_before` to fetch the appropriate flow runs.
For more details and finer control over filtering, refer to the Prefect documentation on using the client.
Remember, this snippet should be adapted to the specific parameters of your Prefect deployment and the flow runs you're interested in querying.