Tom Han (09/15/2025, 5:34 PM)
Marvin (09/15/2025, 5:35 PM)
Tom Han (09/15/2025, 5:47 PM)
Marvin (09/15/2025, 5:49 PM)
prefect flow-run inspect <FLOW_RUN_ID>
This prints the flow run's metadata, including parameters and start/end times; total runtime is shown in the UI and is available via the API (see below). You can pipe the output to jq if you want to extract specific fields.
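If you want the same data for a single run programmatically, a minimal sketch (assuming Prefect 3.x, where the client schemas are Pydantic v2 models and therefore expose model_dump_json) could be:

import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client

async def inspect_flow_run(flow_run_id: str) -> str:
    async with get_client() as client:
        run = await client.read_flow_run(UUID(flow_run_id))
        # FlowRun is a Pydantic model; dump it as JSON so jq-style tooling can consume it
        return run.model_dump_json(indent=2)

if __name__ == "__main__":
    # Replace with a real flow run ID
    print(asyncio.run(inspect_flow_run("00000000-0000-0000-0000-000000000000")))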
Programmatic export (Python, to CSV)
- This async snippet fetches recent runs, including parameters, start/end times, total runtime, and the work pool (via the deployment), and writes the results to a CSV.
import asyncio
import csv
import json

from prefect.client.orchestration import get_client

async def export_flow_runs_to_csv(limit=100, output_path="flow_runs.csv"):
    async with get_client() as client:
        # Fetch flow runs (adjust filters as needed)
        runs = await client.read_flow_runs(limit=limit)
        # Cache deployments so we don't refetch for each run
        deployment_cache = {}
        with open(output_path, "w", newline="") as f:
            fieldnames = [
                "id",
                "name",
                "state",
                "start_time",
                "end_time",
                "total_run_time_seconds",
                "parameters",
                "deployment_id",
                "deployment_name",
                "work_pool_name",
                "work_queue_name",
            ]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for r in runs:
                # Defensive conversions
                start = r.start_time.isoformat() if getattr(r, "start_time", None) else None
                end = r.end_time.isoformat() if getattr(r, "end_time", None) else None
                # total_run_time is a timedelta; convert to seconds
                total_td = getattr(r, "total_run_time", None)
                total = total_td.total_seconds() if total_td is not None else None
                dep_id = getattr(r, "deployment_id", None)
                dep_name = None
                work_pool_name = None
                work_queue_name = None
                if dep_id:
                    if dep_id not in deployment_cache:
                        deployment_cache[dep_id] = await client.read_deployment(dep_id)
                    dep = deployment_cache[dep_id]
                    dep_name = dep.name
                    work_pool_name = getattr(dep, "work_pool_name", None)
                    work_queue_name = getattr(dep, "work_queue_name", None)
                writer.writerow(
                    {
                        "id": str(r.id),
                        "name": r.name,
                        "state": r.state.name if r.state else None,
                        "start_time": start,
                        "end_time": end,
                        "total_run_time_seconds": total,
                        "parameters": json.dumps(r.parameters),  # serialize dict as JSON for CSV
                        "deployment_id": str(dep_id) if dep_id else None,
                        "deployment_name": dep_name,
                        "work_pool_name": work_pool_name,
                        "work_queue_name": work_queue_name,
                    }
                )

if __name__ == "__main__":
    asyncio.run(export_flow_runs_to_csv(limit=200, output_path="flow_runs.csv"))
About “which worker ran it?”
- Today, Prefect does not persist the specific worker name/ID on the flow run object. You can reliably retrieve the work pool (and often the work queue) via the associated deployment, but not the exact worker process that picked it up.
- If you need the exact worker, common patterns are:
  - Inject an identifying value via job_variables and have your flow log it or create an artifact with it (see the sketch after this list).
  - Correlate via worker logs (e.g., centralized logging) that show when a worker claimed a run.
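For the job_variables pattern, a minimal sketch is below. It assumes your work pool's base job template maps the "env" job variable onto the flow-run container's environment, and that the deployment sets something like job_variables={"env": {"WORKER_LABEL": "gpu-worker-1"}}; WORKER_LABEL is a hypothetical name chosen for illustration.

import os

from prefect import flow, get_run_logger
from prefect.artifacts import create_markdown_artifact

@flow
def inference():
    logger = get_run_logger()
    # WORKER_LABEL is a hypothetical env var, assumed to be injected through the
    # deployment's job_variables (e.g. {"env": {"WORKER_LABEL": "gpu-worker-1"}})
    worker_label = os.environ.get("WORKER_LABEL", "unknown")
    logger.info("Running on worker: %s", worker_label)
    # Persist the label on the run so it is visible in the UI afterwards
    create_markdown_artifact(
        key="worker-label",
        markdown=f"Ran on worker: {worker_label}",
    )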
Docs and references
- Flow run REST endpoints: Flow run API
- Work pools and workers: Work pools & workers
- Runtime context available inside a running flow (parameters, ids, etc.): Runtime context
If you can share whether you want this for a single run or a bulk/time-range export (and any filters like deployment or tags), I can tailor the snippet to your use case.

Tom Han (09/15/2025, 6:47 PM)
inference/inference in my deployment and its runtime

Marvin (09/15/2025, 6:50 PM)
Since a deployment is identified as flow_name/deployment_name, you can filter flow runs to just inference/inference and compute their runtimes.
Fastest path (Python, Prefect 3.x)
- This snippet splits the flow_name/deployment_name pair, filters runs on both parts, then computes runtime per run. It prefers total_run_time when the run has accrued time and falls back to end minus start.
import asyncio
from datetime import datetime, timezone
from statistics import mean

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowFilter,
    FlowFilterName,
)

async def runtimes_for_deployment(deployment_fqn: str, limit: int = 500):
    # A deployment is addressed as "flow_name/deployment_name"; filter on both parts
    flow_name, deployment_name = deployment_fqn.split("/")
    async with get_client() as client:
        runs = await client.read_flow_runs(
            limit=limit,
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=[deployment_name])
            ),
        )

    def runtime_seconds(r):
        # total_run_time is a timedelta; prefer it when the run has accrued time
        total = getattr(r, "total_run_time", None)
        if total:
            return total.total_seconds()
        if r.start_time:
            end = r.end_time or datetime.now(timezone.utc)
            return (end - r.start_time).total_seconds()
        return None

    rows = []
    for r in runs:
        rt = runtime_seconds(r)
        rows.append({
            "id": str(r.id),
            "name": r.name,
            "state": r.state.name if r.state else None,
            "start_time": r.start_time.isoformat() if r.start_time else None,
            "end_time": r.end_time.isoformat() if r.end_time else None,
            "runtime_seconds": rt,
        })
    valid = [row["runtime_seconds"] for row in rows if row["runtime_seconds"] is not None]
    summary = {
        "count": len(rows),
        "with_runtime": len(valid),
        "avg_runtime_seconds": mean(valid) if valid else None,
        "min_runtime_seconds": min(valid) if valid else None,
        "max_runtime_seconds": max(valid) if valid else None,
    }
    return rows, summary

if __name__ == "__main__":
    rows, summary = asyncio.run(runtimes_for_deployment("inference/inference", limit=500))
    print("Summary:", summary)
    # If you want CSV:
    # import csv
    # with open("inference_inference_runs.csv", "w", newline="") as f:
    #     writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    #     writer.writeheader()
    #     writer.writerows(rows)
Notes
- The CLI does not currently expose a direct “filter by deployment” option on prefect flow-run ls. The Python client is the quickest way to do this correctly.
- Filter classes used above: Prefect 3 filters docs
- Flow run API reference: Flow run API
If you want this over a specific time window or only for completed runs, I can add start/end-time and state filters to the snippet.
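For reference, a hedged sketch of those extra filters, assuming the Prefect 3 filter schemas named below (pass the result as flow_run_filter to read_flow_runs alongside the deployment filter):

from datetime import datetime, timedelta, timezone

from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateName,
)

# Only completed runs that started within the last 7 days
flow_run_filter = FlowRunFilter(
    start_time=FlowRunFilterStartTime(
        after_=datetime.now(timezone.utc) - timedelta(days=7)
    ),
    state=FlowRunFilterState(name=FlowRunFilterStateName(any_=["Completed"])),
)
# Then: runs = await client.read_flow_runs(..., flow_run_filter=flow_run_filter)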