<@ULVA73B9P> can I use this python function to ret...
# marvin-ai
m
@Marvin can I use this python function to return all runs for a specific set of deployment ids? what else do I need?
async def get_flows_between_dates(start_date, end_date, deployment_ids: list):
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(
                    after_=start_date,
                    before_=end_date
                )
            ),
            deployment_filter=DeploymentFilter(
                id= DeploymentFilterId(
                    any_ = deployment_ids
                )
            )
        )
        return flow_runs
m
thought for 249.2 seconds
n
this looks conceptually correct
m
Short answer: yes, with a couple of small tweaks and some pagination if you truly want “all” runs. Key points:
- Use the Prefect 3.x filters from `prefect.client.schemas.filters`.
- Filter by deployment IDs using `FlowRunFilterDeploymentId` (recommended), or pass a top-level `deployment_filter`.
- Ensure your datetimes are timezone-aware (UTC).
- Implement pagination; the API will cap results per request.

Here’s a solid, paginated version that filters by actual start time and deployment IDs:
from datetime import datetime
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterDeploymentId,
)

async def get_flows_between_dates(
    start_date: datetime,
    end_date: datetime,
    deployment_ids: list[UUID],
):
    # Tip: make sure start_date/end_date are timezone-aware (UTC)
    async with get_client() as client:
        all_runs = []
        limit = 200
        offset = 0

        base_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(after_=start_date, before_=end_date),
            # The filter is a model with an `any_` field, not a class method
            deployment_id=FlowRunFilterDeploymentId(any_=deployment_ids),
        )

        while True:
            batch = await client.read_flow_runs(
                flow_run_filter=base_filter,
                limit=limit,
                offset=offset,
            )
            if not batch:
                break
            all_runs.extend(batch)
            offset += len(batch)

        return all_runs
Notes:
- If you prefer a top-level deployment filter, this also works:
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterId

batch = await client.read_flow_runs(
    flow_run_filter=FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=start_date, before_=end_date),
    ),
    deployment_filter=DeploymentFilter(id=DeploymentFilterId(any_=deployment_ids)),
    limit=limit,
    offset=offset,
)
- Authentication: make sure your environment has `PREFECT_API_URL` and (if needed) `PREFECT_API_KEY` set so `get_client()` can connect to your server/cloud.
- Time semantics: the snippet above filters by the run’s actual `start_time`. If you instead meant “runs scheduled between dates,” use `FlowRunFilterExpectedStartTime` rather than `FlowRunFilterStartTime`.

Docs (Prefect 3.x):
- Python API Client
- Filter classes reference
- Client entrypoints

Quick question to confirm: do you want to filter by the runs’ actual start time, or their scheduled/expected start time?
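On the timezone point above, here is a minimal stdlib sketch of building timezone-aware UTC bounds before calling the function. The helper name `to_utc` is hypothetical, and treating a naive datetime as already being in UTC is an assumption you may need to change for your data:

```python
from datetime import datetime, timedelta, timezone


def to_utc(dt: datetime) -> datetime:
    """Return a timezone-aware UTC datetime.

    Assumption: a naive datetime is already expressed in UTC.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    # Aware datetimes are converted, keeping the same instant in time.
    return dt.astimezone(timezone.utc)


# Naive inputs are tagged as UTC.
start_date = to_utc(datetime(2024, 1, 1))
end_date = to_utc(datetime(2024, 2, 1))

# An aware input in another offset (UTC+2 here) is converted to UTC.
local_noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
converted = to_utc(local_noon)
```

The resulting `start_date` and `end_date` can then be passed to `get_flows_between_dates` along with the deployment IDs.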