https://prefect.io logo
Title
j

Joshua Greenhalgh

07/11/2022, 5:28 PM
Dunno if useful for anyone but I wanted a quick view of the states of my flows so put together this CLI script
👀 4
import click
import datetime
import os
import json

from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

from rich.table import Table
from rich.console import Console
from rich import box


# GraphQL query used to list flow runs. It takes two variables:
#   $orderBy - ordering applied to the returned flow runs
#   $where   - boolean filter expression over flow runs
# For each run it selects the id, the owning flow's name, the run parameters,
# start/end times, the run name and its state.
GQL_TEMPLATE = """
query Query($orderBy: [flow_run_order_by!], $where: flow_run_bool_exp) {
  flow_run(order_by: $orderBy, where: $where) {
    id
    flow {
      name
    }
    parameters
    start_time
    end_time
    name
    state
  }
}
"""


def gql_client():
    """Build a GraphQL client pointed at the Prefect Cloud API.

    The API key is read from the ``PREFECT_API_KEY`` environment variable and
    sent as a bearer token in the ``Authorization`` header.

    Returns:
        A ``gql`` ``Client`` backed by a ``RequestsHTTPTransport`` configured
        with ``verify=True`` and ``retries=3``.

    Raises:
        KeyError: if ``PREFECT_API_KEY`` is not set in the environment.
    """
    transport = RequestsHTTPTransport(
        # Must be a bare URL: surrounding angle brackets are chat link markup,
        # not part of the address, and make the endpoint unusable.
        url="https://api.prefect.io",
        verify=True,
        retries=3,
        headers={"Authorization": f"Bearer {os.environ['PREFECT_API_KEY']}"},
    )

    # The schema is fetched from the transport so queries can be validated
    # client-side before they are sent.
    client = Client(transport=transport, fetch_schema_from_transport=True)

    return client


def fetch_flow_runs(
    project: str,
    start_ts: datetime.datetime,
    end_ts: datetime.datetime,
    state: str,
    flow: str,
    client,
):
    """Query flow runs for a project within a start-time window.

    Args:
        project: Name of the project whose flow runs are listed.
        start_ts: Lower bound (inclusive) on the run's ``start_time``.
        end_ts: Upper bound (inclusive) on the run's ``start_time``.
        state: If truthy, only runs in exactly this state are returned.
        flow: If truthy, only runs of the flow with exactly this name are
            returned.
        client: Object exposing ``execute(document, variable_values=...)``.

    Returns:
        Whatever ``client.execute`` returns for ``GQL_TEMPLATE``.
    """
    # Filter on the owning flow: always by project, optionally by flow name.
    flow_filter = {"project": {"name": {"_eq": project}}}
    if flow:
        flow_filter["name"] = {"_eq": flow}

    # Filter on the run itself: always by start-time window, optionally by state.
    where_clause = {
        "start_time": {"_gte": str(start_ts), "_lte": str(end_ts)},
        "flow": flow_filter,
    }
    if state:
        where_clause["state"] = {"_eq": state}

    variables = {
        "orderBy": [{"start_time": "desc_nulls_last"}],
        "where": where_clause,
    }
    return client.execute(gql(GQL_TEMPLATE), variable_values=variables)


def base_table():
    """Return the outer table: borderless, expanded, with one unnamed column."""
    outer = Table(box=None, expand=True, show_header=True)
    outer.add_column()
    return outer


def parse_result(result, org="<YOUR_ORG_GOES_HERE>"):
    """Turn a flow-run query result into display-ready rows.

    Args:
        result: Mapping with a ``"flow_run"`` list; each entry must provide
            ``id``, ``flow.name``, ``state``, ``start_time``, ``end_time``
            and ``parameters``.
        org: Organisation slug inserted into each run's URL. Defaults to the
            original placeholder so existing callers are unaffected.

    Returns:
        A list of dicts, one per flow run, keyed by the display labels
        ``Flow Name``, ``State``, ``Start``, ``End``, ``Parameters`` and
        ``Url``. An empty ``flow_run`` list yields an empty list.
    """
    rows = []
    for run in result["flow_run"]:
        state = run["state"]
        # Only "Failed" is highlighted in red; every other state is green.
        colour = "red" if state == "Failed" else "green"
        rows.append(
            {
                "Flow Name": run["flow"]["name"],
                "State": f"[{colour}]{state}",
                "Start": run["start_time"],
                "End": run["end_time"],
                "Parameters": json.dumps(run["parameters"], indent=2),
                # Bare URL: the angle brackets previously wrapped around the
                # host were chat link markup and broke the printed link.
                "Url": f"https://cloud.prefect.io/{org}/flow-run/{run['id']}",
            }
        )

    return rows


