Hi all, I am trying to read all scheduled runs for...
# ask-community
t
Hi all, I am trying to read all scheduled runs for a flow - doing that using the python SDK and the rest API gives me different responses and I think there is an issue converting the pydantic models to the filter dict (maybe due to my combination of pydantic and prefect versions?
operator
from
OperatorMixin
does not seem to be included). Has anyone spotted this as an issue or does anyone see a mistake I am maybe making? Posting the two approaches in a comment below.
prefect==2.11.0, pydantic==1.10.9
Copy code
from prefect import flow, get_client
from prefect.client.schemas import FlowRun, StateType
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType
)
import requests
import asyncio

@flow
async def my_flow(flow_name, deployment_name):
    client = get_client()

    flow_runs = []
    offset = 0
    fetching_flow_runs = True
    while fetching_flow_runs:
        scheduled_flows_chunk = await client.read_flow_runs(
            flow_filter=FlowFilter(**{"name": FlowFilterName(**{"any_": [flow_name]})}),
            deployment_filter=DeploymentFilter(
                **{"name": DeploymentFilterName(**{"_any": [deployment_name]})}
            ),
            flow_run_filter=FlowRunFilter(
                **{
                    "state": FlowRunFilterState(
                        **{
                            "type": FlowRunFilterStateType(
                                **{"_any": [StateType.SCHEDULED]}
                            )
                        }
                    )
                }
            ),
            offset=offset,
        )
        if len(scheduled_flows_chunk) > 0:
            flow_runs.extend(scheduled_flows_chunk)
            offset += 200
        else:
            fetching_flow_runs = False
    return  flow_runs


@flow
def my_flow_alt(flow_name, deployment_name):
    client = get_client()
    api = client.api_url

    flow_runs = []
    offset = 0
    fetching_flow_runs = True

    while fetching_flow_runs:
        data = {
            "flows": {
                "operator": "and_",
                "name": {
                    "any_": [
                        flow_name
                    ]
                }},
            "deployment": {
                "operator": "and_",
                "name": {"any_": [deployment_name]}},
            "flow_runs": {
                "state": {
                    "operator": "and_",
                    "type": {
                        "any_": [
                            "SCHEDULED"
                        ]
                    },
                },
            },
            "offset": offset,
        }

        response = <http://requests.post|requests.post>(
            url=f"{api}flow_runs/filter",
            json=data,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        if response.json():
            flow_runs.extend([FlowRun(**result) for result in response.json()])
            offset += 200
        else:
            fetching_flow_runs = False
    return flow_runs