
Gregory Hunt

07/21/2023, 6:56 PM
Can you query the API and return all flow_runs with a specific parameter?
@Nate

Nate

07/21/2023, 7:04 PM
you can check for whatever param values you want, and get more specific about where you look with the flow_filter and/or flow_run_filter
In [1]: from prefect import get_client

In [2]: async with get_client() as client:
   ...:     runs = [run for run in await client.read_flow_runs() if run.parameters != {}]
   ...:

In [3]: runs
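To match one specific parameter value rather than any non-empty parameters, the same client-side check can be narrowed. This is a minimal sketch, not part of the Prefect API: the helper name runs_with_parameter and the "region" parameter are hypothetical, and it only assumes each run exposes a dict-like .parameters attribute, as the runs in the snippet above do.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def runs_with_parameter(runs: Iterable[Any], name: str, value: Any) -> List[Any]:
    """Keep only the runs whose parameters contain `name` set to `value`."""
    matches = []
    for run in runs:
        # Treat a missing or None parameters attribute as "no parameters".
        params = getattr(run, "parameters", None) or {}
        if name in params and params[name] == value:
            matches.append(run)
    return matches


# Stand-in objects shaped like flow runs, so the sketch runs without an API.
demo_runs = [
    SimpleNamespace(name="run-a", parameters={"region": "eu"}),
    SimpleNamespace(name="run-b", parameters={"region": "us"}),
    SimpleNamespace(name="run-c", parameters={"region": "eu", "dry_run": True}),
    SimpleNamespace(name="run-d", parameters={}),
]

print([run.name for run in runs_with_parameter(demo_runs, "region", "eu")])
# prints ['run-a', 'run-c']
```

With a real client, the list returned by await client.read_flow_runs() can be passed in place of demo_runs.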

Gregory Hunt

07/21/2023, 7:04 PM
Awesome, Thanks!

Nate

07/21/2023, 7:05 PM
👍

Gregory Hunt

07/21/2023, 7:07 PM
@artur
@parker
@Nate I have a bunch of pending jobs clogging my queue where some worker failed. How do I write a script similar to the one above to cancel all flow runs in a pending state?
I tried this:
async with get_client() as client:
    runs = [run for run in await client.read_flow_runs() if run.state.is_pending()]
    print(runs)
but I get an empty response

Nate

08/03/2023, 9:39 PM
hmm - just a small sanity check, are you running the script from the same workspace as your pending runs? your client use looks fine

Gregory Hunt

08/03/2023, 9:40 PM
I am

Nate

08/03/2023, 9:41 PM
hmm could you grab the ID and/or filter on the name of a run that seems pending, and read that flow run to see what the API thinks it is?
like it might be LATE or something is what i am thinking

Gregory Hunt

08/03/2023, 9:42 PM
Ok I will try that
@Nate it is pending, but it looks like the issue is that await client.read_flow_runs() only pulls back 200 flow runs. Do you have an example of how to pass a filter for pending?

Nate

08/04/2023, 1:04 PM
yep one sec
In [15]: from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType

In [16]: async with get_client() as client:
    ...:     runs = await client.read_flow_runs(
    ...:         flow_run_filter=FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=["PENDING"])))
    ...:     )
    ...:
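For the original goal of cancelling the pending runs, a rough sketch of the next step could look like the following. It assumes the Prefect 2 client method set_flow_run_state(flow_run_id, state, force) and the Cancelled state factory in prefect.states; the helper names cancel_runs and cancel_pending_with_prefect are hypothetical, and the Prefect imports are kept inside the second function so the first stays usable on its own.

```python
import asyncio
from typing import Any, Callable, Iterable, List


async def cancel_runs(
    client: Any,
    runs: Iterable[Any],
    make_state: Callable[[], Any],
    force: bool = False,
) -> List[Any]:
    """Ask the API to move each run to a fresh state from make_state()."""
    submitted = []
    for run in runs:
        # A new state object is built per run so nothing is shared between calls.
        await client.set_flow_run_state(
            flow_run_id=run.id, state=make_state(), force=force
        )
        submitted.append(run.id)
    return submitted


async def cancel_pending_with_prefect() -> List[Any]:
    # Local imports: these names are assumed from Prefect 2.x.
    from prefect import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.states import Cancelled

    pending = FlowRunFilter(
        state=FlowRunFilterState(type=FlowRunFilterStateType(any_=["PENDING"]))
    )
    async with get_client() as client:
        runs = await client.read_flow_runs(flow_run_filter=pending)
        return await cancel_runs(client, runs, make_state=Cancelled)


# To run against your own workspace (not executed here):
# asyncio.run(cancel_pending_with_prefect())
```

Each read still returns a single page of results, so with a large backlog this would need to be run repeatedly or combined with paging. Whether force=True is needed depends on how the server treats the transition, so it is left off by default.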

Gregory Hunt

08/04/2023, 1:23 PM
Awesome

Giacomo Chiarella

09/11/2023, 9:43 AM
Is it possible to remove the 200 limitation by just filtering by date range (after_ / before_)?

Gregory Hunt

09/12/2023, 1:03 PM
I haven't found a way, but you can use the offset field to write a loop that keeps reading until you run out of results

Giacomo Chiarella

09/12/2023, 1:07 PM
yes, that is what I’ve done. It is a bit verbose code-wise, but it works
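For reference, the offset loop described here might be sketched as below. This is an illustration under assumptions, not the code either poster used: read_all_pages and read_all_flow_runs are hypothetical helper names, the page size of 200 mirrors the cap mentioned earlier in the thread, and the Prefect wiring assumes read_flow_runs accepts limit and offset keyword arguments.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def read_all_pages(
    fetch_page: Callable[[int, int], Awaitable[List[Any]]],
    page_size: int = 200,
) -> List[Any]:
    """Call fetch_page(limit, offset) until it returns an empty page."""
    results: List[Any] = []
    offset = 0
    while True:
        page = await fetch_page(page_size, offset)
        if not page:
            break
        results.extend(page)
        # Advance by what was actually returned, in case a page comes back short.
        offset += len(page)
    return results


async def read_all_flow_runs(**filters: Any) -> List[Any]:
    # Local import: assumes a Prefect 2.x client is installed and configured.
    from prefect import get_client

    async with get_client() as client:

        async def fetch_page(limit: int, offset: int) -> List[Any]:
            return await client.read_flow_runs(limit=limit, offset=offset, **filters)

        return await read_all_pages(fetch_page)


# To run against your own workspace (not executed here):
# all_runs = asyncio.run(read_all_flow_runs())
```

Keeping the loop in a generic helper hides most of the verbosity: any filter arguments are passed through unchanged. If runs are being created or changing state while the loop is in progress, pages can shift between calls, so passing an explicit sort may be worth considering.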