Tatjana Puskarov
01/24/2024, 11:22 AM
`operator` from `OperatorMixin` does not seem to be included). Has anyone else run into this issue, or can anyone spot a mistake I might be making? I'm posting the two approaches in a comment below. Versions: prefect==2.11.0, pydantic==1.10.9
Tatjana Puskarov
01/24/2024, 11:23 AM
from prefect import flow, get_client
from prefect.client.schemas import FlowRun, StateType
from prefect.client.schemas.filters import (
DeploymentFilter,
DeploymentFilterName,
FlowFilter,
FlowFilterName,
FlowRunFilter,
FlowRunFilterState,
FlowRunFilterStateType
)
import requests
import asyncio
@flow
async def my_flow(flow_name, deployment_name):
    """Collect the SCHEDULED flow runs of one deployment of one flow.

    Pages through ``client.read_flow_runs`` until the API returns an empty
    page, accumulating every run it gets back.

    Args:
        flow_name: Name the flow must match.
        deployment_name: Name the deployment must match.

    Returns:
        A list of all flow runs returned by the API, in the order received.
    """
    # The filters do not change between pages, so build them once.
    # NOTE: the membership field on these filter models is spelled ``any_``
    # (trailing underscore). A key such as ``_any`` does not set the
    # criterion, which leaves the deployment/state filter without effect.
    flow_filter = FlowFilter(name=FlowFilterName(any_=[flow_name]))
    deployment_filter = DeploymentFilter(
        name=DeploymentFilterName(any_=[deployment_name])
    )
    flow_run_filter = FlowRunFilter(
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.SCHEDULED])
        )
    )

    flow_runs = []
    offset = 0
    # Use the client as a context manager so its HTTP connection is closed.
    async with get_client() as client:
        while True:
            scheduled_flows_chunk = await client.read_flow_runs(
                flow_filter=flow_filter,
                deployment_filter=deployment_filter,
                flow_run_filter=flow_run_filter,
                offset=offset,
            )
            if not scheduled_flows_chunk:
                break
            flow_runs.extend(scheduled_flows_chunk)
            # Advance by what was actually received instead of assuming a
            # fixed page size of 200; this stays correct if the server's
            # default limit is configured differently.
            offset += len(scheduled_flows_chunk)
    return flow_runs
@flow
def my_flow_alt(flow_name, deployment_name):
    """Collect the SCHEDULED flow runs of one deployment via the REST API.

    Posts directly to the ``flow_runs/filter`` endpoint of the API the
    Prefect client is configured for, paging until an empty page is returned.

    Args:
        flow_name: Name the flow must match.
        deployment_name: Name the deployment must match.

    Returns:
        A list of ``FlowRun`` objects built from the JSON rows returned.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    client = get_client()
    # Normalise the trailing slash so the endpoint is well-formed whether or
    # not the configured API URL ends with one.
    endpoint = f"{str(client.api_url).rstrip('/')}/flow_runs/filter"

    flow_runs = []
    offset = 0
    while True:
        data = {
            "flows": {
                "operator": "and_",
                "name": {"any_": [flow_name]},
            },
            # The request body key is plural ("deployments"); the singular
            # form is not a recognised key, so the filter would be dropped.
            "deployments": {
                "operator": "and_",
                "name": {"any_": [deployment_name]},
            },
            "flow_runs": {
                "state": {
                    "operator": "and_",
                    "type": {"any_": ["SCHEDULED"]},
                },
            },
            "offset": offset,
        }
        response = requests.post(
            url=endpoint,
            json=data,
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        # Parse the body once and reuse it for both the check and the build.
        results = response.json()
        if not results:
            break
        flow_runs.extend(FlowRun(**result) for result in results)
        # Advance by the number of rows received rather than a hard-coded
        # page size, so no rows are skipped if the server limit is not 200.
        offset += len(results)
    return flow_runs