Daniel Chan
07/01/2025, 2:29 AMMarvin
07/01/2025, 2:29 AMMarvin
07/01/2025, 2:30 AMstart_client_metrics_server()PREFECT_CLIENT_METRICS_ENABLEDPREFECT_CLIENT_METRICS_ENABLEDstart_client_metrics_server()python
from prefect import flow
from prefect.utilities.services import start_client_metrics_server
# Start the metrics server
start_client_metrics_server()
@flow
def my_flow():
    # Your flow code here
    passPREFECT_CLIENT_METRICS_ENABLED=truePREFECT_CLIENT_METRICS_PORThttp://<pod-ip>:<metrics-port>/metricsDaniel Chan
07/01/2025, 2:32 AMMarvin
07/01/2025, 2:32 AMMarvin
07/01/2025, 2:33 AMPREFECT_CLIENT_METRICS_ENABLEDPREFECT_CLIENT_METRICS_ENABLED=truestart_client_metrics_server()yaml
apiVersion: <http://prefect.io/v1alpha1|prefect.io/v1alpha1>
kind: PrefectDeployment
metadata:
  name: my-flow
spec:
  flow:
    name: my-flow
  env:
    - name: PREFECT_CLIENT_METRICS_ENABLED
      value: "true"
    - name: PREFECT_CLIENT_METRICS_PORT
      value: "9090"  # optional, defaults to 9090Daniel Chan
07/01/2025, 2:34 AMMarvin
07/01/2025, 2:34 AMMarvin
07/01/2025, 2:34 AMPREFECT_CLIENT_METRICS_PORTPREFECT_CLIENT_METRICS_PORTDaniel Chan
07/01/2025, 2:34 AMMarvin
07/01/2025, 2:34 AMMarvin
07/01/2025, 2:35 AMPREFECT_LOGGING_ORION_ENABLEDPREFECT_LOGGING_TO_API_ENABLEDPREFECT_CLIENT_METRICS_ENABLED=truestart_client_metrics_server()PREFECT_CLIENT_METRICS_PORTDaniel Chan
07/01/2025, 2:38 AMMarvin
07/01/2025, 2:38 AMMarvin
07/01/2025, 2:38 AMyaml
# NetworkPolicy (if you're using network policies)
apiVersion: <http://networking.k8s.io/v1|networking.k8s.io/v1>
kind: NetworkPolicy
metadata:
  name: allow-prometheus-scraping
spec:
  podSelector:
    matchLabels:
      # match your Prefect flow pod labels
      app: your-prefect-flow
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          # adjust based on where your Prometheus is running
          <http://kubernetes.io/metadata.name|kubernetes.io/metadata.name>: monitoring
    ports:
    - protocol: TCP
      port: 4201
---
# PodMonitor
apiVersion: <http://monitoring.coreos.com/v1|monitoring.coreos.com/v1>
kind: PodMonitor
metadata:
  name: prefect-flow-monitor
spec:
  selector:
    matchLabels:
      # match your Prefect flow pod labels
      app: your-prefect-flow
  podMetricsEndpoints:
  - port: metrics  # or the name you've given to the port in your pod spec
    path: /metricsyaml
spec:
  containers:
  - name: flow
    ports:
    - name: metrics
      containerPort: 4201Daniel Chan
07/01/2025, 2:51 AMMarvin
07/01/2025, 2:51 AMMarvin
07/01/2025, 2:53 AMstart_client_metrics_server()python
from prefect import flow
from prefect.utilities.services import start_client_metrics_server
start_client_metrics_server()
@flow
def my_flow():
    # your flow code here
    passyaml
apiVersion: <http://monitoring.coreos.com/v1|monitoring.coreos.com/v1>
kind: PodMonitor
metadata:
  name: prefect-flow-monitor
spec:
  selector:
    matchLabels:
      # whatever labels identify your Prefect flow run pods
      <http://prefect.io/flow-run-name|prefect.io/flow-run-name>: ""
  namespaceSelector:
    matchNames:
      - your-prefect-jobs-namespace
  podMetricsEndpoints:
  - port: metrics  # or the actual port number 4201
    path: /metricsDaniel Chan
