# prefect-community
t
Coming from Airflow, I've become reliant on the detailed statistics and analytics for my jobs. Is this on the roadmap for Prefect?
j
Hi @tiz.io! Welcome to Prefect. Can you give a bit more information about what extra statistics and analytics you'd like to see?
As a bit of extra information: one of our goals for next week is to enable a display of the full flow history (across all versions), which will likely lead to similar screens for tasks (although that is highly non-trivial!). Right now you can see the durations of all runs of a given task in the UI (in the Duration column).
j
@tiz.io we use Prefect's GraphQL API to get stats on flow runs. Here's some simple throw-away code I used this week to look at average startup latency (the delta between scheduled and actual start times) and average duration for flow runs. (I think with fancier GraphQL you could even do this fully in GraphQL.)
from datetime import datetime, timedelta
from dateutil import parser

import prefect

c = prefect.Client()
r = c.graphql(
    """
query {
  flow_run(where: {
    _and: {
      flow_id: {
        _eq: "<your flow id>"
      }
    }
  }, order_by: {
    scheduled_start_time: desc
  }) {
    id
    duration
    scheduled_start_time
    start_time
    state
  }
}
"""
)

flow_runs = r["data"]["flow_run"]

total_d = 0
total_st = 0
success_count = 0
st_iterations = 0
states = {}
for fr in flow_runs:
    states[fr["state"]] = states.get(fr["state"], 0) + 1
    if fr["state"] != "Success":
        continue
    success_count += 1
    # Durations come back as "H:MM:SS.ffffff" strings.
    dstr = fr["duration"]
    d = 0
    if dstr:
        t = datetime.strptime(dstr, "%H:%M:%S.%f")
        delta = timedelta(
            hours=t.hour, minutes=t.minute, seconds=t.second, microseconds=t.microsecond
        )
        d = delta.total_seconds()
    total_d += d
    sst = fr["scheduled_start_time"]
    st = fr["start_time"]
    if sst and st:
        sstd = parser.isoparse(sst)
        std = parser.isoparse(st)
        delta = std - sstd
        total_st += delta.total_seconds()
        st_iterations += 1

print("Flow runs: {}".format(len(flow_runs)))
print("Successful runs: {}".format(success_count))
print("Runs with start times: {}".format(st_iterations))

if success_count:
    avg_duration = total_d / success_count
    print("Average duration: {} seconds".format(round(avg_duration, 2)))

if st_iterations:
    avg_startup_latency = total_st / st_iterations
    print("Average startup latency: {} minutes".format(round(avg_startup_latency / 60, 2)))

print("State counts: {}".format(states))
You could take the same basic approach to query task durations, etc. The Prefect GraphQL API is really powerful.
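As a rough sketch of that idea: the query and helpers below assume a `task_run` table that mirrors `flow_run` (same `duration` string format and `state` field); the field names and the nested `flow_run` filter are assumptions, so check them against your server's GraphQL schema before relying on them.

```python
from datetime import datetime, timedelta

# Hypothetical query against a task_run table, assumed to mirror flow_run.
TASK_RUN_QUERY = """
query {
  task_run(where: {
    flow_run: { flow_id: { _eq: "<your flow id>" } }
  }) {
    id
    duration
    state
  }
}
"""


def duration_seconds(dstr):
    # Parse an "H:MM:SS.ffffff" duration string into seconds.
    t = datetime.strptime(dstr, "%H:%M:%S.%f")
    return timedelta(
        hours=t.hour, minutes=t.minute, seconds=t.second, microseconds=t.microsecond
    ).total_seconds()


def average_duration(task_runs):
    # Mean duration over successful runs that reported a duration.
    durations = [
        duration_seconds(tr["duration"])
        for tr in task_runs
        if tr["state"] == "Success" and tr["duration"]
    ]
    return sum(durations) / len(durations) if durations else 0.0


# Against a live backend you'd feed it the query result, e.g.:
# c = prefect.Client()
# r = c.graphql(TASK_RUN_QUERY)
# print(average_duration(r["data"]["task_run"]))
```

Keeping the parsing in small helpers like this also makes the stats easy to reuse across flows and tasks.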
j
Thanks @Joe Schmid!
t
Thanks for the info! Specifically, we were relying on the charts of run time across jobs to take in, at a glance, some important information about our ETL jobs.
j
Making sure I understand this correctly: you'd want to be able to see and compare the run times of different flows?
t
Yes, something like what we were using with AF:
j
Aha! Thanks for the extra information @tiz.io. We're considering which statistics to include in the UI and we'll take that one into consideration.