def create_row_table(row_data):
    """Render one flow run as a two-column Key/Value table.

    Each item of ``row_data`` becomes one table row, in the mapping's
    iteration order.
    """
    detail = Table(box=box.HORIZONTALS)
    for heading in ("Key", "Value"):
        detail.add_column(heading)

    for label, value in row_data.items():
        detail.add_row(label, value)

    return detail


def create_table(data):
    """Stack one Key/Value table per flow run inside the outer table."""
    outer = base_table()
    for record in data:
        outer.add_row(create_row_table(record))
    return outer


@click.command()
@click.option(
    "--project",
    "-p",
    default="production",
    help="Prefect project to list flow runs for",
)
@click.option(
    "--start",
    "-s",
    help="Start timestamp/date for list",
    type=click.DateTime(),
    # Callable default: evaluated when the command runs, not at import time.
    # UTC-aware so the window lines up with the offset-aware (+00:00)
    # timestamps the API reports, whatever the local timezone is.
    default=lambda: datetime.datetime.now(datetime.timezone.utc)
    - datetime.timedelta(days=2),
)
@click.option(
    "--end",
    "-e",
    help="End timestamp/date for list",
    type=click.DateTime(),
    default=lambda: datetime.datetime.now(datetime.timezone.utc),
)
@click.option(
    "--state",
    default=None,
    help="filter to particular flow run state - defaults to no filter",
)
@click.option(
    "--flow",
    "-f",
    default=None,
    help="filter to particular flow - defaults to no filter",
)
def prefect_flow_run_list(project, start, end, state, flow):
    """List Prefect flow runs for a project in a paged table."""
    # NOTE(review): values passed explicitly via --start/--end are parsed as
    # naive datetimes; presumably the server reads them as UTC - confirm.
    prefect_gql_client = gql_client()
    result = fetch_flow_runs(project, start, end, state, flow, prefect_gql_client)
    data = parse_result(result)
    table = create_table(data)

    console = Console()

    # Page the output, keeping colour styles, so long listings stay readable.
    with console.pager(styles=True):
        console.print(table)

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    prefect_flow_run_list()
Usage:
>> PREFECT_API_KEY=**** python main.py --state Failed

─────────────────────────────────────────────────────────────────────────────────────────────                                
   Key          Value                                                                                                          
  ─────────────────────────────────────────────────────────────────────────────────────────────                                
   Flow Name    some flow                                                                                   
   State        Failed                                                                                                         
   Start        2022-07-11T00:10:50.550494+00:00                                                                               
   End          2022-07-11T00:12:24.953414+00:00                                                                               
   Parameters   {                                                                                                              
                  "arg_1": 42                                                    
                }                                                                                                              
   Url          https://cloud.prefect.io/some-org/flow-run/some_id
  ─────────────────────────────────────────────────────────────────────────────────────────────
:cool-llama: 1
k

Kevin Kho

07/11/2022, 5:32 PM
Wow!
j

Joshua Greenhalgh

07/11/2022, 5:32 PM
Haha thanks @Kevin Kho, hardly, but it's been quite useful for me!
Would be a nice thing to incorporate something similar in core I think!
k

Kevin Kho

07/11/2022, 5:34 PM
I think we’d take a PR without the rich dependency for 1.0 (since it doesn’t depend on rich)
I highly suggest you add it to Discourse too so that it will live beyond Slack retention
:upvote: 3
πŸ‘ 2
j

Joshua Greenhalgh

07/11/2022, 5:35 PM
Without rich it would be a complete PIA to do the formatting and paging though 😞
k

Kevin Kho

07/11/2022, 5:35 PM
Ah I see
j

Joshua Greenhalgh

07/11/2022, 5:35 PM
It's a great library for nice CLI stuff
k

Kevin Kho

07/11/2022, 5:36 PM
Yes 2.0 uses rich
:upvote: 1
a

Anna Geller

07/11/2022, 7:13 PM
@Joshua Greenhalgh thanks so much for this contribution!