07/01/2025, 3:56 AMMarvin
07/01/2025, 3:56 AMMarvin
07/01/2025, 3:58 AMlate_runs_countstart_client_metrics_server()prefect_info_work_queueslate_runs_countstart_client_metrics_server()Daniel Chan
07/01/2025, 4:26 AMMarvin
07/01/2025, 4:26 AMMarvin
07/01/2025, 4:27 AMend_timeNoneprefect_info_flow_runsstart_timeend_timestart_timeNoneend_time/flow-runs/{id}Daniel Chan
07/01/2025, 4:30 AMMarvin
07/01/2025, 4:30 AMMarvin
07/01/2025, 4:30 AMbash
curl http://<your-exporter-host>:9090/metrics | grep prefect_info_flow_runsprefect_info_flow_runsbash
# First find the pod name
kubectl get pods -n <namespace> | grep prometheus-prefect-exporter
# Then curl the metrics endpoint from the pod
kubectl exec -n <namespace> <exporter-pod-name> -- curl localhost:9090/metrics | grep prefect_info_flow_runsprefect_info_flow_runs{deployment_id="...",flow_id="...",flow_name="...",flow_run_id="...",flow_run_name="...",start_time="...",end_time="...",state="..."} 1end_timeNoneDaniel Chan
07/01/2025, 4:40 AMcurl <http://localhost:8000/metrics> | grep prefect_info_flow_runs
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 13172  100 13172    0     0  16389      0 --:--:-- --:--:-- --:--:-- 16403
# HELP prefect_info_flow_runs Prefect flow runs info
# TYPE prefect_info_flow_runs gauge
prefect_info_flow_runs{created="2025-07-01T04:36:22.340281Z",deployment_id="25a4caac-2326-4c53-9037-4ce3fbd67427",deployment_name="nucleus_ingestion",end_time="2025-07-01T04:38:08.870199Z",flow_id="14f42903-d3e9-4ffb-bab0-0d14d5af08bb",flow_name="Nucleus Ingestion",flow_run_id="ec94907b-ad24-4da6-a0ef-a85cd3323cb9",flow_run_name="dashing-hummingbird",run_count="1",start_time="2025-07-01T04:36:57.346805Z",state_id="df5e3c7d-76bf-413d-9667-259204462ffa",state_name="Completed",total_run_time="71.523394",work_queue_name="sideflows"} 0.0
prefect_info_flow_runs{created="2025-07-01T04:36:23.722145Z",deployment_id="25a4caac-2326-4c53-9037-4ce3fbd67427",deployment_name="nucleus_ingestion",end_time="2025-07-01T04:38:15.269128Z",flow_id="14f42903-d3e9-4ffb-bab0-0d14d5af08bb",flow_name="Nucleus Ingestion",flow_run_id="e26bb1ac-bf6a-4cf7-be10-9df7b106515b",flow_run_name="tan-barnacle",run_count="1",start_time="2025-07-01T04:37:00.427239Z",state_id="93e8a5d7-c9ff-4217-bcdb-9d70a242c63e",state_name="Completed",total_run_time="74.841889",work_queue_name="sideflows"} 0.0
prefect_info_flow_runs{created="2025-07-01T04:36:23.657403Z",deployment_id="25a4caac-2326-4c53-9037-4ce3fbd67427",deployment_name="nucleus_ingestion",end_time="2025-07-01T04:37:43.390337Z",flow_id="14f42903-d3e9-4ffb-bab0-0d14d5af08bb",flow_name="Nucleus Ingestion",flow_run_id="db75d2a3-0f47-4dde-a199-74b0fd4b1021",flow_run_name="blazing-turkey",run_count="1",start_time="2025-07-01T04:36:44.216575Z",state_id="9c695f64-a224-49ac-921b-33487fc56096",state_name="Completed",total_run_time="59.173762",work_queue_name="sideflows"} 0.0
prefect_info_flow_runs{created="2025-07-01T04:36:20.536407Z",deployment_id="25a4caac-2326-4c53-9037-4ce3fbd67427",deployment_name="nucleus_ingestion",end_time="2025-07-01T04:37:38.261706Z",flow_id="14f42903-d3e9-4ffb-bab0-0d14d5af08bb",flow_name="Nucleus Ingestion",flow_run_id="5fd40ae3-9af2-4f1c-9fea-7436639a5fa2",flow_run_name="xanthic-orca",run_count="1",start_time="2025-07-01T04:36:38.388683Z",state_id="4399abf6-5d2b-48c4-9f0f-874cf0500c49",state_name="Completed",total_run_time="59.873023",work_queue_name="sideflows"} 0.0
prefect_info_flow_runs{created="2025-07-01T04:36:22.390574Z",deployment_id="25a4caac-2326-4c53-9037-4ce3fbd67427",deployment_name="nucleus_ingestion",end_time="2025-07-01T04:37:41.802947Z",flow_id="14f42903-d3e9-4ffb-bab0-0d14d5af08bb",flow_name="Nucleus Ingestion",flow_run_id="343402e2-3321-4b55-8599-09d62e0592ff",flow_run_name="loose-clam",run_count="1",start_time="2025-07-01T04:36:42.445354Z",state_id="44e45d89-29e5-4269-9998-8db78d48f12d",state_name="Completed",total_run_time="59.357593",work_queue_name="sideflows"} 0.0Marvin
07/01/2025, 4:40 AMMarvin
07/01/2025, 4:40 AMend_timeflow_run_name="dashing-hummingbird",
start_time="2025-07-01T04:36:57.346805Z",
end_time="2025-07-01T04:38:08.870199Z",
state_name="Completed"yaml
apiVersion: <http://monitoring.coreos.com/v1|monitoring.coreos.com/v1>
kind: PodMonitor
metadata:
  name: prefect-flow-monitor
