Joshua Greenhalgh
07/11/2022, 5:28 PMimport click
import datetime
import os
import json
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from rich.table import Table
from rich.console import Console
from rich import box
GQL_TEMPLATE = """
query Query($orderBy: [flow_run_order_by!], $where: flow_run_bool_exp) {
flow_run(order_by: $orderBy, where: $where) {
id
flow {
name
}
parameters
start_time
end_time
name
state
}
}
"""
def gql_client():
transport = RequestsHTTPTransport(
url="<https://api.prefect.io>",
verify=True,
retries=3,
headers={"Authorization": f"Bearer {os.environ['PREFECT_API_KEY']}"},
)
client = Client(transport=transport, fetch_schema_from_transport=True)
return client
def fetch_flow_runs(
project: str,
start_ts: datetime.datetime,
end_ts: datetime.datetime,
state: str,
flow: str,
client,
):
query_params = {
"orderBy": [{"start_time": "desc_nulls_last"}],
"where": {
"start_time": {"_gte": str(start_ts), "_lte": str(end_ts)},
"flow": {"project": {"name": {"_eq": project}}},
},
}
if state:
query_params["where"]["state"] = {"_eq": state}
if flow:
query_params["where"]["flow"]["name"] = {"_eq": flow}
result = client.execute(gql(GQL_TEMPLATE), variable_values=query_params)
return result
def base_table():
table = Table(show_header=True, expand=True, box=None)
table.add_column()
return table
def parse_result(result):
data = result["flow_run"]
data = [
{
"Flow Name": d["flow"]["name"],
"State": f"[{'red' if d['state'] == 'Failed' else 'green'}]{d['state']}",
"Start": d["start_time"],
"End": d["end_time"],
"Parameters": json.dumps(d["parameters"], indent=2),
"Url": f"<https://cloud.prefect.io/><YOUR_ORG_GOES_HERE>/flow-run/{d['id']}",
}
for d in data
]
return data
def create_row_table(row_data):
t = Table(box=box.HORIZONTALS)
t.add_column("Key")
t.add_column("Value")
for k, v in row_data.items():
t.add_row(k, v)
return t
def create_table(data):
table = base_table()
for d in data:
row_table = create_row_table(d)
table.add_row(row_table)
return table
@click.command()
@click.option(
"--project",
"-p",
default="production",
help="Prefect project to list flow runs for",
)
@click.option(
"--start",
"-s",
help="Start timestamp/date for list",
type=click.DateTime(),
default=datetime.datetime.now() - datetime.timedelta(days=2),
)
@click.option(
"--end",
"-e",
help="End timestamp/date for list",
type=click.DateTime(),
default=datetime.datetime.now(),
)
@click.option(
"--state",
default=None,
help="filter to particular flow run state - defaults to no filter",
)
@click.option(
"--flow",
"-f",
default=None,
help="filter to particular flow - defaults to no filter",
)
def prefect_flow_run_list(project, start, end, state, flow):
prefect_gql_client = gql_client()
result = fetch_flow_runs(project, start, end, state, flow, prefect_gql_client)
data = parse_result(result)
table = create_table(data)
console = Console()
with console.pager(styles=True):
console.print(table)
if __name__ == "__main__":
prefect_flow_run_list()
>> PREFECT_API_KEY=**** python main.py --state Failed
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Key Value
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Flow Name some flow
State Failed
Start 2022-07-11T00:10:50.550494+00:00
End 2022-07-11T00:12:24.953414+00:00
Parameters {
"arg_1": 42
}
Url <https://cloud.prefect.io/some-org/flow-run/some_id>
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Kevin Kho
07/11/2022, 5:32 PMJoshua Greenhalgh
07/11/2022, 5:32 PMKevin Kho
07/11/2022, 5:34 PMJoshua Greenhalgh
07/11/2022, 5:35 PMKevin Kho
07/11/2022, 5:35 PMJoshua Greenhalgh
07/11/2022, 5:35 PMKevin Kho
07/11/2022, 5:36 PMAnna Geller
07/11/2022, 7:13 PM