Hello team!! I´m trying to write a flow monitor in...
# prefect-server
r
Hello team! I'm trying to write a flow monitor that shuts down agents and stops scheduled flows if I get a lot of late flow runs. How can I query for flow runs in a late state using the GraphQL API? Thanks!
a
@Raúl Mansilla I could help you write the query, but wanted to ask first: Do you see late runs in the Server UI? I know that clearing late runs in cloud is as simple as one click in the UI
r
Hello @Anna Geller. Yes, I know we can clear them in the UI, but I want to query for late runs and then take some actions (stop agents, unschedule flows, etc.) so that the system doesn't collapse.
I have this one for failed runs:
query {
  flow_run(
    where: { state: { _eq: "Failed" } }
    order_by: { state_timestamp: desc }
    limit: 10
  ) {
    start_time
    # end_time
    name
    state
    state_message
    agent {
      name
      # type
    }
    # parameters
  }
}
a
Sure, I understand. I believe this query could show you late runs. I assume a late run is one whose state is Scheduled but which has no start time:
{
  flow_run(where: {state: {_eq: "Scheduled"}, 
    start_time: {_is_null: true}}) {
    id
    name
    flow {
      name
      id
    }
    agent {
      type
      name
    }
    start_time
    end_time
  }
}
r
mmm, could be!
a
To unschedule a flow from GraphQL, I think you could use this mutation:
from prefect.client.client import Client

client = Client()

# Variables for the mutation; replace the placeholder with your flow's UUID.
variables = {"flow_id": "uuid-of-your-flow"}
response = client.graphql(
    """
    mutation($flow_id: UUID!) {
        set_schedule_inactive(input: {flow_id: $flow_id}) {
            success
            error
        }
    }""",
    variables=variables,
    raise_on_error=True,
)

print(response)
r
That's very interesting. And what if I want to unschedule all the scheduled flows, like switching off the schedule button in the UI?
Not just one.
a
@Raúl Mansilla I believe this is something that is possible on Cloud Enterprise plan, but not on Server. Let me confirm with the team.
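In the meantime, one possible workaround on Server could be to loop over the flows and apply the set_schedule_inactive mutation from above to each of them. A minimal Python sketch, assuming the flow table exposes is_schedule_active and archived fields (worth verifying in the Interactive API tab) and that responses can be indexed like dicts. The deactivate_all_schedules helper and the run_graphql callable are hypothetical names; run_graphql stands in for a thin wrapper around client.graphql:

```python
from typing import Any, Callable, Dict, List

# Assumption: the flow table has is_schedule_active and archived columns.
ACTIVE_FLOWS_QUERY = """
query {
  flow(where: {is_schedule_active: {_eq: true}, archived: {_eq: false}}) {
    id
    name
  }
}
"""

# Same mutation as in the single-flow example.
SET_INACTIVE_MUTATION = """
mutation($flow_id: UUID!) {
  set_schedule_inactive(input: {flow_id: $flow_id}) {
    success
    error
  }
}
"""

GraphQLFn = Callable[[str, Dict[str, Any]], Any]


def deactivate_all_schedules(run_graphql: GraphQLFn) -> List[str]:
    """Turn off the schedule of every flow that currently has one active.

    Returns the IDs of the flows whose schedule was reported as deactivated.
    """
    flows = run_graphql(ACTIVE_FLOWS_QUERY, {})["data"]["flow"]
    deactivated = []
    for flow in flows:
        result = run_graphql(SET_INACTIVE_MUTATION, {"flow_id": flow["id"]})
        if result["data"]["set_schedule_inactive"]["success"]:
            deactivated.append(flow["id"])
    return deactivated
```

With a real client, the callable could be something like lambda q, v: client.graphql(q, variables=v).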
r
Ohhh, ok, I'll wait 🙂 Thanks a lot.
I've tested the query for late runs, but it shows the scheduled jobs.
a
sorry @Raúl Mansilla my bad, you need to additionally check if scheduled_start_time is in the past - you would need to adjust the timestamp:
{
  flow_run(where: {state: {_eq: "Scheduled"},
    start_time: {_is_null: true},
    scheduled_start_time: {_lt: "2021-10-19T12:27:45"}}) {
    id
    name
    scheduled_start_time
    auto_scheduled
    flow {
      name
      id
    }
    agent {
      type
      name
    }
    start_time
    end_time
  }
}
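Rather than editing the timestamp by hand, the cutoff could be passed in as a GraphQL variable computed at call time. A minimal Python sketch, assuming scheduled_start_time accepts a timestamptz variable; build_late_runs_request is a hypothetical helper name:

```python
from datetime import datetime, timezone
from typing import Optional, Tuple

LATE_RUNS_QUERY = """
query LateRuns($now: timestamptz) {
  flow_run(where: {
    state: {_eq: "Scheduled"},
    start_time: {_is_null: true},
    scheduled_start_time: {_lt: $now}
  }) {
    id
    name
    scheduled_start_time
    flow { id name }
    agent { type name }
  }
}
"""


def build_late_runs_request(now: Optional[datetime] = None) -> Tuple[str, dict]:
    """Return the query plus its variables, using `now` (default: current UTC time) as cutoff."""
    now = now or datetime.now(timezone.utc)
    # An explicit UTC offset avoids ambiguity between server time and local time.
    # Note: a naive datetime passed in is interpreted as local time by astimezone().
    return LATE_RUNS_QUERY, {"now": now.astimezone(timezone.utc).isoformat()}
```

The result could then be sent with something like client.graphql(query, variables=variables).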
r
yeeep, no worries, that can work indeed. Thanks for your help
a
@Raúl Mansilla I promised to get back to you to confirm the feature to unschedule all flows, and this seems to be available on all Prefect Cloud plans. This is what it would look like: you would go to “Options” on the main dashboard and click one giant button 🙂 This would deactivate all scheduled flows.
r
Haha... ok, it's clear 🙂 We're using the community version. Thanks a lot for the help.
Hello again @Anna Geller, is it possible to query for agents as well? For example their status, and whether they're passing jobs?
k
Hey @Raúl Mansilla, maybe you can do a query like this ? or this ? You can also look at the Agents tab in the UI which calls these queries
a
@Raúl Mansilla as Kevin mentioned, I think the field last_queried from agent will be most helpful in your use case
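To act on last_queried, a monitor could flag agents that have not polled the API recently. A minimal Python sketch, assuming the agent records come back as dicts with name and an ISO 8601 last_queried value carrying a UTC offset; the stale_agents helper and the 5-minute threshold are hypothetical choices:

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def stale_agents(
    agents: List[Dict[str, Any]],
    now: datetime,
    max_age: timedelta = timedelta(minutes=5),
) -> List[str]:
    """Return names of agents whose last_queried is missing or older than max_age."""
    stale = []
    for agent in agents:
        last = agent.get("last_queried")
        # An agent that has never queried the API is treated as stale too.
        # Caveat: before Python 3.11, fromisoformat only accepts 3- or 6-digit
        # fractional seconds, so a more lenient parser may be needed.
        if last is None or now - datetime.fromisoformat(last) > max_age:
            stale.append(agent["name"])
    return stale
```

Here now would typically be datetime.now(timezone.utc), so that both sides of the comparison are timezone-aware.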
r
Oh great! Let's try it out! Thanks guys, you're awesome!
I hadn't seen this page before... well, I didn't have to look for it in the past 🙂
@Anna Geller with the query you provided I only get scheduled jobs, but it should show nothing, as there are no Late Runs. I'm a little bit stuck here:
{
  flow_run(where: {state: {_eq: "Scheduled"},
    start_time: {_is_null: true},
    scheduled_start_time: {_lt: "2021-10-20T22:35:45"}}) {
    id
    name
    state
    state_message
    scheduled_start_time
    auto_scheduled
    flow {
      name
      id
    }
    agent {
      type
      name
    }
    start_time
    end_time
  }
}
{
  "flow_run": [
    {
      "id": "aacb0885-fcc2-4c48-bc41-4268ff7a0cac",
      "name": "uptight-tody",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:25:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Ingest Trigger",
        "id": "0d89687e-8670-4538-a3ef-690715fce719",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "36147eda-7f51-45ec-bf00-54823aed4ba0",
      "name": "fierce-turkey",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:44:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Priority Status Update Trigger",
        "id": "aaf63191-b68c-4051-ac2b-a65b0df58ee6",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "6789c8d8-a93c-4072-8da3-61fd9a22b939",
      "name": "positive-seal",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:44:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Update Trigger",
        "id": "a449c748-1fc8-438d-9f2a-5d090013a6b1",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "f3b8064b-f896-4817-a6ee-3bc6e7de6175",
      "name": "spiked-poodle",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:03:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "c9f57e32-827e-4836-8e23-cb14411c6bbc",
      "name": "beige-ibex",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:00:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "0ea9a312-8f93-4456-a59b-ce2bf0536b41",
      "name": "rare-jackdaw",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T20:57:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "afd16122-9c4b-4217-836c-669b529f856d",
      "name": "manipulative-swine",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:20:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Ingest Trigger",
        "id": "0d89687e-8670-4538-a3ef-690715fce719",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "129986a5-3925-40e0-b6bc-bce5a4280f93",
      "name": "stylish-boa",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:37:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Priority Status Update Trigger",
        "id": "aaf63191-b68c-4051-ac2b-a65b0df58ee6",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "4ad5c1e9-ca75-499f-8fec-d2413cf99336",
      "name": "kind-albatross",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T21:37:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "Ext Adv Update Trigger",
        "id": "a449c748-1fc8-438d-9f2a-5d090013a6b1",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    },
    {
      "id": "8b6d0f8b-b8f6-4883-951f-daa28a30e313",
      "name": "amiable-dolphin",
      "state": "Scheduled",
      "state_message": "Flow run scheduled.",
      "scheduled_start_time": "2021-10-20T20:51:00+00:00",
      "auto_scheduled": true,
      "flow": {
        "name": "MediatorX Status Update Trigger",
        "id": "10244167-da97-4b2e-be7a-80c4b7c1896d",
        "__typename": "flow"
      },
      "agent": null,
      "start_time": null,
      "end_time": null,
      "__typename": "flow_run"
    }
  ]
}
k
Just clarifying, you want flow runs that have not yet run, but are already late, right? So start_time is null and scheduled_start_time is before the current time?
r
Yes, more or less the same as what shows in the dashboard
k
Ah ok, I'll just look in the repo for that query then
r
Because normally, when we have late runs, a Dask cluster or an agent is in trouble. So we want to get the flows that appear in late runs and find out which agent they come from, in order to stop the agent or reboot the Dask scheduler/workers and prevent the system from collapsing.
Is it also easy to get the count of upcoming, late, and running flows?
I also want to put some things into Grafana and create alerts as well.
k
So the UI just seems to get all upcoming runs and then pick the late ones out of them with JavaScript
r
ohh ok
k
Yeah…so I would move the logic to Python I think to filter those
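That filtering step might look like the following in Python: a sketch that splits the Scheduled runs returned by the query into late and upcoming ones, so that their counts can be reported (for example to Grafana). It assumes each run is a dict shaped like the JSON output above; split_scheduled_runs is a hypothetical helper name:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

Run = Dict[str, Any]


def split_scheduled_runs(runs: List[Run], now: datetime) -> Tuple[List[Run], List[Run]]:
    """Split Scheduled runs into (late, upcoming) relative to `now`."""
    late, upcoming = [], []
    for run in runs:
        scheduled = datetime.fromisoformat(run["scheduled_start_time"])
        # Late: no start time yet although the scheduled time has already passed.
        if run["start_time"] is None and scheduled < now:
            late.append(run)
        else:
            upcoming.append(run)
    return late, upcoming


# Possible usage, with `response` being the parsed GraphQL result:
# late, upcoming = split_scheduled_runs(
#     response["data"]["flow_run"], datetime.now(timezone.utc)
# )
# print(len(late), len(upcoming))
```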
r
yep
Thanks a lot @Kevin Kho!
k
Of course!
r
Hi again @Kevin Kho and @Anna Geller! Do you happen to know what logic this JS uses to distinguish Scheduled flows in the past with no start time from Late Runs? I guess it's not only the missing start time.
a
@Raúl Mansilla I couldn’t find an official definition of a late run in the docs, but I would say that a FlowRun which has been scheduled to run, but for some reason did not start (start_time=null) is a late run. This means:
• state is Scheduled
• scheduled_start_time is in the past
• start_time is null
There could be some nuance related to agents as well, e.g. if the agent picked it up but for some reason couldn’t start, with this definition it would still be a late run, even though the agent actually picked up the scheduled FlowRun.
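To make that agent nuance visible, late runs could be grouped by the agent attached to them, with runs that no agent has picked up (agent is null, as in the output above) kept in a separate bucket. A minimal Python sketch; group_late_runs_by_agent and the "unassigned" label are hypothetical:

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_late_runs_by_agent(late_runs: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Map agent name -> names of late runs; runs without an agent go under 'unassigned'."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for run in late_runs:
        # `agent` is null in the GraphQL result when no agent is attached to the run.
        agent = run.get("agent") or {}
        key = agent.get("name") or "unassigned"
        groups[key].append(run["name"])
    return dict(groups)
```

A large bucket under a single agent name would point at that agent, while a large "unassigned" bucket would suggest that no agent is picking up work at all.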
r
🙂 That does make sense. Thanks @Anna Geller
Hello again @Anna Geller and @Kevin Kho. I'm facing a curious problem: I'm trying to query flow runs that were in a Success state during the last hour and I get no results, although I do get results from 2 hours onwards. There is also a mismatch of about 500 flows between, for example, the successful flows of the last 24h in the UI (9576) and in the query (9031):
from gql import gql

query = gql(
    """
    query FlowRuns($projectId: uuid, $heartbeat: timestamptz, $start: timestamptz, $end: timestamptz) {
      SuccessTotal: flow_run_aggregate(
        where: {
          flow: { project_id: { _eq: $projectId } }
          scheduled_start_time: { _gte: $heartbeat }
          state: { _eq: "Success" }
        }
      ) {
        aggregate {
          count
        }
      }
      Success1h: flow_run_aggregate(
        where: {
          flow: { project_id: { _eq: $projectId } }
          _and: [
            { scheduled_start_time: { _gt: $start } }
            { end_time: { _lt: $end } }
          ]
          state: { _eq: "Success" }
        }
      ) {
        aggregate {
          count
        }
      }
    }
    """
)
a
thx @Raúl Mansilla, will check that. My first impression is that it may be a time-zone or date-formatting issue, but I will check
k
That is weird since the UI just uses the query as well. Anna’s suggestions seem good. What is the gte $heartbeat?
r
@Anna Geller OMG... the Prefect UI shows the same time as my actual time, but the machine time is 1h behind.
I mean, my actual time is 18:09 and the Prefect UI says the same, but the machine time is 17:09.
Maybe that's why there is nothing from 18:09: it's 17:09 on the machine...
a
you can change it in the UI settings
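Independently of the UI setting, a script could sidestep this kind of one-hour offset (assuming it comes from a time-zone difference rather than a wrong clock) by building the $start and $end variables from timezone-aware UTC datetimes instead of naive local ones. A minimal Python sketch; last_hour_window is a hypothetical helper, and the keys match the variable names in the query above:

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def last_hour_window(now: Optional[datetime] = None) -> Dict[str, str]:
    """Return ISO 8601 start/end variables covering the hour before `now`, in UTC."""
    # Normalising to UTC makes the window independent of the machine's local zone.
    end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = end - timedelta(hours=1)
    return {"start": start.isoformat(), "end": end.isoformat()}
```

The returned dict could be merged into the variables passed along with the query, next to projectId and heartbeat.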
r
Thanks both, silly me!
a
no worries! and if you get that working, feel free to share your query - I’m sure it will be super appreciated by others from the community