spec:
  podMetricsEndpoints:
  - interval: 5s  # More frequent scraping
    port: metrics
    path: /metricsDaniel Chan
07/01/2025, 5:00 AMMarvin
07/01/2025, 5:00 AMMarvin
07/01/2025, 5:01 AMbash
   # Get the exporter logs
   kubectl logs <prometheus-prefect-exporter-pod> -n <namespace>python
   from prefect.client import get_client
   
   async with get_client() as client:
       flow_run = await client.read_flow_run("<flow-run-id>")
       print(flow_run.end_time)Daniel Chan
07/01/2025, 5:02 AMMarvin
07/01/2025, 5:02 AMMarvin
07/01/2025, 5:03 AMyaml
   # In your prometheus-prefect-exporter deployment
   env:
     - name: PREFECT_API_POLL_INTERVAL
       value: "10"  # Decrease the interval (in seconds) to catch more updatesyaml
   apiVersion: <http://monitoring.coreos.com/v1|monitoring.coreos.com/v1>
   kind: PodMonitor
   spec:
     podMetricsEndpoints:
     - interval: 5s  # More frequent scraping
       port: metrics
       path: /metricsbash
   kubectl logs -f <prometheus-prefect-exporter-pod> -n <namespace>Daniel Chan
07/01/2025, 5:06 AMPREFECT_API_POLL_INTERVALMarvin
07/01/2025, 5:06 AMMarvin
07/01/2025, 5:07 AMPREFECT_API_POLL_INTERVALDaniel Chan
07/01/2025, 3:24 PMMarvin
07/01/2025, 3:24 PMMarvin
07/01/2025, 3:25 PMDaniel Chan
07/01/2025, 3:26 PMMarvin
07/01/2025, 3:26 PMMarvin
07/01/2025, 3:27 PMDaniel Chan
07/01/2025, 3:32 PMMarvin
07/01/2025, 3:32 PMMarvin
07/01/2025, 3:32 PMDaniel Chan
07/01/2025, 3:35 PM# Calculate timestamps for before and after data
        after_data = datetime.now(timezone.utc) - timedelta(minutes=offset_minutes)
        self.after_data_fmt = after_data.strftime("%Y-%m-%dT%H:%M:%S.%fZ")Marvin
