Raúl Mansilla

10/19/2021, 9:58 AM
Hello team!! I'm trying to write a flow monitor that shuts down agents and stops scheduled flows if I get a lot of late flows… how can I query for late flow runs using the GraphQL API? Thanks!

Anna Geller

10/19/2021, 11:38 AM
@Raúl Mansilla I could help you write the query, but wanted to ask first: Do you see late runs in the Server UI? I know that clearing late runs in cloud is as simple as one click in the UI

Raúl Mansilla

10/19/2021, 11:40 AM
Hello @Anna Geller. Yes, I know we can clear them in the UI, but I want to query for late runs and then take some actions (stop agents, unschedule flows, etc…) so the system doesn't collapse
I have this one for failed…
query {
  flow_run(
    where: { state: { _eq: "Failed" } }
    order_by: { state_timestamp: desc }
    limit: 10
  ) {
    start_time
    #end_time
    name
    state
    state_message
    agent {
      name
      #type
    }
    #parameters
  }
}

Anna Geller

10/19/2021, 11:40 AM
Sure, I understand. I believe this query could show you late runs - I assume late run is one where state is Scheduled, but it has no start time
{
  flow_run(where: {state: {_eq: "Scheduled"}, 
    start_time: {_is_null: true}}) {
    id
    name
    flow {
      name
      id
    }
    agent {
      type
      name
    }
    start_time
    end_time
  }
}

Raúl Mansilla

10/19/2021, 11:41 AM
mmm, could be!

Anna Geller

10/19/2021, 11:42 AM
To unschedule a flow from GraphQL, I think you could use this mutation:
from prefect.client.client import Client

client = Client()
    
flow_id = {"flow_id": "uuid-of-your-flow"}
response = client.graphql(
    """
    mutation($flow_id: UUID!) {
        set_schedule_inactive(input: {flow_id: $flow_id}) {
            success
            error
        }
    }""",
    variables=flow_id,
    raise_on_error=True,
)

print(response)

Raúl Mansilla

10/19/2021, 11:44 AM
that's very interesting, and what if I want to unschedule all the scheduled flows? like switching off the schedule button in the UI?
not just one

Anna Geller

10/19/2021, 11:49 AM
@Raúl Mansilla I believe this is something that is possible on Cloud Enterprise plan, but not on Server. Let me confirm with the team.

Raúl Mansilla

10/19/2021, 11:49 AM
ohhh, ok, I'll wait 🙂 thanks a lot
I've tested the query for late runs, but it returns all the scheduled runs

Anna Geller

10/19/2021, 12:27 PM
sorry @Raúl Mansilla my bad, you need to additionally check if scheduled_start_time is in the past - you would need to adjust the timestamp:
{
  flow_run(where: {
    state: {_eq: "Scheduled"},
    start_time: {_is_null: true},
    scheduled_start_time: {_lt: "2021-10-19T12:27:45"}
  }) {
    id
    name
    scheduled_start_time
    auto_scheduled
    flow {
      name
      id
    }
    agent {
      type
      name
    }
    start_time
    end_time
  }
}
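Rather than editing the timestamp by hand, the cutoff could be generated at query time. A minimal sketch, assuming the comparison value should be an ISO 8601 string with an explicit UTC offset (a string without an offset leaves the time zone interpretation to the backend). The names LATE_RUNS_QUERY, $cutoff and late_run_variables are made up for this example and are not part of Prefect:

```python
from datetime import datetime, timezone

# Same filter as the query above, with the hard-coded timestamp replaced
# by a GraphQL variable. "$cutoff" is a name chosen for this sketch.
LATE_RUNS_QUERY = """
query LateRuns($cutoff: timestamptz) {
  flow_run(where: {
    state: {_eq: "Scheduled"},
    start_time: {_is_null: true},
    scheduled_start_time: {_lt: $cutoff}
  }) {
    id
    name
    scheduled_start_time
  }
}
"""


def late_run_variables(now=None):
    """Return the variables dict for LATE_RUNS_QUERY (hypothetical helper)."""
    if now is None:
        now = datetime.now(timezone.utc)
    if now.tzinfo is None:
        # A naive datetime would hide which time zone the caller meant.
        raise ValueError("pass a timezone-aware datetime")
    # Normalize to UTC so the offset in the string is always +00:00.
    return {"cutoff": now.astimezone(timezone.utc).isoformat()}
```

As with the mutation earlier in the thread, this could presumably be sent with client.graphql(LATE_RUNS_QUERY, variables=late_run_variables()).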

Raúl Mansilla

10/19/2021, 1:14 PM
yeeep, no worries, that can work indeed. Thanks for your help

Anna Geller

10/19/2021, 1:44 PM
@Raúl Mansilla I promised to get back to you to confirm the feature to unschedule all flows, and this seems to be available on all Prefect Cloud plans. This is what it would look like: you would go to “Options” on the main dashboard and click one giant button 🙂 This would deactivate all scheduled flows.

Raúl Mansilla

10/19/2021, 3:07 PM
haha… ok, it's clear 🙂 we're using the community version. Thanks a lot for the help
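Since the button is described as a Cloud feature, one possible workaround on Server is to apply the single-flow mutation from earlier in this thread to each flow in turn. A rough sketch, assuming the flow ids are already known; deactivate_schedules is a hypothetical helper, and client is any object exposing the graphql(query, variables=..., raise_on_error=...) call used above:

```python
# The mutation text is the one shared earlier in the thread.
SET_SCHEDULE_INACTIVE = """
mutation($flow_id: UUID!) {
    set_schedule_inactive(input: {flow_id: $flow_id}) {
        success
        error
    }
}
"""


def deactivate_schedules(client, flow_ids):
    """Call set_schedule_inactive once per flow id and collect the responses."""
    responses = {}
    for flow_id in flow_ids:
        responses[flow_id] = client.graphql(
            SET_SCHEDULE_INACTIVE,
            variables={"flow_id": flow_id},
            raise_on_error=True,
        )
    return responses
```

With the Prefect client from the earlier snippet this would be called as deactivate_schedules(Client(), ["uuid-of-flow-a", "uuid-of-flow-b"]), where the ids are placeholders.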
Hello again @Anna Geller, it's possible to query for agents as well, right? Their status, and whether they're passing jobs?

Kevin Kho

10/19/2021, 8:01 PM
Hey @Raúl Mansilla, maybe you can do a query like this? or this? You can also look at the Agents tab in the UI, which calls these queries

Anna Geller

10/19/2021, 8:24 PM
@Raúl Mansilla as Kevin mentioned, I think the field last_queried from agent will be most helpful in your use case
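A sketch of how last_queried could feed a monitor: treat an agent as stale when it has not queried for work within some threshold. This assumes the agent records have already been fetched as dicts with a name and a last_queried ISO 8601 string that includes an offset (like the timestamps in the responses in this thread). The function name stale_agents and the 5 minute default are illustrative choices, not Prefect settings:

```python
from datetime import datetime, timedelta, timezone


def stale_agents(agents, now=None, max_age=timedelta(minutes=5)):
    """Return names of agents whose last_queried is missing or older than max_age."""
    if now is None:
        now = datetime.now(timezone.utc)
    stale = []
    for agent in agents:
        last_queried = agent.get("last_queried")
        if last_queried is None:
            # Never queried: treat as stale so it gets looked at.
            stale.append(agent["name"])
            continue
        # Expects an offset such as "+00:00" so the subtraction is between aware datetimes.
        seen = datetime.fromisoformat(last_queried)
        if now - seen > max_age:
            stale.append(agent["name"])
    return stale
```

The returned names could then drive the "stop the agent" or alerting step.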

Raúl Mansilla

10/19/2021, 9:20 PM
oh great! Let's try it out! Thanks guys, you're awesome!
I didn't see this page before… well, I didn't have to look for it in the past 🙂
@Anna Geller with the query you provided I only get scheduled jobs, but it should return nothing as there are no late runs… I'm a little bit stuck here…
{
  flow_run(where: {
    state: {_eq: "Scheduled"},
    start_time: {_is_null: true},
    scheduled_start_time: {_lt: "2021-10-20T22:35:45"}
  }) {
    id
    name
    state
    state_message
    scheduled_start_time
    auto_scheduled
    flow {
      name
      id
    }
    agent {
      type
      name
    }
    start_time
    end_time
  }
}
{
  "flow_run": [
    {
      "id": "aacb0885-fcc2-4c48-bc41-4268ff7a0cac",
      "name": "uptight-tody",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:25:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Ingest Trigger",
        "id": "0d89687e-8670-4538-a3ef-690715fce719",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "36147eda-7f51-45ec-bf00-54823aed4ba0",
      "name": "fierce-turkey",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:44:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Priority Status Update Trigger",
        "id": "aaf63191-b68c-4051-ac2b-a65b0df58ee6",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "6789c8d8-a93c-4072-8da3-61fd9a22b939",
      "name": "positive-seal",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:44:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Update Trigger",
        "id": "a449c748-1fc8-438d-9f2a-5d090013a6b1",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "f3b8064b-f896-4817-a6ee-3bc6e7de6175",
      "name": "spiked-poodle",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:03:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "c9f57e32-827e-4836-8e23-cb14411c6bbc",
      "name": "beige-ibex",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:00:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "0ea9a312-8f93-4456-a59b-ce2bf0536b41",
      "name": "rare-jackdaw",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T20:57:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "afd16122-9c4b-4217-836c-669b529f856d",
      "name": "manipulative-swine",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:20:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Ingest Trigger",
        "id": "0d89687e-8670-4538-a3ef-690715fce719",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "129986a5-3925-40e0-b6bc-bce5a4280f93",
      "name": "stylish-boa",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:37:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Priority Status Update Trigger",
        "id": "aaf63191-b68c-4051-ac2b-a65b0df58ee6",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "4ad5c1e9-ca75-499f-8fec-d2413cf99336",
      "name": "kind-albatross",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:37:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Update Trigger",
        "id": "a449c748-1fc8-438d-9f2a-5d090013a6b1",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "8b6d0f8b-b8f6-4883-951f-daa28a30e313",
      "name": "amiable-dolphin",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T20:51:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    }
  ]
}

Kevin Kho

10/20/2021, 8:44 PM
Just clarifying, you want flow runs that have not yet run, but are already late, right? So start_time is null and scheduled_start_time is before the current time?

Raúl Mansilla

10/20/2021, 8:44 PM
Yes, more or less the same as what shows in the dashboard

Kevin Kho

10/20/2021, 8:45 PM
Ah ok, will just look in the repo for that query then

Raúl Mansilla

10/20/2021, 8:47 PM
because normally when we have late runs, a Dask cluster or an agent is having problems, so we want to get the flows that appear in late runs and find out which agent they come from, to stop the agent or reboot the Dask scheduler/workers and prevent the system from collapsing
is it also easy to get the count of upcoming, late and running flows?
I also want to put some things into Grafana and create alerts as well

Kevin Kho

10/20/2021, 8:49 PM
So the UI just seems to get all upcoming runs and then pick the late ones from them with JavaScript

Raúl Mansilla

10/20/2021, 8:55 PM
ohh ok

Kevin Kho

10/20/2021, 8:58 PM
Yeah…so I would move the logic to Python I think to filter those
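That client-side filtering could look roughly like the following, assuming the flow runs come back as dicts shaped like the JSON response earlier in the thread. The function name late_runs is made up for this sketch, and the rule (state is Scheduled, start_time is null, scheduled_start_time is in the past) is a working definition rather than an official one:

```python
from datetime import datetime, timezone


def late_runs(flow_runs, now=None):
    """Keep runs that are Scheduled, have not started, and are past due."""
    if now is None:
        now = datetime.now(timezone.utc)
    late = []
    for run in flow_runs:
        if run["state"] != "Scheduled" or run["start_time"] is not None:
            continue
        # Timestamps in the response carry an offset, e.g. "+00:00",
        # so this parses to a timezone-aware datetime.
        scheduled = datetime.fromisoformat(run["scheduled_start_time"])
        if scheduled < now:
            late.append(run)
    return late
```

Because the comparison uses a timezone-aware clock, runs scheduled for later today are not reported as late just because the local wall clock is ahead of UTC.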

Raúl Mansilla

10/20/2021, 9:10 PM
yep
Thanks a lot @Kevin Kho!

Kevin Kho

10/20/2021, 9:15 PM
Of course!

Raúl Mansilla

10/21/2021, 9:40 AM
Hi again @Kevin Kho and @Anna Geller! Do you happen to know what logic this JS uses to differentiate scheduled flows in the past with no start time from late runs? I guess it's not only the missing start time

Anna Geller

10/21/2021, 9:50 AM
@Raúl Mansilla I couldn’t find an official definition of a late run in the docs, but I would say that a FlowRun which has been scheduled to run, but for some reason did not start (start_time=null) is a late run. This means:
• state is Scheduled
• scheduled_start_time is in the past
• start_time is null
There could be some nuance related to agents as well, e.g. if the agent picked it up but for some reason couldn’t start, with this definition it would still be a late run, even though the agent actually picked up the scheduled FlowRun.

Raúl Mansilla

10/21/2021, 10:54 AM
🙂 that does make sense. Thanks @Anna Geller
Hello again @Anna Geller and @Kevin Kho… I'm facing a curious problem: I'm trying to query flow runs that were Success during the last hour and I get no results, but I do get results from 2 hours onwards. There is also a mismatch of about 500 flows between, for example, successful flows in the last 24h in the UI (9576) and in the query (9031)
from gql import gql

query = gql(
    """
    query FlowRuns($projectId: uuid, $heartbeat: timestamptz, $start: timestamptz, $end: timestamptz) {
      SuccessTotal: flow_run_aggregate(
        where: {
          flow: { project_id: { _eq: $projectId } }
          scheduled_start_time: { _gte: $heartbeat }
          state: { _eq: "Success" }
        }
      ) {
        aggregate {
          count
        }
      }
      Success1h: flow_run_aggregate(
        where: {
          flow: { project_id: { _eq: $projectId } }
          _and: [
            { scheduled_start_time: { _gt: $start } },
            { end_time: { _lt: $end } }
          ]
          state: { _eq: "Success" }
        }
      ) {
        aggregate {
          count
        }
      }
    }
    """
)

Anna Geller

11/02/2021, 5:07 PM
thx @Raúl Mansilla, will check that. my first impression is that it may be a time zone or date formatting issue, but will check

Kevin Kho

11/02/2021, 5:09 PM
That is weird since the UI just uses the query as well. Anna’s suggestions seem good. What is the gte $heartbeat?

Raúl Mansilla

11/02/2021, 5:09 PM
@Anna Geller OMG… the Prefect UI shows the same time as my clock, but the machine time is 1h behind the actual time…
I mean, my actual time is 18:09, the Prefect UI says the same, but the machine time is 17:09
maybe that's why there is nothing from 18:09, because it's 17:09 on the machine…
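A mismatch like this is easier to rule out when the query variables are derived from an explicit UTC clock rather than from local wall-clock time. A small sketch, assuming $start and $end are meant to bracket the last hour; last_hour_window is a hypothetical helper, and the caller is expected to pass a timezone-aware datetime if it passes one at all:

```python
from datetime import datetime, timedelta, timezone


def last_hour_window(now=None, span=timedelta(hours=1)):
    """Return start/end as UTC ISO 8601 strings covering `span` up to `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Convert whatever aware time we were given to UTC before formatting.
    end = now.astimezone(timezone.utc)
    start = end - span
    return {"start": start.isoformat(), "end": end.isoformat()}
```

The resulting dict can be merged with projectId and heartbeat to form the variables for the FlowRuns query above.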

Anna Geller

11/02/2021, 5:11 PM
you can change it in the UI settings

Raúl Mansilla

11/02/2021, 5:14 PM
Thanks both, silly me!

Anna Geller

11/02/2021, 5:15 PM
no worries! and if you get that working, feel free to share your query - I’m sure it will be super appreciated by others from the community