
David Elliott

01/26/2023, 4:26 PM
Quick one RE the cloud API (2.0) - I want to get the count of task runs for a given flow run ID. Am I right to hit the read_task_runs() endpoint and count the number of task runs returned? And RE the max limit of 200 task runs per API call, am I right to loop over it in the way that I’m doing in the thread, or is there a neater pattern? Thanks!
from prefect.client.orion import get_client
from uuid import UUID
import asyncio
from typing import List
from prefect.client.schemas import TaskRun
from prefect.orion.schemas.filters import FlowRunFilter

def read_task_runs(flow_run_id: UUID) -> List[TaskRun]:
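    # page through the API's limit/offset pagination; per the note below, Cloud caps limit at 200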
    batch_size = 200
    offset = 0
    all_task_runs = []
    while True:
        client = get_client()
        runs = asyncio.run(client.read_task_runs(
            flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), limit=batch_size, offset=offset
        ))
        all_task_runs += runs
        offset += batch_size
        if len(runs) < batch_size:
            break
    return all_task_runs

my_flow_run_id = UUID("xxxxxxxxxxx")
total_task_runs = len(read_task_runs(flow_run_id=my_flow_run_id))
Couple of things to note - I had to trial-and-error my way to finding that the max limit is 200 - anything higher isn’t supported, but I can’t see that in the documentation. Also, I was sort of expecting a paginator / next_page response given there’s more to return, but couldn’t see that, hence I wanted to check whether I’m hitting this endpoint correctly
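For reference, a minimal sketch of the raw REST call that read_task_runs() wraps, assuming the standard Cloud workspace API URL shape and the POST /task_runs/filter route; the URL, key, and flow run ID placeholders are illustrative. It pages by limit/offset and returns a plain JSON list with no next_page cursor:
import httpx

# Assumed Cloud workspace URL shape and API key - substitute your own values
api_url = "https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>"
headers = {"Authorization": "Bearer <PREFECT_API_KEY>"}

response = httpx.post(
    f"{api_url}/task_runs/filter",
    headers=headers,
    json={
        "flow_runs": {"id": {"any_": ["<flow-run-uuid>"]}},
        "limit": 200,   # values above 200 are rejected by Cloud
        "offset": 0,
    },
)
response.raise_for_status()
task_runs_page = response.json()  # plain JSON list, no pagination metadata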

Zanie

01/26/2023, 4:34 PM
You’ll want to context-manage the client and probably run the whole thing async, i.e.
async def read_task_runs(...):
    async with get_client() as client:
        while ...
            await client.read_task_runs ...

asyncio.run(read_task_runs(...))
If you’re using your own server, that 200-per-call limit can be changed. It cannot be changed with Cloud.
👍 1
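For self-hosted servers, a hedged sketch of how that cap could be raised - the setting name here is an assumption and varies by version, so check your release’s settings reference:
# Assumed setting name for Orion-era releases (later renamed PREFECT_API_DEFAULT_LIMIT);
# it appears to act as both the default and the maximum page size on a self-hosted server.
prefect config set PREFECT_ORION_API_DEFAULT_LIMIT=500
prefect orion start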

David Elliott

01/26/2023, 4:36 PM
Ah perfect, thanks! will switch that to async and context managed client 👌

Zanie

01/26/2023, 4:37 PM
This is roughly the pattern I’d use, though I’d set the offset to the length of the runs that are returned rather than assuming 200
You can probably loop until no runs are returned rather than until fewer than the batch size come back (then you don’t need to assume a batch size)
That’ll cost one extra API query, but be cleaner. 🤷

David Elliott

01/26/2023, 4:38 PM
Oh that’s a great shout, yeah that’s ideal 🙏 thank you!
This is so much neater - thanks again!
async def get_task_runs(flow_run_id: UUID) -> List[TaskRun]:
    task_runs = []
    offset = 0
    async with get_client() as client:
        while True:
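            # fetch the next page; an empty page means every task run has been read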
            _runs = await client.read_task_runs(
                flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), offset=offset
            )
            if not _runs:
                break
            task_runs += _runs
            offset += len(_runs)
    return task_runs

task_runs = asyncio.run(get_task_runs(flow_run_id=UUID("xxxxxxx")))
🙌 1
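A hedged variant of the same loop for anyone who only needs the count (the original goal) rather than the TaskRun objects themselves - it uses only the client call already shown above:
async def count_task_runs(flow_run_id: UUID) -> int:
    count = 0
    offset = 0
    async with get_client() as client:
        while True:
            _runs = await client.read_task_runs(
                flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), offset=offset
            )
            if not _runs:
                break
            # accumulate only the count, so large flow runs don't pile up in memory
            count += len(_runs)
            offset += len(_runs)
    return count

total_task_runs = asyncio.run(count_task_runs(flow_run_id=UUID("xxxxxxx")))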