Kiran
09/30/2025, 11:12 AM
Marvin
09/30/2025, 11:13 AM
Marvin
09/30/2025, 11:16 AM
from prefect import flow
from prefect.runtime import flow_run
from prefect.client.orchestration import SyncPrefectClient
from prefect.client.schemas.filters import TaskRunFilter, TaskRunFilterFlowRunId
@flow
def my_flow():
    """Print a metadata summary for every task run of the current flow run.

    Reads the id of the running flow run from ``prefect.runtime.flow_run``,
    asks the Prefect API for the task runs whose ``flow_run_id`` matches it,
    prints one summary dict per task run, and returns the task runs.

    Returns:
        The task run objects returned by ``client.read_task_runs``.
    """
    # Function-scope import so this snippet stays runnable with only the
    # imports listed directly above it.
    from prefect.client.orchestration import get_client

    current_flow_run_id = flow_run.id

    # SyncPrefectClient cannot be constructed without an API URL;
    # get_client(sync_client=True) returns a sync client configured from the
    # active Prefect settings.
    with get_client(sync_client=True) as client:
        task_runs = client.read_task_runs(
            task_run_filter=TaskRunFilter(
                flow_run_id=TaskRunFilterFlowRunId(any_=[current_flow_run_id])
            ),
            # Pass limit=... / offset=... here to page through large results.
        )

    # Each item is a TaskRun model with metadata; convert to dicts if you like.
    for tr in task_runs:
        state = tr.state
        print({
            "id": str(tr.id),
            "name": tr.name,
            "task_name": getattr(tr, "task_name", None),
            "state": state.type.value if state else None,
            "state_name": state.name if state else None,
            "start_time": tr.start_time,
            "end_time": tr.end_time,
            "run_count": tr.run_count,
            "tags": tr.tags,
            # NOTE(review): the client TaskRun schema does not appear to define
            # `parameters` (inputs are exposed as `task_inputs`); getattr keeps
            # this from raising AttributeError. Confirm against the installed
            # Prefect version.
            "parameters": getattr(tr, "parameters", None),
        })
    return task_runs
Async flow:
from prefect import flow
from prefect.runtime import flow_run
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import TaskRunFilter, TaskRunFilterFlowRunId
@flow
async def my_async_flow():
    """Return the task runs of the current flow run as plain dicts.

    Queries the Prefect API, through the async client, for the task runs
    whose ``flow_run_id`` matches the running flow run, then converts each
    returned model with ``model_dump()``.

    Returns:
        A list with one ``model_dump()`` dict per task run.
    """
    # Restrict the query to task runs that belong to this flow run.
    this_flow_run_only = TaskRunFilter(
        flow_run_id=TaskRunFilterFlowRunId(any_=[flow_run.id])
    )

    async with get_client() as api:
        fetched_runs = await api.read_task_runs(
            task_run_filter=this_flow_run_only
        )

    # Example: dump full metadata to dicts.
    return [run.model_dump() for run in fetched_runs]
Notes:
- The `TaskRun` objects contain the metadata you’re likely after (ids, names, state, timestamps, parameters, tags, run_count, etc.). Use `model_dump()` to convert them to plain dicts.
- This will include the task that’s currently running (the one executing this code) if applicable.
- You can add additional filters/sorting/limits if needed.
Relevant docs:
- Filters API (includes `TaskRunFilter` and `TaskRunFilterFlowRunId`): prefect.client.schemas.filters