Luke Dolan
07/10/2023, 3:08 PM
import asyncio
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import State, StateType

client = PrefectClient(api=f'http://xxx.xxx.com:4200/api')
state_filter = FlowRunFilterState(
    type=FlowRunFilterStateType(
        any_=[StateType.RUNNING]
    )
)
flows_resp = asyncio.get_event_loop().run_until_complete(client.read_flow_runs(flow_run_filter=FlowRunFilter(type=state_filter), limit=1))
logger.info(f'got {len(flows_resp)} flow runs')
for flow in flows_resp:
    logger.info(f'flow state details {flow.state}')
This gives the result:
16:54:32.780 | Info | httpx - HTTP Request: POST http://xxx.xxx.com:4200/api/flow_runs/filter "HTTP/1.1 200 OK"
16:54:32.780032000 [Info ] [httpx] HTTP Request: POST http://xxx.xxx.com:4200/api/flow_runs/filter "HTTP/1.1 200 OK"
16:54:32.792552000 [Info ] got 1 flow runs
16:54:32.792795000 [Info ] flow state details Completed()
In the above I would expect the flow state to be RUNNING rather than COMPLETED.
Christopher Boyd
07/10/2023, 5:39 PM
StateType.RUNNING should be like FAILED, COMPLETED - see this example I have:
from prefect import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName, FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
import asyncio

async def get_client_response() -> None:
    async with get_client() as client:
        response = await client.read_flow_runs(
            deployment_filter=DeploymentFilter(name=DeploymentFilterName(like_="default")),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=["FAILED", "COMPLETED"])),
            )
        )
        print(response)
        print(len(response))

asyncio.run(get_client_response())
Luke Dolan
07/11/2023, 5:25 PM
update_filter = FlowRunFilter(
    state=FlowRunFilterState(  ### <--swapped from `type=`
        type=FlowRunFilterStateType(
            any_=[StateType.RUNNING]
        )
    )
)
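
For reference, a minimal sketch of the request body that the corrected nesting should produce for the POST /flow_runs/filter call seen in the log above. It uses only the standard library so the shape is easy to inspect. The helper name flow_run_filter_body is hypothetical, and the top-level "flow_runs" key is an assumption about how the client serializes flow_run_filter, not something confirmed in this thread.

```python
import json


def flow_run_filter_body(state_types, limit=1):
    """Build a filter body with the state -> type -> any_ nesting.

    Assumption: the API expects the flow run filter under "flow_runs".
    """
    return {
        "flow_runs": {
            # The state types sit two levels down: state, then type.
            "state": {"type": {"any_": list(state_types)}},
        },
        "limit": limit,
    }


body = flow_run_filter_body(["RUNNING"])
print(json.dumps(body, indent=2))
```

If the "state" level is missing from the body, the server has no state criteria to apply, which would be consistent with a Completed run coming back from the original call.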