https://prefect.io logo
#show-us-what-you-got
Title
# show-us-what-you-got
j

Joshua Greenhalgh

07/11/2022, 5:28 PM
Dunno if useful for anyone but I wanted a quick view of the states of my flows so put together this CLI script
πŸ‘€ 4
Copy code
import click
import datetime
import os
import json

from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

from rich.table import Table
from rich.console import Console
from rich import box


GQL_TEMPLATE = """
query Query($orderBy: [flow_run_order_by!], $where: flow_run_bool_exp) {
  flow_run(order_by: $orderBy, where: $where) {
    id
    flow {
      name
    }
    parameters
    start_time
    end_time
    name
    state
  }
}
"""


def gql_client():
    transport = RequestsHTTPTransport(
        url="<https://api.prefect.io>",
        verify=True,
        retries=3,
        headers={"Authorization": f"Bearer {os.environ['PREFECT_API_KEY']}"},
    )

    client = Client(transport=transport, fetch_schema_from_transport=True)

    return client


def fetch_flow_runs(
    project: str,
    start_ts: datetime.datetime,
    end_ts: datetime.datetime,
    state: str,
    flow: str,
    client,
):

    query_params = {
        "orderBy": [{"start_time": "desc_nulls_last"}],
        "where": {
            "start_time": {"_gte": str(start_ts), "_lte": str(end_ts)},
            "flow": {"project": {"name": {"_eq": project}}},
        },
    }

    if state:
        query_params["where"]["state"] = {"_eq": state}

    if flow:
        query_params["where"]["flow"]["name"] = {"_eq": flow}

    result = client.execute(gql(GQL_TEMPLATE), variable_values=query_params)
    return result


def base_table():

    table = Table(show_header=True, expand=True, box=None)
    table.add_column()

    return table


def parse_result(result):

    data = result["flow_run"]
    data = [
        {
            "Flow Name": d["flow"]["name"],
            "State": f"[{'red' if d['state'] == 'Failed' else 'green'}]{d['state']}",
            "Start": d["start_time"],
            "End": d["end_time"],
            "Parameters": json.dumps(d["parameters"], indent=2),
            "Url": f"<https://cloud.prefect.io/><YOUR_ORG_GOES_HERE>/flow-run/{d['id']}",
        }
        for d in data
    ]

    return data


def create_row_table(row_data):

    t = Table(box=box.HORIZONTALS)
    t.add_column("Key")
    t.add_column("Value")

    for k, v in row_data.items():
        t.add_row(k, v)

    return t


def create_table(data):

    table = base_table()
    for d in data:
        row_table = create_row_table(d)
        table.add_row(row_table)

    return table


@click.command()
@click.option(
    "--project",
    "-p",
    default="production",
    help="Prefect project to list flow runs for",
)
@click.option(
    "--start",
    "-s",
    help="Start timestamp/date for list",
    type=click.DateTime(),
    default=datetime.datetime.now() - datetime.timedelta(days=2),
)
@click.option(
    "--end",
    "-e",
    help="End timestamp/date for list",
    type=click.DateTime(),
    default=datetime.datetime.now(),
)
@click.option(
    "--state",
    default=None,
    help="filter to particular flow run state - defaults to no filter",
)
@click.option(
    "--flow",
    "-f",
    default=None,
    help="filter to particular flow - defaults to no filter",
)
def prefect_flow_run_list(project, start, end, state, flow):

    prefect_gql_client = gql_client()
    result = fetch_flow_runs(project, start, end, state, flow, prefect_gql_client)
    data = parse_result(result)
    table = create_table(data)

    console = Console()

    with console.pager(styles=True):
        console.print(table)


if __name__ == "__main__":
    prefect_flow_run_list()
Usage;
Copy code
>> PREFECT_API_KEY=**** python main.py --state Failed

─────────────────────────────────────────────────────────────────────────────────────────────                                
   Key          Value                                                                                                          
  ─────────────────────────────────────────────────────────────────────────────────────────────                                
   Flow Name    some flow                                                                                   
   State        Failed                                                                                                         
   Start        2022-07-11T00:10:50.550494+00:00                                                                               
   End          2022-07-11T00:12:24.953414+00:00                                                                               
   Parameters   {                                                                                                              
                  "arg_1": 42                                                    
                }                                                                                                              
   Url          <https://cloud.prefect.io/some-org/flow-run/some_id>                                 
  ─────────────────────────────────────────────────────────────────────────────────────────────
cool llama 1
k

Kevin Kho

07/11/2022, 5:32 PM
Wow!
j

Joshua Greenhalgh

07/11/2022, 5:32 PM
Haha thanks @Kevin Kho hardly but its been quite useful for me!
Would be a nice thing to incorporate something similar in core I think!
k

Kevin Kho

07/11/2022, 5:34 PM
I think we’d take a PR without the rich dependency for 1.0 (since it doesn’t depend on rich)
I highly suggest you add it to Discourse too so that it will live beyond Slack retention
upvote 3
πŸ‘ 2
j

Joshua Greenhalgh

07/11/2022, 5:35 PM
Without rich it would be a complete PIA to do the formating and paging though 😞
k

Kevin Kho

07/11/2022, 5:35 PM
Ah I see
j

Joshua Greenhalgh

07/11/2022, 5:35 PM
Its a great library for nice CLI stuff
k

Kevin Kho

07/11/2022, 5:36 PM
Yes 2.0 uses rich
upvote 1
a

Anna Geller

07/11/2022, 7:13 PM
@Joshua Greenhalgh thanks so much for this contribution!
4 Views