Priyank

07/18/2022, 1:03 PM
Hi there! I am running a local Prefect server and now want to collect some useful data from the database, such as start_time, end_time, and duration, for every flow and its tasks. We can clearly see this data on the Prefect dashboard, but how can I query it from the DB? I tried listing all the tables in the database but couldn't work out how they are linked to each other. Can you share a visual representation of how the tables are linked, and some example queries too? Duration and the other values are already on the dashboard, so Prefect must be querying this data somehow. Or is there an easier way to get this info? Also, can we get how many times a task failed? In the end we want to use this data for our analysis: how much time each task takes and how that changes over time, whether there is a particular time when our tasks fail, and more. We are using Prefect 1.0 for now.

Sylvain Hazard

07/18/2022, 1:10 PM
Hi! 👋 I believe using the GraphQL API would be more suitable for your use case. For example, I've got a task that queries the API for old flow runs and deletes them to keep the backend database relatively small:
from datetime import datetime, timedelta

import prefect
from prefect import task


@task(name="Delete old flow runs")
def delete_old_runs(threshold_in_days: int):
    if threshold_in_days <= 0:
        raise ValueError(
            "You should set a value for `threshold_in_days` strictly above zero."
        )

    target_datetime = datetime.now() - timedelta(days=threshold_in_days)

    client = prefect.Client()

    flow_runs = client.graphql(
        {
            "query": {
                f'flow_run (where: {{start_time: {{_lt: "{target_datetime}"}}}})': [
                    "id",
                    "start_time",
                    {"flow": {"name"}},
                ]
            }
        }
    )

    prefect.context.get("logger").info(
        f"Deleting {len(flow_runs['data']['flow_run'])} flow runs."
    )

    for flow_run in flow_runs["data"]["flow_run"]:
        prefect.context.get("logger").info(
            client.graphql(
                {
                    "mutation": {
                        f"delete_flow_run(input: {{flow_run_id: \"{flow_run.get('id')}\"}})": {
                            "success",
                            "error",
                        }
                    }
                }
            )
        )

Priyank

07/19/2022, 1:21 PM
I am new to GraphQL. How can I query the duration of a task or flow from the GraphQL API? I am already able to get start_time and end_time for the flows and tasks.

Sylvain Hazard

07/19/2022, 1:34 PM
I'm not sure you can do it directly in GraphQL, but you can manipulate the results in Python however you like, e.g.:
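import pendulum
import prefect

client = prefect.Client()

flow_runs = client.graphql(
    YOUR_QUERY
)

for flow_run in flow_runs["data"]["flow_run"]:
    start_time = flow_run.get("start_time")
    end_time = flow_run.get("end_time")
    if start_time is None or end_time is None:
        # Not started yet or still running, so there is no duration to compute.
        continue

    # Timestamps come back as strings, so parse them before subtracting.
    duration = pendulum.parse(end_time) - pendulum.parse(start_time)

    # DURATION_THRESHOLD is a timedelta you define yourself.
    if duration > DURATION_THRESHOLD:
        pass  # do stuff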
You'd probably have to account for flows that are still running, because they won't have an end_time, but it should be doable.
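To make the "still running" case concrete, here is a minimal sketch of a helper that turns one run from the response into a duration. It uses only the standard library and assumes the timestamps are strings in a format that `datetime.fromisoformat` accepts; if yours are not, a more lenient parser such as `pendulum.parse` may be needed. The name `run_duration` and the sample data are hypothetical, not part of Prefect.

```python
from datetime import datetime, timedelta
from typing import Optional


def run_duration(run: dict) -> Optional[timedelta]:
    """Return end_time - start_time for one run, or None if either is missing."""
    start, end = run.get("start_time"), run.get("end_time")
    if not start or not end:
        return None  # not started yet, or still running
    return datetime.fromisoformat(end) - datetime.fromisoformat(start)


# Hypothetical sample shaped like flow_runs["data"]["flow_run"]
sample_runs = [
    {
        "id": "a",
        "start_time": "2022-07-18T13:00:00+00:00",
        "end_time": "2022-07-18T13:02:30+00:00",
    },
    {"id": "b", "start_time": "2022-07-18T13:05:00+00:00", "end_time": None},
]

durations = {run["id"]: run_duration(run) for run in sample_runs}
```

Runs that map to `None` can then be skipped or reported separately, depending on what the analysis needs.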

Kevin Kho

07/19/2022, 2:30 PM
The duration is calculated by the UI on the fly, so you really need the start time and end time, like Sylvain suggested.
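For the failure counts and per-task timing asked about at the top of the thread, the same start/end approach can be aggregated in plain Python once the task runs have been fetched. The sketch below is only an illustration: it assumes each task run dict carries `state`, `start_time`, `end_time`, and a nested `task` with a `name`, and that failed runs have the state string `"Failed"`. Those field names and the helper `summarize_task_runs` are assumptions, so check them against your own API schema.

```python
from collections import defaultdict
from datetime import datetime


def summarize_task_runs(task_runs):
    """Group task runs by task name: run count, failure count, average duration."""
    stats = defaultdict(lambda: {"runs": 0, "failed": 0, "seconds": []})
    for run in task_runs:
        entry = stats[run["task"]["name"]]
        entry["runs"] += 1
        if run.get("state") == "Failed":  # assumed state name
            entry["failed"] += 1
        start, end = run.get("start_time"), run.get("end_time")
        if start and end:  # skip runs that have not finished
            delta = datetime.fromisoformat(end) - datetime.fromisoformat(start)
            entry["seconds"].append(delta.total_seconds())
    return {
        name: {
            "runs": e["runs"],
            "failed": e["failed"],
            "avg_seconds": sum(e["seconds"]) / len(e["seconds"]) if e["seconds"] else None,
        }
        for name, e in stats.items()
    }


# Hypothetical sample data, shaped like an assumed task_run query result
sample_task_runs = [
    {"task": {"name": "extract"}, "state": "Success",
     "start_time": "2022-07-18T13:00:00+00:00", "end_time": "2022-07-18T13:00:10+00:00"},
    {"task": {"name": "extract"}, "state": "Failed",
     "start_time": "2022-07-19T13:00:00+00:00", "end_time": "2022-07-19T13:00:30+00:00"},
    {"task": {"name": "load"}, "state": "Running",
     "start_time": "2022-07-19T13:01:00+00:00", "end_time": None},
]

summary = summarize_task_runs(sample_task_runs)
```

Keeping the raw per-run seconds around, rather than only the average, would also let you look at how a task's duration changes over time.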