<@ULVA73B9P> say i am using prefect to run client ...
# marvin-ai
r
@Marvin say I am using Prefect to run client jobs, and the Prefect instance is private since clients shouldn't be able to see each other's jobs. Is there a way to query the Prefect API/DB for a job to get the DAG data required to show a DAG of the tasks that are running or have completed for the client's jobs in my own UI?
m
Yes, but don't query the DB directly. In Prefect 3.x you can fetch the flow run's task graph (the DAG you see in the UI) via the API or the Python client and render it in your own UI.

What to use
- Python client: PrefectClient.read_flow_run_graph(flow_run_id, since=...)
- REST API: the "read flow run graph" endpoint documented here: Read Flow Run Graph (v2)

The response gives you:
- Nodes: one per task run (and subflow runs), including metadata like label, state_type, start/end times
- Edges: parent/child relationships you can use to render a DAG

Example (async) using the Python client
import asyncio
from datetime import datetime, timezone
from prefect.client.orchestration import get_client

# Ensure your environment points at your private Prefect API
# export PREFECT_API_URL="https://your-private-prefect/api"

async def fetch_flow_run_graph(flow_run_id: str, since: datetime | None = None):
    # get_client() builds a client from the current settings (PREFECT_API_URL, etc.);
    # PrefectClient itself needs the API URL passed in explicitly.
    async with get_client() as client:
        # Assumes your installed client exposes read_flow_run_graph; check your version.
        graph = await client.read_flow_run_graph(flow_run_id, since=since)

        # Convert to a simple structure for your UI
        nodes = []
        edges = []
        # graph.nodes may be a mapping of node_id -> node or a list of (node_id, node) pairs
        pairs = graph.nodes.items() if hasattr(graph.nodes, "items") else graph.nodes
        for node_id, node in pairs:
            nodes.append({
                "id": str(node_id),
                "label": getattr(node, "label", None),
                "kind": getattr(node, "kind", None),             # e.g., "task-run" or "flow-run"
                "state_type": getattr(node, "state_type", None), # e.g., "COMPLETED", "RUNNING"
                "start_time": getattr(node, "start_time", None),
                "end_time": getattr(node, "end_time", None),
            })

            # Children reference downstream nodes, either as bare ids or as objects with an `id`
            for child in getattr(node, "children", []):
                child_id = getattr(child, "id", child)
                edges.append({"source": str(node_id), "target": str(child_id)})

        return {"nodes": nodes, "edges": edges}

async def main():
    flow_run_id = "YOUR_FLOW_RUN_ID"
    # Remember when this poll happened so a later poll can ask for newer changes only
    polled_at = datetime.now(tz=timezone.utc)
    # since=None fetches the full graph; on later polls pass since=polled_at instead
    dag = await fetch_flow_run_graph(flow_run_id, since=None)
    print(dag)

asyncio.run(main())
Notes and tips
- Real-time/streaming: the since parameter lets you poll for incremental updates and update your UI without re-downloading the entire graph.
- Multi-tenant isolation: since your Prefect instance is private, your app's backend can expose only the flow runs that belong to the requesting client. If you need to list a client's flow runs first, the client also supports filtered listing (e.g., by tags, deployment, work queue) before you fetch each run's graph.
- Authentication:
  - Self-hosted Prefect Server: set PREFECT_API_URL to your instance URL.
  - Prefect Cloud: also set PREFECT_API_KEY. (Not needed for a fully private Server.)

Docs
- Python client and flow run APIs: Python API reference – flow runs
- REST endpoint: Read Flow Run Graph (v2)

If you share whether you're using Server or Cloud and how you identify "a client's runs" (tags, work pools, separate workspaces), I can sketch the listing + graph-fetch pattern tailored to your setup.
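For the polling note above, here is a rough sketch of how a backend might fold an incremental result into a graph it has already cached. It assumes the `{"nodes": [...], "edges": [...]}` structure produced by the example earlier in this thread, and that a `since` query returns only nodes that changed after the timestamp. `merge_dag` is a hypothetical helper, not part of Prefect.

```python
from typing import Any

Dag = dict[str, list[dict[str, Any]]]


def merge_dag(cached: Dag, update: Dag) -> Dag:
    """Upsert nodes by id and de-duplicate edges by (source, target)."""
    nodes = {n["id"]: n for n in cached["nodes"]}
    for n in update["nodes"]:
        nodes[n["id"]] = n  # newer state replaces the cached copy

    seen: set[tuple[str, str]] = set()
    edges: list[dict[str, Any]] = []
    for e in cached["edges"] + update["edges"]:
        key = (e["source"], e["target"])
        if key not in seen:
            seen.add(key)
            edges.append(e)

    return {"nodes": list(nodes.values()), "edges": edges}
```

On each poll you would pass the previous poll's timestamp as `since`, merge the result into the cache, and push the merged graph to the client's UI.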