https://prefect.io logo
l

Luke Dolan

07/10/2023, 3:08 PM
Hey thought I'd post here first before creating a bug report When using read_flow_runs, the returned flow runs do not have the state expected by the given filter Server version 2.10.13 Client version 2.10.20 The simple code snippet I have
Copy code
import asyncio
     from prefect.client.orchestration import PrefectClient
     from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
     from prefect.client.schemas.objects import State, StateType
 
    client = PrefectClient(api=f'<http://xxx.xxx.com:4200/api>')
    state_filter = FlowRunFilterState(
               type=FlowRunFilterStateType(
                   any_=[StateType.RUNNING]
               )
           )
    flows_resp = asyncio.get_event_loop().run_until_complete(client.read_flow_runs(flow_run_filter=FlowRunFilter(type=state_filter), limit=1))
     <http://logger.info|logger.info>(f'got {len(flows_resp)} flow runs')
    for flow in flows_resp:
         <http://logger.info|logger.info>(f'flow state details {flow.state}')
this gives the result
Copy code
16:54:32.780 | Info   | httpx - HTTP Request: POST <http://xxx.xxx.com:4200/api/flow_runs/filter> "HTTP/1.1 200 OK"
16:54:32.780032000 [Info ] [httpx] HTTP Request: POST <http://xxx.xxx.com:4200/api/flow_runs/filter> "HTTP/1.1 200 OK"
16:54:32.792552000 [Info ] got 1 flow runs
16:54:32.792795000 [Info ] flow state details Completed()
In the above I would expect that the flow state is
RUNNING
rather than
COMPLETED
c

Christopher Boyd

07/10/2023, 5:39 PM
I think
StateType.RUNNING
should be like
FAILED
,
COMPLETED
- see this example I have
Copy code
from prefect import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName, FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
import asyncio

async def get_client_response() -> asyncio.coroutine:
    async with get_client() as client:
        response = await client.read_flow_runs(
            deployment_filter=DeploymentFilter(name=DeploymentFilterName
            (like_="default")),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=["FAILED", "COMPLETED"])),
        )
    )
    print(response)
    print(len(response))

asyncio.run(get_client_response())
l

Luke Dolan

07/11/2023, 5:25 PM
Indeed, turns out I had assigned some incorrect input parameters. Thanks!
Copy code
update_filter = FlowRunFilter(
        state=FlowRunFilterState(  ### <--swapped from `type=`
            type=FlowRunFilterStateType(
                any_=[StateType.RUNNING]
            )
        )
    )
1
🙌 1
2 Views