07/01/2025, 3:35 PMMarvin
07/01/2025, 3:36 PMafter_data%Y-%m-%dT%H:%M:%S.%fZafter_datapython
offset_minutes = 60
# At 2:00 PM, this would mean:
after_data = 1:00 PM
# Your query window would be: 1:00 PM to 2:00 PMDaniel Chan
07/01/2025, 3:39 PMMarvin
07/01/2025, 3:39 PMMarvin
07/01/2025, 3:39 PMpython
# If current time is 2:00 PM
after_data = 2:00 PM  # (now - 0 minutes)
# Query window: "Get all flow runs from 2:00 PM onwards"python
# If current time is 2:00 PM
after_data = 12:00 PM  # (now - 120 minutes)
# Query window: "Get all flow runs from 12:00 PM onwards"2:00 PM: Flow A starts
2:01 PM: Prometheus scrapes (offset=0)
         → Sees Flow A in "Running" state
2:02 PM: Flow A completes
2:02 PM: Prometheus scrapes (offset=0)
         → Sees Flow A in "Completed" stateDaniel Chan
07/01/2025, 3:54 PMMarvin
07/01/2025, 3:54 PMMarvin
07/01/2025, 3:55 PMSCRAPE_INTERVAL_SECONDS=1202:00 PM: Scrape happens
         Query window: "Get flows from 2:00 PM onwards"
2:01 PM: Flow A runs and completes
2:02 PM: Flow B runs and completes
2:02 PM: Scrape happens
         Query window: "Get flows from 2:02 PM onwards"
         → MISSES both Flow A and Flow B!SCRAPE_INTERVAL_SECONDS=1202:00 PM: Scrape happens
         Query window: "Get flows from 1:57 PM onwards"
2:01 PM: Flow A runs and completes
2:02 PM: Flow B runs and completes
2:02 PM: Scrape happens
         Query window: "Get flows from 1:59 PM onwards"
         → CAPTURES both Flow A and Flow BOFFSET_MINUTES should be > (SCRAPE_INTERVAL_SECONDS / 60)Daniel Chan
07/01/2025, 3:57 PMMarvin
07/01/2025, 3:57 PMMarvin
07/01/2025, 3:58 PMSCRAPE_INTERVAL_SECONDS = 5
OFFSET_MINUTES = 1SCRAPE_INTERVAL_SECONDS = 15  # or 30
OFFSET_MINUTES = 1scrape_duration_secondsDaniel Chan
07/01/2025, 4:01 PM# Run the loop to collect Prefect metrics
    while True:
        time.sleep(scrape_interval_seconds)Marvin
07/01/2025, 4:02 PMMarvin
07/01/2025, 4:02 PMwhile Truetime.sleep()python
while True:
    time.sleep(scrape_interval_seconds)
    # Scrape starts hereTime 0:00 - Scrape A starts
Time 0:05 - New scrape B starts (even if A is still running!)
Time 0:10 - New scrape C starts (even if A or B are still running!)SCRAPE_INTERVAL_SECONDS = 30  # or higher
   OFFSET_MINUTES = 2  # provides adequate overlapwhile Truetime.sleep()Daniel Chan
07/01/2025, 4:03 PMwhile Truetime.sleep()Marvin
07/01/2025, 4:04 PMMarvin
07/01/2025, 4:04 PMwhile Truepython
   while True:
       time.sleep(scrape_interval_seconds)  # Thread blocks here
       # Scrape code runs here
       # When done, loops back to sleeptime.sleep()0:00:00 - Loop starts
