Hi Prefect! How about adding `end_time` to the fil...
# prefect-cloud
a
Hi Prefect! How about adding `end_time` to the filters for flow runs so that we can incrementally pull down the most recently completed flows?
Or is there a way to do this already that I'm not seeing?
n
hi @Adam Roderick - you could use `client.read_flow_runs` like
Copy code
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort

async def get_most_recent_flow_runs(
    n: int = 3, states: list[str] | None = None
) -> list[FlowRun]:
    """Read the flow runs that ended most recently, newest first.

    Args:
        n: Passed as ``limit`` to ``client.read_flow_runs``; caps how many
            runs are returned.
        states: State types a run may be in to match the filter. When
            omitted (or empty), only ``"COMPLETED"`` runs are requested.

    Returns:
        Whatever ``client.read_flow_runs`` yields for the filter, sorted
        with ``FlowRunSort.END_TIME_DESC``.
    """
    # A falsy `states` (None or an empty list) falls back to completed runs.
    wanted_states = states or ["COMPLETED"]

    async with get_client() as client:
        # Match any run whose state type is one of the requested types.
        state_filter = FlowRunFilter(state={'type': {'any_': wanted_states}})
        recent_runs = await client.read_flow_runs(
            flow_run_filter=state_filter,
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
        return recent_runs

if __name__ == "__main__":
    # Demo: fetch with the defaults (up to 3 runs, "COMPLETED" state type).
    last_3_flow_runs: list[FlowRun] = asyncio.run(get_most_recent_flow_runs())
    print(last_3_flow_runs)

    # Sanity check: every returned run reports a completed state.
    assert all(run.state.is_completed() for run in last_3_flow_runs)

    # Sanity check: the runs came back ordered by end time, latest first.
    end_times = [run.end_time for run in last_3_flow_runs]
    assert end_times == sorted(end_times, reverse=True)
🙌 2