Adam Roderick
10/09/2023, 11:29 PM
`end_time` to the filters for flow runs so that we can incrementally pull down the most recently completed flows?
Adam Roderick
10/09/2023, 11:30 PM
Ben Muller
10/10/2023, 2:06 AM
Nate
10/10/2023, 3:23 AM
`client.read_flow_runs`, like:
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
async def get_most_recent_flow_runs(
    n: int = 3, states: list[str] | None = None
) -> list[FlowRun]:
    """Fetch the latest flow runs whose state type is one of ``states``.

    The query is sent through the Prefect client, sorted with
    ``FlowRunSort.END_TIME_DESC`` and capped at ``n`` results.

    Args:
        n: Maximum number of flow runs to request.
        states: State type names to match (e.g. ``"COMPLETED"``). ``None``
            or an empty list falls back to ``["COMPLETED"]``.

    Returns:
        The flow runs returned by ``client.read_flow_runs``.
    """
    # A missing or empty selection means "completed runs only".
    wanted_states = states or ["COMPLETED"]

    async with get_client() as client:
        # The nested dict is handed to FlowRunFilter as-is; it selects runs
        # whose state type matches any of the requested names.
        state_filter = FlowRunFilter(state={'type': {'any_': wanted_states}})
        recent_runs = await client.read_flow_runs(
            flow_run_filter=state_filter,
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
    return recent_runs
if __name__ == "__main__":
    # Demo: query with the defaults (three runs, COMPLETED only) and print them.
    last_3_flow_runs: list[FlowRun] = asyncio.run(get_most_recent_flow_runs())
    print(last_3_flow_runs)

    # Sanity check 1: no returned run is in a non-completed state.
    assert not any(not run.state.is_completed() for run in last_3_flow_runs)

    # Sanity check 2: the runs arrive ordered by end time, newest first.
    end_times = [run.end_time for run in last_3_flow_runs]
    assert end_times == sorted(end_times, reverse=True)