Ravish Mallya
09/23/2025, 9:04 PMMarvin
09/23/2025, 9:04 PMMarvin
09/23/2025, 9:09 PM# Example FastAPI proxy (minimal)
import os
import httpx
from fastapi import FastAPI, HTTPException
PREFECT_API_URL = os.environ.get("PREFECT_API_URL", "<http://127.0.0.1:4200/api>") # Server default
PREFECT_API_KEY = os.environ.get("PREFECT_API_KEY") # Cloud only
def headers():
h = {"Content-Type": "application/json"}
if PREFECT_API_KEY:
h["Authorization"] = f"Bearer {PREFECT_API_KEY}"
return h
app = FastAPI()
@app.get("/flow-runs/{flow_run_id}/graph")
async def get_flow_run_graph(flow_run_id: str):
# Prefer the v2 graph
url = f"{PREFECT_API_URL}/flow_runs/graph/{flow_run_id}/v2"
async with httpx.AsyncClient() as client:
resp = await <http://client.post|client.post>(url, headers=headers(), json={})
if resp.status_code != 200:
raise HTTPException(resp.status_code, resp.text)
return resp.json()
@app.get("/flow-runs/{flow_run_id}/task-runs")
async def get_task_runs(flow_run_id: str):
# Filter task runs by flow_run_id
url = f"{PREFECT_API_URL}/task_runs/filter"
payload = {
"task_runs": {
"flow_run_id": {"any_": [flow_run_id]}
}
}
async with httpx.AsyncClient() as client:
resp = await <http://client.post|client.post>(url, headers=headers(), json=payload)
if resp.status_code != 200:
raise HTTPException(resp.status_code, resp.text)
return resp.json()
@app.get("/flow-runs/{flow_run_id}")
async def get_flow_run(flow_run_id: str):
url = f"{PREFECT_API_URL}/flow_runs/{flow_run_id}"
async with httpx.AsyncClient() as client:
resp = await client.get(url, headers=headers())
if resp.status_code != 200:
raise HTTPException(resp.status_code, resp.text)
return resp.json()
Example: front-end polling loop (pseudo-JS)
- Render the graph once from /graph
- Then poll /task-runs every second or two to update node colors
```
const flowRunId = "<your-flow-run-id>"
async function loadGraph() {
const graph = await fetch(/flow-runs/${flowRunId}/graph
).then(r => r.json())
// graph.nodes / graph.edges -> build your Cytoscape graph here
}
async function refreshStates() {
const taskRuns = await fetch(/flow-runs/${flowRunId}/task-runs
).then(r => r.json())
// taskRuns is a list; map { task_run.id -> state.type } and update node styling
}
// Initial load
await loadGraph()
// Periodic refreshMarvin
09/23/2025, 9:09 PMprefect.visualization.visualize_task_dependencies
. You could export it to SVG and embed it in your page.
- But for “live stages,” use the API approach above and color nodes by their current state.
Notes and gotchas
- Authentication: If you’re on Prefect Cloud, proxy from your backend so you don’t put an API key in the browser.
- CORS: Proxying also avoids cross-origin issues.
- Updates: Polling every 1–2 seconds is typically sufficient. If you need lower latency, you can also stream logs or events, but polling is simpler to start.
- Mapped tasks: Use the v2 graph endpoint; it represents mapped/grouped tasks more cleanly for visualization.
Would you like a working starter repo (FastAPI + Cytoscape) that wires this up end-to-end? Also, are you using Prefect Cloud or self-hosted Server? That’ll determine the exact API base URL and auth setup.