0:00:00 - Hits time.sleep(5) → Thread blocks for 5 seconds
0:00:05 - Sleep ends
0:00:05 - Scrape starts
0:00:07 - Scrape finishes (took 2 seconds)
0:00:07 - Hits time.sleep(5) again → Blocks for 5 seconds
0:00:12 - Sleep ends
0:00:12 - Next scrape startsasyncioapschedulerscrape_interval_secondsDaniel Chan
07/01/2025, 4:05 PMwhile Trueimport os
import base64
import logging
import time
import uuid
from metrics.metrics import PrefectMetrics
from metrics.healthz import PrefectHealthz
from prometheus_client import start_http_server, REGISTRY
def metrics():
    """
    Main entry point for the PrefectMetrics exporter.
    """
    # Get environment variables or use default values
    loglevel = str(os.getenv("LOG_LEVEL", "INFO"))
    max_retries = int(os.getenv("MAX_RETRIES", "3"))
    metrics_addr = os.getenv("METRICS_ADDR", "0.0.0.0")
    metrics_port = int(os.getenv("METRICS_PORT", "8000"))
    offset_minutes = int(os.getenv("OFFSET_MINUTES", "3"))
    url = str(os.getenv("PREFECT_API_URL", "<http://localhost:4200/api>"))
    api_key = str(os.getenv("PREFECT_API_KEY", ""))
    api_auth_string = str(os.getenv("PREFECT_API_AUTH_STRING", ""))
    csrf_client_id = str(uuid.uuid4())
    scrape_interval_seconds = int(os.getenv("SCRAPE_INTERVAL_SECONDS", "30"))
    # Configure logging
    logging.basicConfig(
        level=loglevel, format="%(asctime)s - %(name)s - [%(levelname)s] %(message)s"
    )
    logger = logging.getLogger("prometheus-prefect-exporter")
    # Configure headers for HTTP requests
    headers = {"accept": "application/json", "Content-Type": "application/json"}
    if api_auth_string:
        api_auth_string_encoded = base64.b64encode(api_auth_string.encode("utf-8")).decode("utf-8")
        headers["Authorization"] = f"Basic {api_auth_string_encoded}"
        <http://logger.info|logger.info>("Added Basic Authorization header for PREFECT_API_AUTH_STRING")
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
        <http://logger.info|logger.info>("Added Bearer Authorization header for PREFECT_API_KEY")
    # check endpoint
    PrefectHealthz(
        url=url, headers=headers, max_retries=max_retries, logger=logger
    ).get_health_check()
    ##
    # NOTIFY IF PAGINATION IS ENABLED
    #
    enable_pagination = str(os.getenv("PAGINATION_ENABLED", "True")) == "True"
    pagination_limit = int(os.getenv("PAGINATION_LIMIT", 200))
    if enable_pagination:
        <http://logger.info|logger.info>("Pagination is enabled")
        <http://logger.info|logger.info>(f"Pagination limit is {pagination_limit}")
    else:
        <http://logger.info|logger.info>("Pagination is disabled")
    # Create an instance of the PrefectMetrics class
    metrics = PrefectMetrics(
        url=url,
        headers=headers,
        offset_minutes=offset_minutes,
        max_retries=max_retries,
        client_id=csrf_client_id,
        csrf_enabled=str(os.getenv("PREFECT_CSRF_ENABLED", "False")) == "True",
        logger=logger,
        # Enable pagination if not specified to avoid breaking existing deployments
        enable_pagination=enable_pagination,
        pagination_limit=pagination_limit,
    )
    # Register the metrics with Prometheus
    <http://logger.info|logger.info>("Initializing metrics...")
    REGISTRY.register(metrics)
    # Start the HTTP server to expose Prometheus metrics
    start_http_server(metrics_port, metrics_addr)
    <http://logger.info|logger.info>(f"Exporter listening on {metrics_addr}:{metrics_port}")
    # Run the loop to collect Prefect metrics
    while True:
        time.sleep(scrape_interval_seconds)
if __name__ == "__main__":
    metrics()Marvin
07/01/2025, 4:05 PMMarvin
07/01/2025, 4:05 PMprometheus_clientpython
REGISTRY.register(metrics)metricsPrefectMetricspython
start_http_server(metrics_port, metrics_addr)while Truepython
   # This starts a web server in a separate thread
   start_http_server(metrics_port, metrics_addr)prometheus_clientcollect()while Truewhile Truepython
   while True:
       time.sleep(scrape_interval_seconds)signal.pause()1. Program starts
2. Metrics collector is registered
3. HTTP server starts in background thread
4. Main thread enters infinite sleep loop
When Prometheus scrapes:
→ HTTP request comes to background server
→ Server triggers collect() method
→ Fresh metrics are gathered and returned
(All this happens independently of the sleep loop!)scrape_interval_secondscollect()PrefectMetricsSCRAPE_INTERVAL_SECONDSpython
import signal
def metrics():
    # ... same setup code ...
    
    # Start the HTTP server
    start_http_server(metrics_port, metrics_addr)
    <http://logger.info|logger.info>(f"Exporter listening on {metrics_addr}:{metrics_port}")
    
    # Wait for signals (more proper than while True/sleep)
    signal.pause()while TrueDaniel Chan
