<@ULVA73B9P> how to get all the task runs of aflo...
# ask-community
k
@Marvin how to get all the task runs of a flow run, along with the metadata, from inside that flow run itself
m
Great question. Assuming you’re on Prefect 3.x, you can query all task runs for the current flow run from inside the flow: take the flow run id from prefect.runtime and pass it to the Prefect client's read_task_runs call with a TaskRun filter. Here are two minimal examples, depending on whether your flow is sync or async.

Sync flow (recommended if your flow function is sync):
```python
from prefect import flow
from prefect.runtime import flow_run
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import TaskRunFilter, TaskRunFilterFlowRunId

@flow
def my_flow():
    # ID of the flow run that is executing this code
    current_flow_run_id = flow_run.id

    # get_client(sync_client=True) returns a synchronous client configured
    # from the current Prefect settings
    with get_client(sync_client=True) as client:
        task_runs = client.read_task_runs(
            task_run_filter=TaskRunFilter(
                flow_run_id=TaskRunFilterFlowRunId(any_=[current_flow_run_id])
            ),
            # Optional:
            # limit=200,
            # offset=0,
        )

    # Each item is a TaskRun model with metadata; convert to dicts if you like.
    # Field names follow the Prefect 3.x TaskRun schema; check them against
    # your installed version.
    for tr in task_runs:
        print({
            "id": str(tr.id),
            "name": tr.name,
            "task_key": tr.task_key,
            "state": tr.state.type.value if tr.state else None,
            "state_name": tr.state.name if tr.state else None,
            "start_time": tr.start_time,
            "end_time": tr.end_time,
            "run_count": tr.run_count,
            "tags": tr.tags,
            "task_inputs": tr.task_inputs,
        })

    return task_runs
```
Async flow:
```python
from prefect import flow
from prefect.runtime import flow_run
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import TaskRunFilter, TaskRunFilterFlowRunId

@flow
async def my_async_flow():
    current_flow_run_id = flow_run.id

    async with get_client() as client:
        task_runs = await client.read_task_runs(
            task_run_filter=TaskRunFilter(
                flow_run_id=TaskRunFilterFlowRunId(any_=[current_flow_run_id])
            )
        )

    # Example: dump full metadata to dicts
    tr_dicts = [tr.model_dump() for tr in task_runs]
    return tr_dicts
```
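Once you have the dicts, a small helper can roll them up into a per-state summary. This is only a sketch and not part of Prefect: `summarize_task_runs` is a hypothetical helper, and it assumes each dict carries `state_name` and `total_run_time` keys (the latter a `datetime.timedelta` or missing), which is what `model_dump()` on a Prefect 3.x task run appears to produce. Check the keys against your version. The sample data is hand-written so the snippet runs without a Prefect server.

```python
from collections import Counter
from datetime import timedelta


def summarize_task_runs(task_run_dicts):
    """Roll up task-run dicts into counts per state and a total run time.

    Hypothetical helper; the key names are assumptions about the dicts
    returned by model_dump().
    """
    counts = Counter()
    total = timedelta(0)
    for tr in task_run_dicts:
        # Runs that have no state name yet are grouped under "Unknown"
        counts[tr.get("state_name") or "Unknown"] += 1
        run_time = tr.get("total_run_time")
        if isinstance(run_time, timedelta):
            total += run_time
    return {"by_state": dict(counts), "total_run_time": total}


# Hand-written sample data, not real Prefect output
sample = [
    {"name": "extract-0", "state_name": "Completed", "total_run_time": timedelta(seconds=2)},
    {"name": "load-0", "state_name": "Failed", "total_run_time": timedelta(seconds=1)},
    {"name": "report-0", "state_name": None, "total_run_time": None},
]
summary = summarize_task_runs(sample)
print(summary["by_state"])
print(summary["total_run_time"])
```

Inside the async flow above you could call `summarize_task_runs(tr_dicts)` just before returning.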
Notes:
- The `TaskRun` objects contain the metadata you’re likely after (ids, names, state, timestamps, task inputs, tags, run_count, etc.). Use `model_dump()` to convert to plain dicts.
- If you run the query from inside a task, the result will include that currently running task as well.
- You can add additional filters/sorting/limits if needed.

Relevant docs:
- Filters API (includes `TaskRunFilter` and `TaskRunFilterFlowRunId`): prefect.client.schemas.filters
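If a flow run has more task runs than a single call returns, the `limit` and `offset` arguments shown in the sync example can be used to page through them. Here is a minimal sketch of that loop, assuming only a callable that accepts `limit` and `offset` and returns a list. `fetch_all_pages` and `read_page` are hypothetical names, not Prefect APIs, and the page size of 200 is an assumption you should check against your server's limit. A fake data source stands in for the API so the snippet runs on its own.

```python
def fetch_all_pages(read_page, page_size=200):
    """Collect every item from a limit/offset style call.

    read_page is a hypothetical callable: read_page(limit, offset) -> list.
    Inside a flow it could wrap the client call, for example:
        lambda limit, offset: client.read_task_runs(
            task_run_filter=my_filter, limit=limit, offset=offset
        )
    where my_filter is the TaskRunFilter built earlier.
    """
    items = []
    offset = 0
    while True:
        page = read_page(limit=page_size, offset=offset)
        items.extend(page)
        # A short (or empty) page means there is nothing left to fetch
        if len(page) < page_size:
            return items
        offset += page_size


# Stand-in for the API so the sketch runs without a Prefect server
fake_task_runs = [f"task-run-{i}" for i in range(450)]


def fake_read_page(limit, offset):
    return fake_task_runs[offset:offset + limit]


all_runs = fetch_all_pages(fake_read_page, page_size=200)
print(len(all_runs))
```

Keeping the paging loop separate from the client call means the same helper works with either the sync client or, with `await` added, the async one.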