Tom Han
09/15/2025, 5:34 PM

Marvin
09/15/2025, 5:35 PM

Tom Han
09/15/2025, 5:47 PM

Marvin
09/15/2025, 5:49 PM

You can inspect a single run with prefect flow-run inspect <FLOW_RUN_ID> (and slice the output with jq if you capture it as JSON); to export a batch of runs to CSV, use a client script like this:

  import asyncio
  import csv
  import json

  from prefect.client.orchestration import get_client
  async def export_flow_runs_to_csv(limit=100, output_path="flow_runs.csv"):
      async with get_client() as client:
          # Fetch flow runs (adjust filters as needed)
          runs = await client.read_flow_runs(limit=limit)
          # Cache deployments so we don’t refetch for each run
          deployment_cache = {}
          with open(output_path, "w", newline="") as f:
              fieldnames = [
                  "id",
                  "name",
                  "state",
                  "start_time",
                  "end_time",
                  "total_run_time_seconds",
                  "parameters",
                  "deployment_id",
                  "deployment_name",
                  "work_pool_name",
                  "work_queue_name",
              ]
              writer = csv.DictWriter(f, fieldnames=fieldnames)
              writer.writeheader()
              for r in runs:
                  # Defensive conversions
                  start = r.start_time.isoformat() if getattr(r, "start_time", None) else None
                  end = r.end_time.isoformat() if getattr(r, "end_time", None) else None
                  total = getattr(r, "total_run_time", None)  # a datetime.timedelta in Prefect, when set
                  total_seconds = total.total_seconds() if total is not None else None
                  dep_id = getattr(r, "deployment_id", None)
                  dep_name = None
                  work_pool_name = None
                  work_queue_name = None
                  if dep_id:
                      if dep_id not in deployment_cache:
                          dep = await client.read_deployment(dep_id)
                          deployment_cache[dep_id] = dep
                      dep = deployment_cache[dep_id]
                      dep_name = dep.name
                      work_pool_name = getattr(dep, "work_pool_name", None)
                      work_queue_name = getattr(dep, "work_queue_name", None)
                  writer.writerow(
                      {
                          "id": str(r.id),
                          "name": r.name,
                          "state": r.state.name if r.state else None,
                          "start_time": start,
                          "end_time": end,
                          "total_run_time_seconds": total,
                          "parameters": r.parameters,  # will serialize as dict; cast to json if needed
                          "deployment_id": str(dep_id) if dep_id else None,
                          "deployment_name": dep_name,
                          "work_pool_name": work_pool_name,
                          "work_queue_name": work_queue_name,
                      }
                  )
  if __name__ == "__main__":
      asyncio.run(export_flow_runs_to_csv(limit=200, output_path="flow_runs.csv"))
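
The "adjust filters as needed" comment is where you'd narrow this down. For example, keeping only completed runs might look like the sketch below, built from Prefect's client filter schemas and passed via the flow_run_filter argument of read_flow_runs:

  from prefect.client.schemas.filters import (
      FlowRunFilter,
      FlowRunFilterState,
      FlowRunFilterStateType,
  )
  from prefect.client.schemas.objects import StateType

  # Keep only COMPLETED flow runs.
  completed_only = FlowRunFilter(
      state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.COMPLETED]))
  )
  # runs = await client.read_flow_runs(limit=limit, flow_run_filter=completed_only)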
Marvin
09/15/2025, 5:49 PM

job_variables
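
If what you're after is the infrastructure overrides a deployment carries, recent Prefect clients expose them on the deployment object as job_variables; a minimal sketch, with show_job_variables as a hypothetical helper name:

  import asyncio
  import json

  from prefect.client.orchestration import get_client

  async def show_job_variables(deployment_id):
      # Print a deployment's job_variables (infrastructure overrides).
      # Assumes a Prefect 3.x client where DeploymentResponse exposes
      # job_variables; older 2.x clients called the field infra_overrides.
      async with get_client() as client:
          dep = await client.read_deployment(deployment_id)
          print(json.dumps(getattr(dep, "job_variables", None), indent=2, default=str))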
Tom Han
09/15/2025, 6:47 PM

inference/inference

Marvin
09/15/2025, 6:47 PM

Marvin
09/15/2025, 6:50 PM

Deployments are addressed as flow_name/deployment_name, so inference/inference is the deployment named inference on the flow named inference. Each flow run records a total_run_time, so you can pull runtimes for that deployment like this:

import asyncio
from datetime import datetime, timezone
from statistics import mean

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId

async def runtimes_for_deployment(deployment_fqn: str, limit: int = 500):
    async with get_client() as client:
        # Resolve "flow_name/deployment_name" to an id first; deployment name
        # filters match only the bare deployment name, not the full path.
        deployment = await client.read_deployment_by_name(deployment_fqn)
        runs = await client.read_flow_runs(
            limit=limit,
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id])
            ),
        )
        def runtime_seconds(r):
            # total_run_time is a datetime.timedelta; fall back to wall-clock
            # elapsed time for runs that are still in progress.
            total = getattr(r, "total_run_time", None)
            if total:
                return total.total_seconds()
            if r.start_time:
                end = r.end_time or datetime.now(timezone.utc)
                return (end - r.start_time).total_seconds()
            return None
        rows = []
        for r in runs:
            rt = runtime_seconds(r)
            rows.append({
                "id": str(r.id),
                "name": r.name,
                "state": r.state.name if r.state else None,
                "start_time": r.start_time.isoformat() if r.start_time else None,
                "end_time": r.end_time.isoformat() if r.end_time else None,
                "runtime_seconds": rt,
            })
        valid = [row["runtime_seconds"] for row in rows if row["runtime_seconds"] is not None]
        summary = {
            "count": len(rows),
            "with_runtime": len(valid),
            "avg_runtime_seconds": mean(valid) if valid else None,
            "min_runtime_seconds": min(valid) if valid else None,
            "max_runtime_seconds": max(valid) if valid else None,
        }
        return rows, summary
if __name__ == "__main__":
    rows, summary = asyncio.run(runtimes_for_deployment("inference/inference", limit=500))
    print("Summary:", summary)
    # If you want CSV:
    # import csv
    # with open("inference_inference_runs.csv", "w", newline="") as f:
    #     writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    #     writer.writeheader()
    #     writer.writerows(rows)

For a quick look without a script: prefect flow-run ls
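
Note that read_flow_runs returns at most limit rows per call, so a deployment with a long history may need paging via offset. A minimal sketch reusing the deployment-id filter from above (all_runs_for_deployment is a hypothetical helper name):

import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId

async def all_runs_for_deployment(deployment_id, page_size=200):
    # Page through read_flow_runs until a short page signals the end.
    runs = []
    async with get_client() as client:
        offset = 0
        while True:
            page = await client.read_flow_runs(
                limit=page_size,
                offset=offset,
                flow_run_filter=FlowRunFilter(
                    deployment_id=FlowRunFilterDeploymentId(any_=[deployment_id])
                ),
            )
            runs.extend(page)
            if len(page) < page_size:
                break
            offset += page_size
    return runs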