<@ULVA73B9P> Is there a quick and easy way to expo...
# ask-marvin
t
@Marvin Is there a quick and easy way to export metadata from a flow run? I am interested in the total run time, the parameters passed in, and the worker from the work pool that is running the job
m
thought for 888.0 seconds
t
Marvin you good?
m
Great question. Yes, you can pull those fields quickly. A few options below, depending on whether you want a one-off lookup or a repeatable export.
Quick, one-off (CLI)
- Inspect a single run:
prefect flow-run inspect <FLOW_RUN_ID>
This prints the flow run metadata, including parameters and start/end times. Total runtime is shown in the UI and is also available via the API (see below). You can pipe the output to jq if you want to extract specific fields.
Programmatic export (Python, to CSV)
- This async snippet fetches recent runs and writes their parameters, start/end times, total runtime, and work pool (looked up via the deployment) to a CSV file.
import asyncio
import csv
import json
from prefect.client.orchestration import get_client

async def export_flow_runs_to_csv(limit=100, output_path="flow_runs.csv"):
    async with get_client() as client:
        # Fetch flow runs (adjust filters as needed)
        runs = await client.read_flow_runs(limit=limit)

        # Cache deployments so we don’t refetch for each run
        deployment_cache = {}

        with open(output_path, "w", newline="") as f:
            fieldnames = [
                "id",
                "name",
                "state",
                "start_time",
                "end_time",
                "total_run_time_seconds",
                "parameters",
                "deployment_id",
                "deployment_name",
                "work_pool_name",
                "work_queue_name",
            ]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for r in runs:
                # Defensive conversions
                start = r.start_time.isoformat() if getattr(r, "start_time", None) else None
                end = r.end_time.isoformat() if getattr(r, "end_time", None) else None
                # total_run_time is a datetime.timedelta; convert it to float seconds
                total = getattr(r, "total_run_time", None)
                if total is not None and hasattr(total, "total_seconds"):
                    total = total.total_seconds()

                dep_id = getattr(r, "deployment_id", None)
                dep_name = None
                work_pool_name = None
                work_queue_name = None

                if dep_id:
                    if dep_id not in deployment_cache:
                        dep = await client.read_deployment(dep_id)
                        deployment_cache[dep_id] = dep
                    dep = deployment_cache[dep_id]
                    dep_name = dep.name
                    work_pool_name = getattr(dep, "work_pool_name", None)
                    work_queue_name = getattr(dep, "work_queue_name", None)

                writer.writerow(
                    {
                        "id": str(r.id),
                        "name": r.name,
                        "state": r.state.name if r.state else None,
                        "start_time": start,
                        "end_time": end,
                        "total_run_time_seconds": total,
                        # Store parameters as a JSON string so the cell can be parsed back
                        "parameters": json.dumps(r.parameters, default=str),
                        "deployment_id": str(dep_id) if dep_id else None,
                        "deployment_name": dep_name,
                        "work_pool_name": work_pool_name,
                        "work_queue_name": work_queue_name,
                    }
                )

if __name__ == "__main__":
    asyncio.run(export_flow_runs_to_csv(limit=200, output_path="flow_runs.csv"))
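Once the CSV exists, a small stdlib-only reader can roll the runtimes up per work pool. This is a rough sketch, not part of Prefect: it assumes the column names from the export above, assumes total_run_time_seconds holds plain seconds, and uses made-up sample rows in place of a real flow_runs.csv.

```python
import csv
import io
from collections import defaultdict

def summarize_by_work_pool(csv_file):
    """Group exported rows by work_pool_name and total their runtimes."""
    totals = defaultdict(lambda: {"runs": 0, "seconds": 0.0})
    for row in csv.DictReader(csv_file):
        pool = row.get("work_pool_name") or "(no work pool)"
        raw = row.get("total_run_time_seconds") or ""
        try:
            seconds = float(raw)
        except ValueError:
            continue  # skip rows without a numeric runtime
        totals[pool]["runs"] += 1
        totals[pool]["seconds"] += seconds
    return dict(totals)

# Hypothetical sample data using the same column names as flow_runs.csv
sample = io.StringIO(
    "id,total_run_time_seconds,work_pool_name\n"
    "a,12.5,gpu-pool\n"
    "b,7.5,gpu-pool\n"
    "c,,cpu-pool\n"
)
summary = summarize_by_work_pool(sample)
print(summary)
```

To run it against the real export, pass an open file handle instead of the in-memory sample, e.g. `summarize_by_work_pool(open("flow_runs.csv", newline=""))`.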
About the “which worker ran it?”
- Today, Prefect does not persist the specific worker name/ID on the flow run object. You can reliably retrieve the work pool (and often the work queue) via the associated deployment, but not the exact worker process that picked it up.
- If you need the exact worker, common patterns are:
  - Have each worker inject its name as an environment variable in job_variables and have your flow log or create an artifact with it.
  - Correlate via worker logs (e.g., centralized logging) that show when a worker claimed a run.
Docs and references
- Flow run REST endpoints: Flow run API
- Work pools and workers: Work pools & workers
- Runtime context available inside a running flow (parameters, ids, etc.): Runtime context
If you can share whether you want this for a single run or a bulk/time-range export (and any filters like deployment or tags), I can tailor the snippet to your use case.
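For the environment-variable pattern mentioned above, the flow-side half could look roughly like this. It is only a sketch: WORKER_NAME is a hypothetical variable name, not something Prefect sets for you, and the helper is plain stdlib so it can be called from inside any flow and passed to its logger.

```python
import os
import socket

# WORKER_NAME is a hypothetical variable name; set it however your worker
# or work pool injects environment variables into the run's environment.
WORKER_ENV_VAR = "WORKER_NAME"

def worker_identity(environ=None):
    """Return a small dict describing where this run is executing."""
    env = os.environ if environ is None else environ
    return {
        "worker_name": env.get(WORKER_ENV_VAR, "unknown"),
        # hostname and pid describe the process running the flow, which may
        # be a container or subprocess rather than the worker itself
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
    }

# Inside a flow you could log this, e.g.
# logger.info("Executed by %s", worker_identity())
info = worker_identity({"WORKER_NAME": "worker-a"})
print(info["worker_name"])
```

Logging the result at the start of the flow puts the worker name next to the flow run ID in the run's logs, which is usually enough to correlate the two later.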
t
@Marvin Is it possible to filter based on deployment type? I am only interested in inference/inference in my deployment and its runtime
m
thought for 165.9 seconds
Yes. If by “deployment type” you mean the fully-qualified deployment name flow_name/deployment_name, you can filter flow runs to just inference/inference and compute their runtimes.
Fastest path (Python, Prefect 3.x)
- This snippet filters flow runs by deployment, then computes runtime per run. It prefers total_run_time if present, and falls back to end − start.
import asyncio
from datetime import datetime, timezone
from statistics import mean
from prefect.client.orchestration import get_client
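from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowFilter,
    FlowFilterName,
)

async def runtimes_for_deployment(deployment_fqn: str, limit: int = 500):
    # Deployment names are stored without the flow prefix, so split
    # "flow_name/deployment_name" and filter on both parts.
    flow_name, _, deployment_name = deployment_fqn.partition("/")
    async with get_client() as client:
        runs = await client.read_flow_runs(
            limit=limit,
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=[deployment_name])
            ),
        )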

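        def runtime_seconds(r):
            # total_run_time is a datetime.timedelta in the client schema,
            # so convert it to float seconds before aggregating.
            total = getattr(r, "total_run_time", None)
            if total is not None:
                seconds = total.total_seconds() if hasattr(total, "total_seconds") else float(total)
                if seconds > 0:
                    return seconds
            if r.start_time:
                end = r.end_time or datetime.now(timezone.utc)
                return (end - r.start_time).total_seconds()
            return None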

        rows = []
        for r in runs:
            rt = runtime_seconds(r)
            rows.append({
                "id": str(r.id),
                "name": r.name,
                "state": r.state.name if r.state else None,
                "start_time": r.start_time.isoformat() if r.start_time else None,
                "end_time": r.end_time.isoformat() if r.end_time else None,
                "runtime_seconds": rt,
            })

        valid = [row["runtime_seconds"] for row in rows if row["runtime_seconds"] is not None]
        summary = {
            "count": len(rows),
            "with_runtime": len(valid),
            "avg_runtime_seconds": mean(valid) if valid else None,
            "min_runtime_seconds": min(valid) if valid else None,
            "max_runtime_seconds": max(valid) if valid else None,
        }
        return rows, summary

if __name__ == "__main__":
    rows, summary = asyncio.run(runtimes_for_deployment("inference/inference", limit=500))
    print("Summary:", summary)
    # If you want CSV:
    # import csv
    # with open("inference_inference_runs.csv", "w", newline="") as f:
    #     writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    #     writer.writeheader()
    #     writer.writerows(rows)
Notes
- The CLI does not currently expose a direct “filter by deployment” option on prefect flow-run ls. The Python client is the quickest way to do this correctly.
- Filter classes used above: Prefect 3 filters docs
- Flow run API reference: Flow run API
If you want this over a specific time window or only for completed runs, I can add start/end-time and state filters to the snippet.
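In the meantime, a quick client-side version of that time-window and completed-only filtering can be applied to the rows the snippet already returns. This is a minimal sketch, assuming the row dicts keep the keys used above (state, start_time as an ISO string with a UTC offset) and that "Completed" is the state name you care about. The sample rows are made up, and server-side filters would be more efficient for large histories.

```python
from datetime import datetime, timezone

def filter_rows(rows, start=None, end=None, states=("Completed",)):
    """Keep rows whose state is in `states` and whose start_time is in [start, end)."""
    kept = []
    for row in rows:
        if states and row.get("state") not in states:
            continue
        if not row.get("start_time"):
            continue  # never started, so it cannot fall inside the window
        started = datetime.fromisoformat(row["start_time"])
        if start and started < start:
            continue
        if end and started >= end:
            continue
        kept.append(row)
    return kept

# Hypothetical rows in the same shape the snippet above produces
rows = [
    {"id": "1", "state": "Completed", "start_time": "2024-05-01T10:00:00+00:00", "runtime_seconds": 30.0},
    {"id": "2", "state": "Failed", "start_time": "2024-05-01T11:00:00+00:00", "runtime_seconds": 5.0},
    {"id": "3", "state": "Completed", "start_time": "2024-04-01T10:00:00+00:00", "runtime_seconds": 40.0},
]
window_start = datetime(2024, 5, 1, tzinfo=timezone.utc)
recent = filter_rows(rows, start=window_start)
print([r["id"] for r in recent])
```

Pass timezone-aware datetimes for start and end, since comparing them against the offset-aware start_time values would otherwise raise a TypeError.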