Riya Sinha
09/29/2025, 7:43 PM
Marvin
09/29/2025, 7:47 PM
- Python client: PrefectClient.read_flow_run_graph(flow_run_id, since=...)
- REST API: the “read flow run graph” endpoint documented here: Read Flow Run Graph (v2)
The response gives you:
- Nodes: one per task run (and subflow runs), including metadata like label, state_type, start/end times
- Edges: parent/child relationships you can use to render a DAG
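To make that shape concrete, here is a minimal sketch of turning nodes and edges into a render order for a DAG. The sample data is hand-written for illustration (it is not real API output), and `topological_order` is a hypothetical helper, not part of Prefect.

```python
from collections import defaultdict, deque


def topological_order(nodes, edges):
    """Return node ids so every parent comes before its children (Kahn's algorithm)."""
    children = defaultdict(list)
    indegree = {node["id"]: 0 for node in nodes}
    for edge in edges:
        # Ignore edges that point at nodes missing from this payload
        if edge["source"] in indegree and edge["target"] in indegree:
            children[edge["source"]].append(edge["target"])
            indegree[edge["target"]] += 1

    # Start from root nodes (no parents) and work downstream
    queue = deque(node_id for node_id, degree in indegree.items() if degree == 0)
    order = []
    while queue:
        node_id = queue.popleft()
        order.append(node_id)
        for child_id in children[node_id]:
            indegree[child_id] -= 1
            if indegree[child_id] == 0:
                queue.append(child_id)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


# Hand-written sample data in the nodes/edges shape described above
sample_nodes = [
    {"id": "extract", "label": "extract", "state_type": "COMPLETED"},
    {"id": "transform", "label": "transform", "state_type": "RUNNING"},
    {"id": "load", "label": "load", "state_type": "PENDING"},
]
sample_edges = [
    {"source": "extract", "target": "transform"},
    {"source": "transform", "target": "load"},
]

print(topological_order(sample_nodes, sample_edges))
```

Most graph-rendering libraries will do their own layout, so you may only need the order for things like a simple list view or layer assignment.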
Example (async) using the Python client
import asyncio
from datetime import datetime, timezone

from prefect.client.orchestration import get_client

# Ensure your environment points at your private Prefect API, e.g.:
# export PREFECT_API_URL="https://your-private-prefect/api"


async def fetch_flow_run_graph(flow_run_id: str, since: datetime | None = None):
    # get_client() builds a client from PREFECT_API_URL (and PREFECT_API_KEY, if set);
    # PrefectClient() cannot be constructed without an explicit API URL.
    async with get_client() as client:
        # Method name as given above; confirm it exists in your installed Prefect version
        graph = await client.read_flow_run_graph(flow_run_id, since=since)

    # Convert to a simple structure for your UI
    nodes = []
    edges = []

    # graph.nodes may be a mapping of node_id -> node or a list of (node_id, node) pairs
    pairs = graph.nodes.items() if hasattr(graph.nodes, "items") else graph.nodes
    for node_id, node in pairs:
        nodes.append({
            "id": str(node_id),
            "label": getattr(node, "label", None),
            "kind": getattr(node, "kind", None),  # e.g., "task-run" or "flow-run"
            "state_type": getattr(node, "state_type", None),  # e.g., "COMPLETED", "RUNNING"
            "start_time": getattr(node, "start_time", None),
            "end_time": getattr(node, "end_time", None),
        })
        # Children reference downstream nodes, as bare ids or as objects with an .id
        for child in getattr(node, "children", []):
            edges.append({"source": str(node_id), "target": str(getattr(child, "id", child))})

    return {"nodes": nodes, "edges": edges}


async def main():
    flow_run_id = "YOUR_FLOW_RUN_ID"
    # Optional: pass a timezone-aware timestamp for incremental updates,
    # e.g. datetime.now(tz=timezone.utc), or None for the full graph
    since = None
    dag = await fetch_flow_run_graph(flow_run_id, since=since)
    print(dag)


asyncio.run(main())
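If you'd rather call the REST endpoint directly, here is a rough sketch using only the standard library. The `/flow_runs/{id}/graph-v2` path and the `since` query parameter are assumptions based on the "Read Flow Run Graph (v2)" endpoint name, so check them against the REST docs for your version. `build_graph_request` and `fetch_graph` are hypothetical helpers.

```python
import json
import os
import urllib.parse
import urllib.request
from datetime import datetime, timezone


def build_graph_request(flow_run_id, since=None, api_url=None, api_key=None):
    """Build (but do not send) a GET request for a flow run's graph."""
    api_url = (api_url or os.environ.get("PREFECT_API_URL", "")).rstrip("/")
    if not api_url:
        raise ValueError("Set PREFECT_API_URL or pass api_url")
    api_key = api_key or os.environ.get("PREFECT_API_KEY")

    # Assumed path; verify against the REST docs for your Prefect version
    url = f"{api_url}/flow_runs/{flow_run_id}/graph-v2"
    if since is not None:
        url += "?" + urllib.parse.urlencode({"since": since.isoformat()})

    headers = {"Accept": "application/json"}
    if api_key:
        # Prefect Cloud expects a bearer token; a private Server usually needs no key
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(url, headers=headers, method="GET")


def fetch_graph(flow_run_id, since=None):
    """Send the request and decode the JSON body."""
    with urllib.request.urlopen(build_graph_request(flow_run_id, since)) as response:
        return json.load(response)


# Building the request does not touch the network, so it is safe to inspect
request = build_graph_request(
    "YOUR_FLOW_RUN_ID",
    since=datetime(2025, 9, 29, tzinfo=timezone.utc),
    api_url="https://your-private-prefect/api",
)
print(request.full_url)
```

Keeping request construction separate from sending makes it easy to log or unit-test the URL and headers before you point it at a real instance.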
Notes and tips
- Real-time/streaming: the since parameter lets you poll for incremental updates and update your UI without re-downloading the entire graph.
- Multi-tenant isolation: since your Prefect instance is private per client, you can expose only the flow runs that belong to that client in your app. If you do need to list a client’s flow runs first, the client also supports filtered listing (e.g., by tags, deployment, work queue) before you fetch each run’s graph.
- Authentication:
- Self-hosted Prefect Server: set PREFECT_API_URL to your instance URL.
- Prefect Cloud: also set PREFECT_API_KEY. (Not needed if fully private Server.)
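For the polling note above, here is a minimal sketch of merging incremental updates into a cached graph. It assumes each update uses the same {"nodes": [...], "edges": [...]} shape returned by fetch_flow_run_graph in the example, and that an incremental response contains only nodes that changed. `merge_graph_update`, `poll_once`, and `fake_fetch` are hypothetical names; `fake_fetch` stands in for the real API call.

```python
from datetime import datetime, timezone


def merge_graph_update(cache, update):
    """Merge an incremental {"nodes": [...], "edges": [...]} payload into the cache."""
    for node in update["nodes"]:
        # Newer data for a known node replaces the old entry
        cache["nodes"][node["id"]] = node
    for edge in update["edges"]:
        cache["edges"].add((edge["source"], edge["target"]))
    return cache


def poll_once(cache, fetch, last_polled):
    """Fetch changes since last_polled, merge them, and return the next `since` value."""
    # Take the timestamp before fetching so changes made during the request are not missed
    next_since = datetime.now(tz=timezone.utc)
    merge_graph_update(cache, fetch(last_polled))
    return next_since


def fake_fetch(since):
    """Stand-in for the real call: full graph first, then one changed node."""
    if since is None:
        return {
            "nodes": [
                {"id": "a", "state_type": "RUNNING"},
                {"id": "b", "state_type": "PENDING"},
            ],
            "edges": [{"source": "a", "target": "b"}],
        }
    return {"nodes": [{"id": "a", "state_type": "COMPLETED"}], "edges": []}


cache = {"nodes": {}, "edges": set()}
since = poll_once(cache, fake_fetch, None)   # initial full load
since = poll_once(cache, fake_fetch, since)  # incremental update
print(cache["nodes"]["a"]["state_type"], sorted(cache["edges"]))
```

In a real app you would call `poll_once` on a timer (or an async equivalent) and push the merged cache to your UI.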
Docs
- Python client and flow run APIs: Python API reference – flow runs
- REST endpoint: Read Flow Run Graph (v2)
If you share whether you’re using Server or Cloud and how you identify “a client’s runs” (tags, work pools, separate workspaces), I can sketch the listing + graph-fetch pattern tailored to your setup.