José Agretti (09/22/2025, 8:56 PM)

Marvin (09/22/2025, 8:57 PM)
Marvin (09/22/2025, 9:04 PM)

from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter  # optionally filter by flow/deployment/tags/etc.

async def fetch_history():
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        history = await client.read_flow_run_history(
            history_start=now - timedelta(days=7),
            history_end=now,
            history_interval_seconds=3600,
            flow_runs_filter=FlowRunFilter(),  # customize if you want a specific flow/deployment
        )
    # history.history is a list of time buckets. Each bucket includes state counts and timing stats.
    # To inspect the exact shape safely:
    for bucket in history.history:
        print(bucket.model_dump())  # contains interval_start/end, per-state counts, and duration metrics if available

# If you're not in an async context, run with: asyncio.run(fetch_history())
Notes and options
- Total run time: If you want “total runtime” across runs in the window, you can either:
- Use any duration aggregates included in the history response, or
- Fetch runs in the window and sum (end_time - start_time) for each run (only where both are present); see the sketch after this list.
- Alerting/automation: You can create Automations that trigger on metric conditions (e.g., “X failures in Y minutes”) and send notifications.
- External dashboards: Many teams poll the history endpoint on a schedule and write the results to their observability stack (e.g., Grafana via JSON API datasource, Datadog via DogStatsD, or GCP Monitoring). The history API already returns time-bucketed data, which makes it straightforward to chart.
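Here is a minimal sketch of the second option, summing durations from individual runs. It assumes the client's read_flow_runs method and a FlowRunFilter scoped by start time; the FlowRunFilterStartTime class and its after_/before_ fields should be double-checked against your Prefect version:

# Sketch: sum run durations over the last 7 days (verify filter field names for your Prefect version)
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime

async def total_runtime_last_week() -> float:
    now = datetime.now(timezone.utc)
    flow_run_filter = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=now - timedelta(days=7), before_=now)
    )
    async with get_client() as client:
        runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
    # Only count runs that have both timestamps (i.e., they actually started and finished)
    durations = [
        (run.end_time - run.start_time).total_seconds()
        for run in runs
        if run.start_time and run.end_time
    ]
    return sum(durations)

# asyncio.run(total_runtime_last_week())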
A couple of quick questions to tailor this:
- Are you on Prefect Cloud or self-hosted Server?
- Do you want to view this primarily in the Prefect UI, or export to a tool like Grafana/Datadog/GCP Monitoring?

José Agretti (09/22/2025, 9:05 PM)

Marvin (09/22/2025, 9:05 PM)
Marvin (09/22/2025, 9:09 PM)

import asyncio
import os
from datetime import datetime, timedelta, timezone

from prometheus_client import start_http_server, Gauge

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter

# Gauges updated each refresh window
RUNS_BY_STATE = Gauge(
    "prefect_flow_runs_total",
    "Flow runs counted over the configured window",
    ["state"],
)
AVG_DURATION = Gauge(
    "prefect_flow_run_duration_seconds_avg",
    "Average run duration (seconds) over the configured window",
)

REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "60"))
WINDOW_HOURS = int(os.getenv("WINDOW_HOURS", "24"))

async def fetch_history():
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        # Add filters if you want a specific flow/deployment/tags
        history = await client.read_flow_run_history(
            history_start=now - timedelta(hours=WINDOW_HOURS),
            history_end=now,
            history_interval_seconds=3600,
            flow_runs_filter=FlowRunFilter(),
        )
    return history

def safe_get(d, *keys, default=None):
    cur = d
    for k in keys:
        if cur is None:
            return default
        cur = cur.get(k)
    return cur if cur is not None else default

async def refresh_loop():
    while True:
        try:
            history = await fetch_history()
            # Reset metrics each cycle to reflect the current window
            for label in list(RUNS_BY_STATE._metrics.keys()):
                RUNS_BY_STATE.remove(*label)
            AVG_DURATION.set(0)

            total_counts = {}
            durations = []
            # Use model_dump to avoid relying on internal attrs
            for bucket in history.history:
                b = bucket.model_dump()
                # Per-state counts in this bucket
                for s in b.get("states", []):
                    state_name = safe_get(s, "state", "name", default="unknown")
                    count = s.get("count", 0) or 0
                    total_counts[state_name] = total_counts.get(state_name, 0) + count
                # Estimated run time metric may be present (seconds)
                # If not, you can compute from individual runs (slower) using read_flow_runs
                est = b.get("estimated_run_time")
                if isinstance(est, (int, float)):
                    durations.append(est)

            for state, count in total_counts.items():
                RUNS_BY_STATE.labels(state=state).set(count)
            if durations:
                AVG_DURATION.set(sum(durations) / len(durations))
        except Exception as e:
            # Don't raise; keep the exporter alive
            print(f"[exporter] refresh error: {e}")
        await asyncio.sleep(REFRESH_SECONDS)

def main():
    # Start Prometheus endpoint
    port = int(os.getenv("PORT", "8000"))
    start_http_server(port)
    print(f"Exporter listening on :{port}/metrics")
    asyncio.run(refresh_loop())

if __name__ == "__main__":
    main()
- Run it:
- Docker: build a tiny image, run alongside your stack (or as a sidecar).
- K8s: deploy as a Deployment/Service; let Prometheus scrape it.
- Notes:
- Customize the FlowRunFilter (specific flows, deployments, tags, work queue, etc.).
- If you need per-flow or per-deployment metrics, run multiple filtered queries or fetch runs and aggregate with labels (watch cardinality); see the sketch after these notes.
- If your Prefect Server requires auth (typically self-hosted Server doesn’t), inject PREFECT_API_KEY and PREFECT_API_URL.
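As a rough illustration of the filtered-query idea (FlowRunFilterTags and its all_ field come from Prefect's filter schemas; verify the names against your version, and keep the tag set small to control label cardinality), you could run one history query per tag and feed a tag-labeled gauge:

# Sketch: per-tag history queries feeding a tag- and state-labeled gauge
from datetime import datetime, timedelta, timezone

from prometheus_client import Gauge
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterTags

RUNS_BY_TAG_AND_STATE = Gauge(
    "prefect_flow_runs_by_tag_total",
    "Flow runs in the window, split by tag and state",
    ["tag", "state"],
)

async def refresh_tag(tag: str, window_hours: int = 24):
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        history = await client.read_flow_run_history(
            history_start=now - timedelta(hours=window_hours),
            history_end=now,
            history_interval_seconds=3600,
            # Only count runs carrying this tag (field name assumed; check your Prefect version)
            flow_runs_filter=FlowRunFilter(tags=FlowRunFilterTags(all_=[tag])),
        )
    counts = {}
    for bucket in history.history:
        b = bucket.model_dump()
        for s in b.get("states", []):
            state = (s.get("state") or {}).get("name") or "unknown"
            counts[state] = counts.get(state, 0) + (s.get("count", 0) or 0)
    for state, count in counts.items():
        RUNS_BY_TAG_AND_STATE.labels(tag=tag, state=state).set(count)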
GCP Monitoring approach
- Same idea: poll the flow-run history and publish a custom metric in Cloud Monitoring.
- Minimal example (writes a “runs by state” metric every minute):
import asyncio
import time
from datetime import datetime, timedelta, timezone

from google.cloud import monitoring_v3
from google.api import metric_pb2 as ga_metric
from google.api import monitored_resource_pb2 as ga_resource

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter

PROJECT_ID = "your-gcp-project-id"
METRIC_TYPE = "custom.googleapis.com/prefect/flow_runs_total"  # labeled by state
REFRESH_SECONDS = 60

async def get_counts():
    now = datetime.now(timezone.utc)
    async with get_client() as client:
        history = await client.read_flow_run_history(
            history_start=now - timedelta(hours=24),
            history_end=now,
            history_interval_seconds=3600,
            flow_runs_filter=FlowRunFilter(),
        )
    counts = {}
    for bucket in history.history:
        b = bucket.model_dump()
        for s in b.get("states", []):
            state = (s.get("state") or {}).get("name") or "unknown"
            counts[state] = counts.get(state, 0) + (s.get("count", 0) or 0)
    return counts

def write_timeseries(counts):
    client = monitoring_v3.MetricServiceClient()
    name = f"projects/{PROJECT_ID}"
    series = []
    for state, value in counts.items():
        ts = monitoring_v3.TimeSeries()
        ts.metric.type = METRIC_TYPE
        ts.metric.labels["state"] = state
        ts.resource.type = "global"  # adjust if you prefer gce_instance/k8s_pod/etc.
        point = monitoring_v3.Point()
        point.value.int64_value = int(value)
        now = time.time()
        point.interval.end_time.seconds = int(now)
        point.interval.end_time.nanos = int((now - int(now)) * 1e9)
        ts.points = [point]
        series.append(ts)
    client.create_time_series(name=name, time_series=series)

async def main():
    while True:
        try:
            counts = await get_counts()
            write_timeseries(counts)
        except Exception as e:
            print(f"[gcp-monitoring] error: {e}")
        await asyncio.sleep(REFRESH_SECONDS)

if __name__ == "__main__":
    asyncio.run(main())
- Grant the service account the “Monitoring Metric Writer” role.
- In Grafana (via the GCM datasource) or Cloud Monitoring Dashboards, chart:
- a time series of prefect/flow_runs_total grouped by the state label
- another custom metric for average run duration if desired (see the sketch below)
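If you want that second metric, a rough extension of the example above could publish the average duration as a DOUBLE-valued gauge. The metric name and the write_avg_duration helper here are illustrative, not an existing API; the write pattern mirrors the code above:

# Sketch: publish average run duration (seconds) as a second custom metric
import time

from google.cloud import monitoring_v3

# Illustrative metric name; Cloud Monitoring auto-creates the descriptor on first write
DURATION_METRIC_TYPE = "custom.googleapis.com/prefect/flow_run_duration_seconds_avg"

def write_avg_duration(project_id: str, avg_seconds: float):
    client = monitoring_v3.MetricServiceClient()
    ts = monitoring_v3.TimeSeries()
    ts.metric.type = DURATION_METRIC_TYPE
    ts.resource.type = "global"
    point = monitoring_v3.Point()
    point.value.double_value = float(avg_seconds)  # DOUBLE value type suits durations
    now = time.time()
    point.interval.end_time.seconds = int(now)
    point.interval.end_time.nanos = int((now - int(now)) * 1e9)
    ts.points = [point]
    client.create_time_series(name=f"projects/{project_id}", time_series=[ts])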
2) Event-driven (streaming) metrics
- If you prefer near-real-time counters, you can subscribe to Prefect events (flow-run state transitions) and update Prometheus counters directly from the stream. This avoids polling and can give lower-latency alerting.
- Docs (concepts): Events and subscriptions
- Pattern (rough sketch below):
- Start an async subscriber (topics like prefect.flow-run.*)
- For each event where state changes to Completed/Failed/etc., increment a Counter labeled by state
- Expose those counters at /metrics for Prometheus
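A minimal sketch of that pattern follows. It assumes Prefect's events client exposes get_events_subscriber along with EventFilter/EventNameFilter (check the events docs for your version); the counter name is illustrative:

# Sketch: stream flow-run state-change events into a Prometheus counter
# Assumes prefect.events.clients.get_events_subscriber and the event filter schemas exist in your Prefect version
import asyncio

from prometheus_client import start_http_server, Counter
from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

# Illustrative metric name
RUN_STATE_EVENTS = Counter(
    "prefect_flow_run_state_events_total",
    "Flow-run state-change events seen on the event stream",
    ["state"],
)

async def stream_events():
    # Subscribe only to flow-run events, e.g. prefect.flow-run.Completed / prefect.flow-run.Failed
    event_filter = EventFilter(event=EventNameFilter(prefix=["prefect.flow-run."]))
    async with get_events_subscriber(filter=event_filter) as subscriber:
        async for event in subscriber:
            # The last segment of the event name is the state name (Completed, Failed, ...)
            state = event.event.rsplit(".", 1)[-1]
            RUN_STATE_EVENTS.labels(state=state).inc()

if __name__ == "__main__":
    start_http_server(8000)  # expose /metrics for Prometheus
    asyncio.run(stream_events())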
What I recommend
- Start with the history-based exporter (simplest to stand up and stable).
- Decide on metric granularity you actually need (per-state only vs per-flow/deployment/work-pool). We can extend the exporter accordingly by filtering or aggregating runs.
- If you need streaming/low latency, add the event-subscriber variant later.
A couple of confirmatory questions so I can tailor the final example:

Marvin (09/22/2025, 9:09 PM)