# ask-marvin
j
@Marvin is there a way to monitor Prefect flows' total run time and succeeded vs. failed flows? Ideally in a time-series manner
m
thought for 414.7 seconds
Yes, there are a few good ways to do this in Prefect 3.x. If you want a time series of succeeded vs. failed runs (and durations), the Flow Run History endpoint gives you bucketed counts and timing aggregates over any window you choose. You can hit it via the Python client and use the result to drive charts in Grafana/Datadog/etc.
- REST/API docs: Flow run history (time-series)
- Client reference: Python client methods for flow runs

Example: pull hourly success/failure counts for the last 7 days
from datetime import datetime, timedelta, timezone
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter  # optionally filter by flow/deployment/tags/etc.

async def fetch_history():
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        history = await client.read_flow_run_history(
            history_start=now - timedelta(days=7),
            history_end=now,
            history_interval_seconds=3600,
            flow_runs_filter=FlowRunFilter()  # customize if you want a specific flow/deployment
        )

    # history.history is a list of time buckets. Each bucket includes state counts and timing stats.
    # To inspect the exact shape safely:
    for bucket in history.history:
        print(bucket.model_dump())  # contains interval_start/end, per-state counts, and duration metrics if available

# If you're not in an async context, run with: asyncio.run(fetch_history())
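If you'd rather poll the REST endpoint directly (for example, from a dashboard tool that can hit JSON APIs), here's a minimal sketch using httpx. The POST /flow_runs/history route, the request field names, and the default self-hosted API URL are assumptions that mirror the client call above; confirm them against the flow run history REST docs.

# Minimal sketch: the same history query via the REST API of a self-hosted server.
# The route, field names, and default API URL below are assumptions - verify them
# against the flow run history REST docs for your Prefect version.
from datetime import datetime, timedelta, timezone

import httpx

API_URL = "http://127.0.0.1:4200/api"  # adjust to your PREFECT_API_URL

def fetch_history_rest():
    now = datetime.now(timezone.utc)
    payload = {
        "history_start": (now - timedelta(days=7)).isoformat(),
        "history_end": now.isoformat(),
        "history_interval_seconds": 3600,
    }
    response = httpx.post(f"{API_URL}/flow_runs/history", json=payload, timeout=30)
    response.raise_for_status()
    return response.json()  # time buckets with per-state counts

if __name__ == "__main__":
    for bucket in fetch_history_rest():
        print(bucket)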
Notes and options
- Total run time: if you want "total runtime" across runs in the window, you can either:
  - use any duration aggregates included in the history response, or
  - fetch the runs in the window and sum end_time - start_time for each run (only where both are present; see the sketch at the end of this message).
- Alerting/automation: you can create Automations that trigger on metric conditions (e.g., "X failures in Y minutes") and send notifications.
- External dashboards: many teams poll the history endpoint on a schedule and write the results to their observability stack (e.g., Grafana via the JSON API datasource, Datadog via DogStatsD, or GCP Monitoring). The history API already returns time-bucketed data, which makes it straightforward to chart.

A couple of quick questions to tailor this:
- Are you on Prefect Cloud or self-hosted Server?
- Do you want to view this primarily in the Prefect UI, or export to a tool like Grafana/Datadog/GCP Monitoring?
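To illustrate the "sum per-run durations" option from the notes above, here is a minimal sketch. It assumes the read_flow_runs client method and the FlowRunFilterStartTime fields (after_/before_) from the Prefect 3.x client schemas; adjust the names for your installed version.

# Minimal sketch: total runtime over the last 7 days by summing per-run durations.
# Assumes read_flow_runs and FlowRunFilterStartTime (after_/before_) from the
# Prefect 3.x client schemas - check the exact names in your installed version.
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime

async def total_runtime(days: int = 7) -> timedelta:
    now = datetime.now(timezone.utc)
    run_filter = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=now - timedelta(days=days), before_=now)
    )
    async with get_client() as client:
        # For large windows you may need to paginate with limit/offset.
        runs = await client.read_flow_runs(flow_run_filter=run_filter)

    # Only count runs that have both timestamps, i.e. runs that actually finished.
    return sum(
        (run.end_time - run.start_time for run in runs if run.start_time and run.end_time),
        timedelta(),
    )

if __name__ == "__main__":
    print(asyncio.run(total_runtime()))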
j
@Marvin I'm using a self-hosted server; ideally I'd like to export to GCP Monitoring, or something like Prometheus
m
thought for 238.5 seconds
Great: since you're self-hosting, you've got two solid paths that work well in practice.

1) Time-series from the Prefect API (recommended starting point)
- Prefect Server exposes a flow-run history endpoint that returns time-bucketed counts by state with timing aggregates. You can poll this and push/export to your observability stack.
- Docs: Flow run history (time-series)

Prometheus/Grafana approach
- Run a tiny "exporter" service that:
  - periodically calls the flow-run history endpoint
  - exposes Prometheus metrics at /metrics
- Prometheus scrapes that exporter, and Grafana charts it.

Example exporter (one file, runs anywhere with network access to your Prefect Server):
import asyncio
import os
import time
from datetime import datetime, timedelta, timezone

from prometheus_client import start_http_server, Gauge
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter

# Gauges updated each refresh window
RUNS_BY_STATE = Gauge(
    "prefect_flow_runs_total",
    "Flow runs counted over the configured window",
    ["state"]
)
AVG_DURATION = Gauge(
    "prefect_flow_run_duration_seconds_avg",
    "Average run duration (seconds) over the configured window"
)

REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "60"))
WINDOW_HOURS = int(os.getenv("WINDOW_HOURS", "24"))

async def fetch_history():
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        # Add filters if you want a specific flow/deployment/tags
        history = await client.read_flow_run_history(
            history_start=now - timedelta(hours=WINDOW_HOURS),
            history_end=now,
            history_interval_seconds=3600,
            flow_runs_filter=FlowRunFilter()
        )
    return history

def safe_get(d, *keys, default=None):
    cur = d
    for k in keys:
        if cur is None:
            return default
        cur = cur.get(k)
    return cur if cur is not None else default

async def refresh_loop():
    while True:
        try:
            history = await fetch_history()

            # Reset metrics each cycle to reflect current window
            # clear() drops all labeled children (requires a reasonably recent prometheus_client)
            RUNS_BY_STATE.clear()
            AVG_DURATION.set(0)

            total_counts = {}
            durations = []

            # Use model_dump to avoid relying on internal attrs
            for bucket in history.history:
                b = bucket.model_dump()

                # Per-state counts in this bucket
                for s in b.get("states", []):
                    state_name = safe_get(s, "state", "name", default="unknown")
                    count = s.get("count", 0) or 0
                    total_counts[state_name] = total_counts.get(state_name, 0) + count

                # Estimated run time metric may be present (seconds)
                # If not, you can compute from individual runs (slower) using read_flow_runs
                est = b.get("estimated_run_time")
                if isinstance(est, (int, float)):
                    durations.append(est)

            for state, count in total_counts.items():
                RUNS_BY_STATE.labels(state=state).set(count)

            if durations:
                AVG_DURATION.set(sum(durations) / len(durations))

        except Exception as e:
            # Don’t raise — keep exporter alive
            print(f"[exporter] refresh error: {e}")

        await asyncio.sleep(REFRESH_SECONDS)

def main():
    # Start Prometheus endpoint
    port = int(os.getenv("PORT", "8000"))
    start_http_server(port)
    print(f"Exporter listening on :{port}/metrics")

    asyncio.run(refresh_loop())

if __name__ == "__main__":
    main()
Run it:
- Docker: build a tiny image and run it alongside your stack (or as a sidecar).
- K8s: deploy it as a Deployment/Service and let Prometheus scrape it.

Notes:
- You can tighten the scope via FlowRunFilter (specific flows, deployments, tags, work queue, etc.); see the sketch below.
- If you need per-flow or per-deployment metrics, run multiple filtered queries or fetch runs and aggregate with labels (watch cardinality).
- If your Prefect Server requires auth (typically self-hosted Server doesn't), inject PREFECT_API_KEY and PREFECT_API_URL.
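As referenced in the notes, here is a minimal sketch of scoping the query. FlowRunFilterTags / FlowRunFilterDeploymentId and their field names are assumptions based on the Prefect 3.x filter schemas; verify them against your client version.

# Sketch: scope the history/exporter queries to a subset of runs.
# FlowRunFilterTags / FlowRunFilterDeploymentId and their fields are assumptions
# based on the Prefect 3.x filter schemas - verify locally before relying on them.
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterTags,
    FlowRunFilterDeploymentId,
)

scoped_filter = FlowRunFilter(
    tags=FlowRunFilterTags(all_=["prod"]),  # only runs carrying the "prod" tag
    # deployment_id=FlowRunFilterDeploymentId(any_=[...]),  # or pin to deployment UUIDs
)

# Pass scoped_filter as flow_runs_filter=... in the read_flow_run_history call above.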
GCP Monitoring approach
- Same idea: poll the flow-run history and publish a custom metric in Cloud Monitoring.
- Minimal example (writes a "runs by state" metric every minute):
import asyncio
import time
from datetime import datetime, timedelta, timezone

from google.cloud import monitoring_v3
from google.api import metric_pb2 as ga_metric
from google.api import monitored_resource_pb2 as ga_resource

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter

PROJECT_ID = "your-gcp-project-id"
METRIC_TYPE = "<http://custom.googleapis.com/prefect/flow_runs_total|custom.googleapis.com/prefect/flow_runs_total>"  # state label
REFRESH_SECONDS = 60

async def get_counts():
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        history = await client.read_flow_run_history(
            history_start=now - timedelta(hours=24),
            history_end=now,
            history_interval_seconds=3600,
            flow_runs_filter=FlowRunFilter()
        )

    counts = {}
    for bucket in history.history:
        b = bucket.model_dump()
        for s in b.get("states", []):
            state = (s.get("state") or {}).get("name") or "unknown"
            counts[state] = counts.get(state, 0) + (s.get("count", 0) or 0)
    return counts

def write_timeseries(counts):
    client = monitoring_v3.MetricServiceClient()
    name = f"projects/{PROJECT_ID}"

    series = []
    for state, value in counts.items():
        ts = monitoring_v3.TimeSeries()
        ts.metric.type = METRIC_TYPE
        ts.metric.labels["state"] = state
        ts.resource.type = "global"  # adjust if you prefer gce_instance/k8s_pod/etc.

        point = monitoring_v3.Point()
        point.value.int64_value = int(value)
        now = time.time()
        point.interval.end_time.seconds = int(now)
        point.interval.end_time.nanos = int((now - int(now)) * 1e9)
        ts.points = [point]
        series.append(ts)

    client.create_time_series(name=name, time_series=series)

async def main():
    while True:
        try:
            counts = await get_counts()
            write_timeseries(counts)
        except Exception as e:
            print(f"[gcp-monitoring] error: {e}")
        await asyncio.sleep(REFRESH_SECONDS)

if __name__ == "__main__":
    asyncio.run(main())
- Grant the service account the "Monitoring Metric Writer" role.
- In Grafana (via the GCM datasource) or Cloud Monitoring dashboards, chart:
  - a time series of prefect/flow_runs_total grouped by the state label
  - another custom metric for average run duration, if desired

2) Event-driven (streaming) metrics
- If you prefer near-real-time counters, you can subscribe to Prefect events (flow-run state transitions) and update Prometheus counters directly from the stream. This avoids polling and can give lower-latency alerting.
- Docs (concepts): Events and subscriptions
- Pattern (see the sketch at the end of this message):
  - Start an async subscriber (topics like prefect.flow-run.*)
  - For each event where the state changes to Completed/Failed/etc., increment a Counter labeled by state
  - Expose those counters at /metrics for Prometheus

What I recommend
- Start with the history-based exporter (simplest to stand up, and stable).
- Decide on the metric granularity you actually need (per-state only vs. per-flow/deployment/work-pool). We can extend the exporter accordingly by filtering or aggregating runs.
- If you need streaming/low latency, add the event-subscriber variant later.

A couple of confirmatory questions so I can tailor the final example:
- Are you running Prefect Server on Kubernetes, Docker Compose, or a VM?
- For Prometheus, do you already have a Prometheus server scraping targets, or do you prefer a Pushgateway model?
- For GCP, are you okay with custom-metrics billing and the 1-minute write cadence?

Happy to adapt the sample to your environment (e.g., Helm manifests for K8s, a Dockerfile, labels for flow/deployment/work-pool).
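For reference, here is a rough sketch of the event-subscriber variant mentioned under "Pattern" above. It assumes the events client names (get_events_subscriber, EventFilter, EventNameFilter) from recent Prefect 3.x releases and reuses the prometheus_client pattern from the exporter; check the events docs for the exact imports in your version.

# Rough sketch of the event-driven variant. get_events_subscriber, EventFilter,
# and EventNameFilter are assumed from recent Prefect 3.x releases - verify the
# import paths and signatures against the events docs for your version.
import asyncio

from prometheus_client import Counter, start_http_server

from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

FLOW_RUN_EVENTS = Counter(
    "prefect_flow_run_state_transitions_total",
    "Flow-run state transition events seen on the Prefect event stream",
    ["state"],
)

async def consume_events():
    # Subscribe only to flow-run state events (e.g. prefect.flow-run.Completed).
    event_filter = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run."]))
    async with get_events_subscriber(filter=event_filter) as subscriber:
        async for event in subscriber:
            # The last segment of the event name is the state name.
            state = event.event.rsplit(".", 1)[-1]
            FLOW_RUN_EVENTS.labels(state=state).inc()

if __name__ == "__main__":
    start_http_server(8001)  # Prometheus scrapes this port
    asyncio.run(consume_events())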