07/01/2025, 5:21 PMSCRAPE_INTERVAL_SECONDSpolling_interval_secondsSCRAPE_INTERVAL_SECONDSMarvin
07/01/2025, 5:21 PMMarvin
07/01/2025, 5:22 PMClarify purpose of SCRAPE_INTERVAL_SECONDS and its relationship to Prometheus scraping## Background
I noticed some historical changes regarding scrape intervals in the exporter:
1. PR <https://github.com/PrefectHQ/prometheus-prefect-exporter/commit/7a3d105235c9ba6020df8f111bc2cf88905a7ffa|7a3d105> removed `polling_interval_seconds` from PrefectMetrics
2. PR <https://github.com/PrefectHQ/prometheus-prefect-exporter/pull/57|#57> later added `SCRAPE_INTERVAL_SECONDS`
## Current Implementation
The current code in main.py uses `SCRAPE_INTERVAL_SECONDS` in a while True loop:
```python
while True:
    time.sleep(scrape_interval_seconds)polling_interval_secondsSCRAPE_INTERVAL_SECONDSSCRAPE_INTERVAL_SECONDSSCRAPE_INTERVAL_SECONDSDaniel Chan
07/01/2025, 5:53 PMMarvin
07/01/2025, 5:53 PMMarvin
07/01/2025, 5:54 PMprefect_info_flow_runsOFFSET_MINUTESpython
   from prefect.client import get_client
   
   async with get_client() as client:
       flow_run = await client.read_flow_run(flow_run_id)
       print(f"State: {flow_run.state.type}")
       print(f"End Time: {flow_run.end_time}")OFFSET_MINUTESOFFSET_MINUTESDaniel Chan
07/01/2025, 5:55 PMMarvin
07/01/2025, 5:55 PMMarvin
07/01/2025, 5:57 PMSCRAPE_INTERVAL_SECONDS=5OFFSET_MINUTES=1[now - 1 minute] <-------- 1 minute window --------> [now]
         ^                           ^                    ^
   Window start              Flow completes           Scrape timeTime 0s: Flow reaches "Completed" state
   Time 0.1s: State recorded
   Time 0.2s: Prometheus scrapes (sees Completed but no end_time yet)
   Time 0.3s: end_time recorded
   Time 5s: Next scrape (might miss it if flow is now outside window)OFFSET_MINUTES=2
   SCRAPE_INTERVAL_SECONDS=15OFFSET_MINUTES=1
   SCRAPE_INTERVAL_SECONDS=30Daniel Chan
07/01/2025, 5:58 PMMarvin
07/01/2025, 5:58 PMMarvin
07/01/2025, 5:58 PMSCRAPE_INTERVAL_SECONDSpython
# From main.py
while True:
    time.sleep(scrape_interval_seconds)SCRAPE_INTERVAL_SECONDSyaml
scrape_configs:
  - job_name: 'prefect'
    scrape_interval: 15s  # THIS controls when Prometheus collects metrics
    static_configs:
      - targets: ['localhost:8000'][now - OFFSET_MINUTES][now]OFFSET_MINUTESSCRAPE_INTERVAL_SECONDSSCRAPE_INTERVAL_SECONDSDaniel Chan
07/01/2025, 6:09 PMMarvin
07/01/2025, 6:09 PMMarvin
07/01/2025, 6:10 PMOFFSET_MINUTES=60:
- 1 flow per minute = ~60 flows per query
- Each flow has state, metadata, etc.
OFFSET_MINUTES=10:
- 1 flow per minute = ~10 flows per query
- 6x less data per queryDaniel Chan
07/01/2025, 6:12 PMprometheus/prefect_info_flow_runs/gaugeMarvin
07/01/2025, 6:12 PMMarvin
07/01/2025, 6:13 PMprometheus/prefect_info_flow_runs/gauge[now - OFFSET_MINUTES] <-------- window --------> [now]
                                                      |
                                              Only flows in this
                                              window are reported10:00 AM: Flow A completes
   10:05 AM: You see Flow A in metrics (within window)
   10:11 AM: Flow A disappears (outside window)yaml
   rules:
     - record: flow_runs_completed_total
       expr: sum(prefect_info_flow_runs)prefect_info_flow_runsDaniel Chan
07/01/2025, 7:05 PMEcho FlowEcho FlowNucleus IngestionMarvin
07/01/2025, 7:05 PMMarvin
07/01/2025, 7:06 PMFlow Run ID: ABC123
   Metrics show:
   - Row 1: state=Running, start_time=present, end_time=null
   - Row 2: state=Completed, start_time=present, end_time=presentFlow Run 1: Clean completion
   - State transition: Running → Completed
   - end_time properly recorded
   
   Flow Run 2: Irregular completion
   - State transition: Running → Completed (but interrupted)
   - end_time might not be recorded properlypython
   from prefect.client import get_client
   
   async with get_client() as client:
       # Get flow runs for your flows
       flow_runs = await client.read_flow_runs(
           flow_name={"any_": ["Echo Flow", "Nucleus Ingestion"]}
       )
       for run in flow_runs:
           print(f"Flow: {run.name}")
           print(f"State: {run.state.type}")
           print(f"End Time: {run.end_time}")
           print("---")python
   # In your flow code
   @flow(
       name="Echo Flow",
       retries=0,  # Prevent automatic retries
       persist_result=True  # Ensure state transitions are recorded
   )Daniel Chan
07/01/2025, 7:07 PMMarvin
07/01/2025, 7:07 PMMarvin
07/01/2025, 7:08 PMpython
from prefect.client import get_client
from datetime import datetime, timezone, timedelta
async def check_flow_run_times(flow_name):
    async with get_client() as client:
        # Get recent flow runs for the specified flow
        flow_runs = await client.read_flow_runs(
            flow_name={"any_": [flow_name]},
            sort="-start_time",
            limit=5
        )
        
        for run in flow_runs:
            print(f"\nFlow Run ID: {run.id}")
            print(f"State: {run.state.type}")
            print(f"Start Time: {run.start_time}")
            print(f"End Time: {run.end_time}")
            if run.end_time and run.start_time:
                duration = run.end_time - run.start_time
                print(f"Duration: {duration}")Daniel Chan
07/01/2025, 7:09 PMMarvin
07/01/2025, 7:09 PMMarvin
07/01/2025, 7:09 PMpython
from prefect.client import get_client
async def show_actual_times(flow_run_id):
    async with get_client() as client:
        run = await client.read_flow_run(flow_run_id)
        print(f"Start Time: {run.start_time}")
        print(f"End Time: {run.end_time}")  # This exists in the API even though UI shows duration
        print(f"Duration shown in UI: {run.end_time - run.start_time if run.end_time and run.start_time else 'N/A'}")Daniel Chan
07/01/2025, 7:15 PMcurl -s <http://prefect.com/api/flow_runs/5db158c6-cc71-442b-8645-ba8e5b81a69c|prefect.com/api/flow_runs/5db158c6-cc71-442b-8645-ba8e5b81a69c> | jq '{id, name, start_time, end_time}'
{
  "id": "5db158c6-cc71-442b-8645-ba8e5b81a69c",
  "name": "tangerine-anaconda",
  "start_time": "2025-07-01T19:01:46.665632Z",
  "end_time": "2025-07-01T19:03:04.016106Z"
}Marvin
07/01/2025, 7:15 PMMarvin
07/01/2025, 7:16 PMjson
{
  "start_time": "2025-07-01T19:01:46.665632Z",
  "end_time": "2025-07-01T19:03:04.016106Z"
}Flow end time: 19:03:04
   Current time: [your current time]
   OFFSET_MINUTES window: [current time - OFFSET_MINUTES] to [current time]prefect_info_flow_runs{
     flow_name="tangerine-anaconda",
     flow_run_id="5db158c6-cc71-442b-8645-ba8e5b81a69c",
     state="completed"
   }bash
   curl -s <http://localhost:8000/metrics> | grep prefect_info_flow_runsbash
curl -s <http://prefect.com/api/flow_runs/5db158c6-cc71-442b-8645-ba8e5b81a69c/states> | jq '.'Daniel Chan
07/01/2025, 7:25 PMcurl -s <http://prefect.com/api/flow_runs/5db158c6-cc71-442b-8645-ba8e5b81a69c> | jq '.'      
{
  "id": "5db158c6-cc71-442b-8645-ba8e5b81a69c",
  "created": "2025-07-01T19:01:21.432118Z",
  "updated": "2025-07-01T19:03:04.022184Z",
  "name": "tangerine-anaconda",
  "flow_id": "14f42903-d3e9-4ffb-bab0-0d14d5af08bb",
  "state_id": "d65d295c-ee16-43ca-bc05-a9c8859e0c68",
  "deployment_id": "25a4caac-2326-4c53-9037-4ce3fbd67427",
  "deployment_version": "d77d3af200a17e4a45e96437d10aeb13b08431a7",
  "work_queue_id": "ad3510a3-48e9-4880-949a-a2f15e11cb27",
  "work_queue_name": "sideflows",
  "flow_version": "44982b98c486777348b5c531ef5be081",
  "idempotency_key": "ee0354bc-104c-4b07-9a49-b244d7a962cc-ZIP_PENDING",
  "context": {},
  "empirical_policy": {
    "max_retries": 0,
    "retry_delay_seconds": 0.0,
    "retries": 0,
    "retry_delay": 0,
    "pause_keys": [],
    "resuming": false,
    "retry_type": null
  },
  "parent_task_run_id": null,
  "state_type": "COMPLETED",
  "state_name": "Completed",
  "run_count": 1,
  "expected_start_time": "2025-07-01T19:01:21.336739Z",
  "next_scheduled_start_time": null,
  "start_time": "2025-07-01T19:01:46.665632Z",
  "end_time": "2025-07-01T19:03:04.016106Z",
  "total_run_time": 77.350474,
  "estimated_run_time": 77.350474,
  "estimated_start_time_delta": 25.328893,
  "auto_scheduled": false,
  "infrastructure_document_id": null,
  "infrastructure_pid": "3fc64242-17a3-4a4f-8c66-5ff7839807c6:prefect-jobs:tangerine-anaconda-kq2mh",
  "created_by": null,
  "work_pool_id": "72f1d141-0944-49fa-b651-b2c95b4585fe",
  "work_pool_name": "queue-based-k8s-pool",
  "state": {
    "id": "d65d295c-ee16-43ca-bc05-a9c8859e0c68",
    "type": "COMPLETED",
    "name": "Completed",
    "timestamp": "2025-07-01T19:03:04.016106Z",
    "message": null,
    "data": null,
    "state_details": {
      "flow_run_id": "5db158c6-cc71-442b-8645-ba8e5b81a69c",
      "task_run_id": null,
      "child_flow_run_id": null,
      "scheduled_time": null,
      "cache_key": null,
      "cache_expiration": null,
      "deferred": null,
      "untrackable_result": true,
      "pause_timeout": null,
      "pause_reschedule": false,
      "pause_key": null,
      "run_input_keyset": null,
      "refresh_cache": null,
      "retriable": null,
      "transition_id": "88f3a170-4d10-43c3-987c-cabdbaa9d965",
      "task_parameters_id": null,
      "traceparent": null
    }
  }
}Marvin
07/01/2025, 7:25 PMMarvin
07/01/2025, 7:25 PMjson
{
  "state_type": "COMPLETED",
  "start_time": "2025-07-01T19:01:46.665632Z",
  "end_time": "2025-07-01T19:03:04.016106Z",
  "state": {
    "type": "COMPLETED",
    "timestamp": "2025-07-01T19:03:04.016106Z",
    "state_details": {
      "untrackable_result": true,
      "transition_id": "88f3a170-4d10-43c3-987c-cabdbaa9d965"
    }
  }
}end_timetimestamptransition_id"untrackable_result": truebash
   # Get the current metrics
   curl -s <http://localhost:8000/metrics> | grep "prefect_info_flow_runs.*5db158c6-cc71-442b-8645-ba8e5b81a69c"bash
   # Show exporter environment variables
   env | grep "OFFSET_MINUTES"bash
   curl -s <http://localhost:8000/metrics> | grep "tangerine-anaconda"untrackable_result: trueuntrackable_result: trueuntrackable_result: trueDaniel Chan
07/02/2025, 3:30 PMMarvin
07/02/2025, 3:30 PMMarvin
07/02/2025, 3:31 PM