David Elliott
01/26/2023, 4:26 PMread_task_runs()
endpoint and count the number of task runs returned? And RE the max limit of 200 task runs per API call, am I right to loop over it in the way that I’m doing in the thread, or is there a neater pattern? Thanks!from prefect.client.orion import get_client
from uuid import UUID
import asyncio
from typing import List
from prefect.client.schemas import TaskRun
from prefect.orion.schemas.filters import FlowRunFilter
def read_task_runs(flow_run_id: UUID) -> List[TaskRun]:
batch_size = 200
offset = 0
all_task_runs = []
while True:
client = get_client()
runs = asyncio.run(client.read_task_runs(
flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), limit=batch_size, offset=offset
))
all_task_runs += runs
offset += 200
if len(runs) < batch_size:
break
return all_task_runs
my_flow_run_id = UUID("xxxxxxxxxxx")
total_task_runs = len(read_task_runs(flow_run_id=my_flow_run_id))
Zanie
01/26/2023, 4:34 PMasync def read_task_runs(...):
async with get_client() as client:
while ...
await client.read_task_runs ...
asyncio.run(read_task_runs(...))
David Elliott
01/26/2023, 4:36 PMZanie
01/26/2023, 4:37 PMDavid Elliott
01/26/2023, 4:38 PMasync def get_task_runs(flow_run_id: UUID) -> List[TaskRun]:
task_runs = []
offset = 0
async with get_client() as client:
while True:
_runs = await client.read_task_runs(
flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), offset=offset
)
if not _runs:
break
task_runs += _runs
offset += len(_runs)
return task_runs
task_runs = asyncio.run(get_task_runs("xxxxxxx))