Daniel Chan
06/30/2025, 3:48 PM
Marvin
06/30/2025, 3:50 PM
python
from datetime import datetime, timezone

from google.cloud import monitoring_v3
from prefect import flow


def send_duration_to_gcp(flow, flow_run, state):
    """Prefect on_completion hook: write the flow run duration as a custom metric."""
    client = monitoring_v3.MetricServiceClient()
    project_name = "projects/your-project-id"
    series = monitoring_v3.TimeSeries()
    series.metric.type = "custom.googleapis.com/prefect/flow_duration"
    series.metric.labels["flow_name"] = flow.name
    series.metric.labels["deployment_id"] = str(flow_run.deployment_id or "")
    series.resource.type = "global"
    now = datetime.now(timezone.utc).timestamp()
    interval = monitoring_v3.TimeInterval(
        {"end_time": {"seconds": int(now), "nanos": int((now % 1) * 1e9)}}
    )
    point = monitoring_v3.Point(
        {
            "interval": interval,
            "value": {"double_value": flow_run.total_run_time.total_seconds()},
        }
    )
    series.points = [point]
    client.create_time_series(name=project_name, time_series=[series])


# Attach the hook when defining the flow
@flow(on_completion=[send_duration_to_gcp])
def your_flow():
    # Your flow logic here
    pass
2. Using Prefect Automations
You can create an automation that triggers on flow completion and sends the metrics to GCP Monitoring:
python
from prefect.automations import create_automation
from prefect.events.schemas import FlowRunCompleted


async def export_to_gcp(event: FlowRunCompleted):
    # Similar logic to the handler above
    pass


automation = create_automation(
    name="Export Flow Duration to GCP",
    description="Exports flow run duration to GCP Monitoring",
    trigger=FlowRunCompleted,
    action=export_to_gcp,
)
3. Logging Integration
You can also set up a custom logging handler that forwards logs to GCP Monitoring (formerly Stackdriver):
python
import time

import google.cloud.logging
from prefect import flow
from prefect.logging import get_run_logger


@flow
def your_flow():
    # Set up GCP logging client
    logging_client = google.cloud.logging.Client()
    logging_client.setup_logging()
    logger = get_run_logger()

    start = time.monotonic()
    # Your flow logic here

    # Log metrics at the end
    logger.info("Flow metrics", extra={
        "flow_duration": time.monotonic() - start,
        "resource": {
            "type": "k8s_container",
            "labels": {
                "namespace_name": "your-gke-namespace",
                "container_name": "prefect-worker",
            },
        },
    })
A few important notes:
1. Make sure your GKE workers have the necessary IAM permissions to write to GCP Monitoring.
2. Consider using the prefect-gcp collection for other GCP integrations you might need.
3. Be mindful of the costs associated with writing custom metrics to GCP Monitoring.
4. Consider batching metrics if you have a high volume of flow runs to avoid hitting API quotas; a rough sketch of one way to batch is shown below.
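By way of a rough sketch only: one way to batch is to buffer (flow name, run id, duration) entries and flush them in a single create_time_series call, since the Monitoring API accepts multiple TimeSeries per request. The flush_durations helper and its buffer format are made up for this illustration:
python
from datetime import datetime, timezone

from google.cloud import monitoring_v3


def flush_durations(project_id: str, buffered: list[tuple[str, str, float]]) -> None:
    """Hypothetical helper: send buffered (flow_name, flow_run_id, duration_seconds)
    entries to GCP Monitoring in one batched request instead of one call per run."""
    client = monitoring_v3.MetricServiceClient()
    now = datetime.now(timezone.utc).timestamp()
    interval = monitoring_v3.TimeInterval(
        {"end_time": {"seconds": int(now), "nanos": int((now % 1) * 1e9)}}
    )
    all_series = []
    for flow_name, flow_run_id, duration in buffered:
        series = monitoring_v3.TimeSeries()
        series.metric.type = "custom.googleapis.com/prefect/flow_duration"
        series.metric.labels["flow_name"] = flow_name
        # Each series in a single request needs a distinct label set; the run id
        # keeps them unique (but mind label cardinality and quota implications).
        series.metric.labels["flow_run_id"] = flow_run_id
        series.resource.type = "global"
        series.points = [
            monitoring_v3.Point(
                {"interval": interval, "value": {"double_value": duration}}
            )
        ]
        all_series.append(series)
    # One API call for the whole batch
    client.create_time_series(name=f"projects/{project_id}", time_series=all_series)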
For a more comprehensive monitoring solution, you might want to consider using Prefect's built-in monitoring capabilities through the UI and API, supplemented by GCP Monitoring for specific metrics or when you need to consolidate monitoring across different services.
Daniel Chan
06/30/2025, 3:51 PM
Marvin
06/30/2025, 3:52 PM
python
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime
from prefect.client.schemas.sorting import FlowRunSort


async def get_flow_run_durations():
    async with get_client() as client:
        # Get flow runs that started in the last 24 hours, newest first
        flow_runs = await client.read_flow_runs(
            sort=FlowRunSort.START_TIME_DESC,
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(
                    after_=datetime.now(timezone.utc) - timedelta(days=1)
                )
            ),
        )
        for run in flow_runs:
            if run.total_run_time:
                print(f"Flow run: {run.name}, Duration: {run.total_run_time}")
2. Late Flow Runs
Prefect provides direct methods to query late runs:
python
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName,
)


async def get_late_runs():
    async with get_client() as client:
        # "Late" is a state name (on a SCHEDULED state), so filter by state name
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(name=FlowRunFilterStateName(any_=["Late"]))
            )
        )
3. Real-time Monitoring with Prefect UI
The Prefect UI provides a dashboard where you can see these metrics without writing code:
- Navigate to the "Flow Runs" page to see duration statistics
- Use the filter "State: Late" to see currently late runs
- Create saved filters for quick access to these views
4. Setting Up Notifications for Late Runs
You can set up automations to alert you when flows run late:
python
from prefect.automations import create_automation
from prefect.events.schemas import FlowRunStateChange


async def alert_on_late_run(event: FlowRunStateChange):
    # "Late" is the name of a SCHEDULED state, so check the state name
    if event.flow_run.state.name == "Late":
        # Add your notification logic here
        # e.g., send to Slack, email, etc.
        print(f"Flow run {event.flow_run.name} is late!")


automation = create_automation(
    name="Late Flow Run Alert",
    description="Alerts when flow runs are late",
    trigger=FlowRunStateChange,
    action=alert_on_late_run,
)
5. Custom Metrics Dashboard
If you want to create a custom dashboard, you can use the Prefect REST API to pull these metrics:
python
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime


async def get_metrics_summary():
    async with get_client() as client:
        # Get stats for the last 24 hours
        since = datetime.now(timezone.utc) - timedelta(days=1)
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(after_=since)
            )
        )
        durations = [r.total_run_time.total_seconds() for r in runs if r.total_run_time]
        metrics = {
            "total_runs": len(runs),
            "late_runs": len([r for r in runs if r.state and r.state.name == "Late"]),
            "avg_duration": sum(durations) / len(durations) if durations else 0,
        }
        return metrics
If you still want to use Prometheus, you can:
1. Create a custom exporter that uses the above methods to expose metrics
2. Use the Prefect REST API to scrape metrics directly into your monitoring system
3. Set up your own metrics endpoint in your infrastructure code
The built-in capabilities should cover most use cases, but if you need specific Prometheus integration, you can wrap these methods in a custom exporter that follows the Prometheus data model.
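To make that concrete, here is a rough sketch of such an exporter built on the prometheus_client package and the get_metrics_summary helper above; the metric names, port, and refresh interval are placeholder choices for this example, not anything Prefect defines:
python
import asyncio
import time

from prometheus_client import Gauge, start_http_server

# Placeholder metric names for this sketch
LATE_RUNS = Gauge("prefect_late_flow_runs", "Flow runs currently in a Late state")
AVG_DURATION = Gauge(
    "prefect_avg_flow_duration_seconds", "Average flow run duration over the last 24h"
)


def collect_once():
    # Assumes get_metrics_summary() from the snippet above is defined in this module
    summary = asyncio.run(get_metrics_summary())
    LATE_RUNS.set(summary["late_runs"])
    AVG_DURATION.set(summary["avg_duration"])


if __name__ == "__main__":
    start_http_server(8000)  # Prometheus scrapes this port
    while True:
        collect_once()
        time.sleep(60)  # refresh on a fixed interval

You would then point a Prometheus scrape job at port 8000 of wherever this process runs.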
For most users, I recommend starting with the built-in monitoring capabilities and Prefect's UI, then extending with custom solutions only if needed. The built-in tools provide a robust way to monitor both duration and late runs without additional infrastructure.
Would you like me to elaborate on any of these approaches or show you how to set up specific alerts?
Daniel Chan
06/30/2025, 3:53 PM
Marvin
06/30/2025, 3:55 PM
1. Enabling Metrics
a) Server-side metrics:
bash
# Enable via environment variable
export PREFECT_API_ENABLE_METRICS=True
b) Client-side metrics:
bash
# Enable via environment variable
export PREFECT_CLIENT_ENABLE_METRICS=True
export PREFECT_CLIENT_METRICS_PORT=9090 # Default port
2. Available Metrics
The metrics endpoint exposes the standard Python process metrics that ship with prometheus_client. However, Prefect-specific metrics are still being developed and expanded.
3. Accessing the Metrics
You can configure Prometheus to scrape these endpoints:
yaml
# prometheus.yml
scrape_configs:
  - job_name: 'prefect'
    static_configs:
      - targets: ['localhost:9090']  # For client metrics
  - job_name: 'prefect-server'
    metrics_path: /api/metrics  # Server metrics are served under /api/metrics
    static_configs:
      - targets: ['prefect-server:4200']
4. GCP Monitoring Integration
To get these metrics into GCP Monitoring:
1. Deploy the Prometheus server in your GKE cluster
2. Configure it to scrape the Prefect metrics endpoints
3. Use GCP's managed collection for Prometheus metrics
For flow run total time specifically, you might need to:
1. Either wait for more Prefect-specific metrics to be added to the native Prometheus endpoint
2. Or implement a custom exporter that queries the Prefect API and exposes the metrics you need
Given that the native Prometheus support is still evolving, you might want to consider:
1. Using the Prefect API directly to query metrics (as shown in my previous response)
2. Creating a custom exporter that queries the API and exposes the specific metrics you need
3. Using Prefect's built-in monitoring capabilities through the UI and REST API
Would you like me to provide an example of how to create a custom exporter that specifically exposes flow run total time to Prometheus? This might be the most reliable approach until more Prefect-specific metrics are added to the native Prometheus endpoint.
Daniel Chan
06/30/2025, 3:58 PM
Marvin
06/30/2025, 3:59 PM
1. Pod Annotations
Add the Prometheus scrape annotations to your flow run pods:
yaml
annotations:
  prometheus.io/scrape: "true"
  prometheus.io/path: "/metrics"
  prometheus.io/port: "9090"  # Use the port you configured for PREFECT_CLIENT_METRICS_PORT
2. Enable Client Metrics
Make sure client metrics are enabled in your flow run environment:
python
# In your flow code or environment variables
import os
os.environ["PREFECT_CLIENT_ENABLE_METRICS"] = "True"
os.environ["PREFECT_CLIENT_METRICS_PORT"] = "9090" # Match the port in annotations
3. Kubernetes Work Pool Configuration
You can set this up in your work pool's base job template:
python
from prefect.infrastructure import KubernetesJob

job = KubernetesJob(
    pod_annotations={
        "prometheus.io/scrape": "true",
        "prometheus.io/path": "/metrics",
        "prometheus.io/port": "9090",
    }
)
4. Prometheus Configuration
Configure Prometheus to discover and scrape these pods. Here's a typical configuration:
yaml
scrape_configs:
  - job_name: 'prefect-flow-runs'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      # Only scrape pods annotated with prometheus.io/scrape: "true"
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      # Use the prometheus.io/path annotation as the metrics path
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      # Use the prometheus.io/port annotation as the scrape port
      - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__
      - action: labelmap
        regex: __meta_kubernetes_pod_label_(.+)
5. ServiceMonitor (if using Prometheus Operator)
If you're using the Prometheus Operator, you can create a ServiceMonitor:
yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: prefect-flow-runs
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: prefect-flow  # Match your flow run pod labels
  namespaceSelector:
    matchNames:
      - your-flow-namespace
  endpoints:
    - port: metrics
      interval: 15s
Remember:
- The metrics port (9090 in examples) should be consistent across all configurations
- Flow run pods need network access to expose the metrics endpoint
- The Prometheus server needs access to scrape the pods
- Consider resource implications of enabling metrics collection
Would you like me to provide more details about any of these configuration steps or help with troubleshooting the setup?
Daniel Chan
07/02/2025, 3:37 PM
Marvin
07/02/2025, 3:38 PM
1. Client Metrics Configuration: Ensure that client metrics are enabled in your flow run environment:
bash
PREFECT_CLIENT_METRICS_ENABLED=True
2. PodMonitor Configuration: Verify that your PodMonitor configuration is correctly set up to collect all the necessary metrics. The configuration should include:
- Correct port targeting
- Appropriate scrape interval
- Proper metric relabeling if used
3. Metrics Endpoint: Confirm that the prometheus-prefect-exporter is exposing all required metrics. You can check this by directly accessing the metrics endpoint (typically /metrics) to see if the timestamp metrics are being exposed.
4. Time Synchronization: Ensure that the time settings are properly synchronized between your Prefect server, the metrics exporter, and GCP.
To troubleshoot this:
1. First, verify the metrics are actually being exposed by checking the raw metrics endpoint:
bash
curl http://your-prefect-exporter:9090/metrics | grep flow_run
2. Check the Prometheus scrape configuration logs for any errors related to metric collection.
3. If possible, use a local Prometheus instance to verify the metrics are being collected correctly before they reach GCP.
If you're still experiencing issues after checking these points, it would be helpful to:
- Share your PodMonitor configuration
- Check if any specific flow run metrics are being collected at all
- Verify if this happens for all flow runs or only specific ones
The issue might also be related to how GCP Metrics Explorer is interpreting or displaying the timestamp data. You might want to cross-reference the raw Prometheus metrics with what's showing in GCP to identify where exactly the timestamps are being lost.
Daniel Chan
07/02/2025, 3:39 PM
Marvin
07/02/2025, 3:41 PM
1. Check the raw metrics endpoint of the prometheus-prefect-exporter directly:
bash
curl localhost:9090/metrics | grep flow_run
Look specifically for metrics containing _time or _timestamp in their names (a curl-free Python version of this check is shown after this list).
2. Verify your environment variables are properly set:
bash
PREFECT_CLIENT_METRICS_ENABLED=True
PREFECT_API_URL=<your-api-url>
3. Check if the metrics server is receiving events by looking at the logs of your prometheus-prefect-exporter pod:
bash
kubectl logs <pod-name> -n <namespace>
4. Verify that your Prefect API server is properly emitting events for flow run state changes.
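As mentioned in point 1, if curl isn't available in the pod image, a small stdlib-only Python snippet can do the same raw-endpoint check (the localhost:9090 address assumes the exporter port used above):
python
import urllib.request

# Fetch the raw Prometheus exposition text and print timestamp-related series
with urllib.request.urlopen("http://localhost:9090/metrics") as resp:
    body = resp.read().decode()

for line in body.splitlines():
    if "_time" in line or "_timestamp" in line:
        print(line)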
To get more detailed information about what's happening, you could:
1. Temporarily increase the log level of your prometheus-prefect-exporter
2. Check if the metrics appear in raw Prometheus before they get to GCP
3. Compare the metrics available in Prometheus with what shows up in GCP Metrics Explorer
Could you share:
- A sample of the metrics you do see in GCP Metrics Explorer?
- The configuration you're using for the prometheus-prefect-exporter?
This would help pinpoint whether this is an issue with:
- Event emission from Prefect
- Event collection by the exporter
- Metric transformation in Prometheus
- Or how GCP is interpreting the metrics