# ask-community
s
Hey all! Was wondering if someone could help me out in configuring a Prefect agent to make it a bit more robust. After we had an outage this evening, a massive backlog of flows occurred. This would be fine, except that the agent we have (a local agent run as a service in the docker compose) immediately tries to launch all the delayed jobs, which makes it run out of memory. We get the Lazarus errors, the agent restarts, and the same thing happens. You can see in the screenshot of service memory the agent constantly restarting, using all available memory, and eventually crashing. Are there any settings/env variables I can set to try and a) limit the number of concurrent jobs the agent sends out and b) stop whatever is using memory? What exactly would cause an almost instant multiple-GB memory usage in the agent?
k
Hey, this is one of the reasons Automations was introduced: so you can cancel flows that have not kicked off after X number of minutes. For flow-level concurrency, you can check these docs. These are both Cloud-only features
s
Hey Kevin! We are not using Prefect Cloud, and one of the big product points when we set it up a while ago was the statement that Server and Cloud would share the same functionality. Are there no solutions for this for Prefect Server users?
Ideally, if we have a flow that is scheduled every 5 minutes, I would want to say “Look, I don't care if you've missed the last 20 of them, don't try and run all 20 of them, just run the last one”
k
Kind of, but it's a bit more complicated. You could try a flow-level state handler that hits the GraphQL API to check how many runs of that flow are running, and then you could end the flow as a success
s
I am open to any and all solutions, as this just took down my prod env data source and I need to ensure it can't happen again if at all possible. Is there any chance you'd be able to provide a scaffolding or a few pointers so that I can begin looking at this in the morning?
Also, would you have any updates on https://github.com/PrefectHQ/server/issues/213? I notice the PR was closed due to difficulty, but maybe there's someone else giving it a shot?
I raised this quite a while ago in a similar issue, but it would be great to see if it's still on the roadmap?
k
I think Michael detailed that it was hard to make performant here
I don't think there's much left on the roadmap because, as we move to Orion, the roadmap of the current Prefect Core is more about stability and performance
s
That's fair enough, I do look forward to watching the update videos once they're out and about. Regarding a flow handler with GraphQL mixed in, are there any examples in the current doco I can use whilst I pray for a speedy delivery of Orion?
Also, will Orion be offered as Server and Cloud like Prefect? Data security contracts and networking issues prevent us from using a cloud-based solution
k
Not quite, but I can detail this out. You can use a flow-level state handler that uses
Client().graphql()
to hit the API. The state handler would be applied on the Scheduled -> Running transition. In this state handler, use the Client to query for the number of flows running.
query {
  flow(where: {
    name: {_eq: "..."},
    project: {name: {_eq: "bristech"}},
    archived: {_eq: false}
  }) {
    name
    project {
      name
    }
    archived
    flow_runs(where: {state: {_eq: "Running"}}) {
      id
      state
    }
  }
}
And then use Python to count the number. If the number of running flow runs is already at the concurrency you want for that flow, then just return SUCCESS
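A minimal sketch of what that handler could look like, assuming the check happens on the Pending/Scheduled -> Running transition and the run simply cancels itself when the limit is hit (the concurrency limit of 1 and the Cancelled result are arbitrary choices, not the only option):

from typing import Optional

from prefect import Flow
from prefect.client import Client
from prefect.engine.state import Cancelled, State


def limit_concurrency(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    # Only act when the run is about to move from Pending/Scheduled into Running
    if old_state.is_pending() and new_state.is_running():
        result = Client().graphql(
            """{
              flow(where: {name: {_eq: "%s"}, archived: {_eq: false}}) {
                flow_runs(where: {state: {_eq: "Running"}}) {
                  id
                }
              }
            }"""
            % flow.name
        )
        # "flow" comes back as a list (one entry per matching, non-archived flow version)
        running = sum(len(f["flow_runs"]) for f in result["data"]["flow"])
        if running >= 1:  # desired concurrency limit
            return Cancelled("Another run of this flow is already Running")
    return new_state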
Orion is a server + agent in one. Cloud will take a different form, but with Orion, Server and Cloud features will be very aligned
Because everyone will be on Orion
s
Awesome, I'll see if I can get something working. Ah, to be more explicit, if we have an existing swarm/etc, will we be able to integrate Orion into our swarm like we can with Prefect Server?
k
I'd like to say it will be less work, because the server will all fit in one container now. It's not a matter of spinning up a ton of containers, as it's backed by SQLite
s
Ah, sounds promising. Thanks for the info, and I'll let you know how I get on with the flow handler 🙂
k
I have recommended this a couple of times and no one has come back to me about it, so I assume it works
🙏 1
s
As a potentially secondary solution, is there also a way to specify a maximum time delta for rescheduling? Where I could say "if you're more than an hour late, just fail and get out of the way"?
k
That... is the Automation in Prefect Cloud 😅. But it requires services running in our backend that oversee all of the flows, and those aren't available in Server
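For Server users, a rough approximation of that idea can live in the same flow-level state handler pattern: compare the run's scheduled start time against the clock and bow out if it is too stale. A sketch, assuming scheduled_start_time is populated in prefect.context for your Prefect 1.x version and using an arbitrary one-hour cutoff:

from datetime import timedelta, datetime as dt
from typing import Optional

import pytz
import prefect
from prefect import Flow
from prefect.engine.state import Cancelled, State


def cancel_if_too_late(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    # On the transition into Running, check how far behind schedule this run is
    if old_state.is_pending() and new_state.is_running():
        scheduled = prefect.context.get("scheduled_start_time")  # assumed present in context
        if scheduled is not None:
            lateness = dt.now(tz=pytz.UTC) - scheduled
            if lateness > timedelta(hours=1):
                # More than an hour late: cancel instead of piling onto the backlog
                return Cancelled(f"Run is {lateness} behind schedule, cancelling")
    return new_state

Returning Failed instead of Cancelled would give the "just fail" behaviour if you would rather alert on it.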
a
btw @Samuel Hinton you mentioned:
I would want to say “Look, I don't care if you've missed the last 20 of them, don't try and run all 20 of them, just run the last one”
There is also a UI feature to clear late runs with one click.
s
Yeah, that's what I ended up using to get the environment back to runnable conditions; just trying to ensure that I don't have PagerDuty calling me again in the early morning to do it manually haha
💯 1
k
I'm also interested in this. I have a flow that under ideal circumstances will run very quickly, but occasionally might have to "re-snapshot" a source and run long. I believe this would fall into the same category, where I would like to set up scheduling to only allow a single concurrent flow and only run a single instance again on the next run if there were missed events.
k
I don't think the agent concurrency will be revisited anytime soon, as it was attempted but wasn't performant. The workaround, though, is outlined above. Let's see if Samuel reports back?
👍 1
k
Sounds good. I’ll end up trying the same thing if I get to it first 🙂
s
It's at the top of our backlog for next sprint so hopefully I'll check back in early next week with a full query and python snippet
🙏 1
Hey @Kevin Kho, would you know how to get the flow run ID inside a state handler? I'm trying to figure out how exactly to check whether the current flow run for the handler is the latest one scheduled (so I don't kill it, but can kill everything else). But figuring out the flow run ID (or its randomised name) inside the handler has stumped me, so I'm probably overlooking something simple. This is the code so far:
def state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():

        client = Client()
        now = dt.now(tz=pytz.UTC)
        result = client.graphql(
            """{
    flow(where: {
        archived: {_eq: false},
        name: {_eq: "%s"}
    }) {
        name
        archived
        flow_runs(where: {
            state: {_in: ["Scheduled", "Retrying", "Running"]},
            scheduled_start_time: {_lte: "%s"}
        }) {
            scheduled_start_time
            start_time
            name
            state
            id
        }
    }
}"""
            % (flow.name, now.isoformat())
        )
        # These flow runs will be everything that's scheduled to start in the past and
        # might have built up.
        flow_runs = result["data"]["flow"][0]["flow_runs"]

        # I don't want to run another task if:
        # 1. There's already a flow in the running state
        # 2. If there are multiple scheduled, only the latest one should be run
        any_running = any([f["state"] == "Running" for f in flow_runs])
        if any_running:
            return Cancelled("Existing tasks are already running")
        
        scheduled = [f for f in flow_runs if f["state"] in ("Pending", "Scheduled")]
        if len(scheduled) > 1:
            last_scheduled_time = max([dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled])
            this_flow_run = None 
            # How do I get the flow run id? It doesn't seem to be in the Flow, nor in the State

    pass
k
Just do
prefect.context.get("flow_run_id")
inside it
s
Ahhh, there's a funky little context box, perfect, thanks mate
Ah would
flow.context.get(…)
work too, or is
prefect.context
going to do some scope matching or similar to know what's going on?
k
I don’t think that will work because context is not attached to it. Is it?
s
You're right, I was confusing it with
flow_run_context
inside Flow itself, but it's never persisted as an attribute, apologies
k
No worries!
s
Hmm, trying to manually set a schedule to enable testing of this, but it's hitting me with the old “Interval can not be less than one minute when deploying to Prefect Cloud.” I am not deploying to Prefect Cloud, but to Prefect Server on my own hardware. Is there a way to turn off this validation that you know of?
k
Will check
Looks hardcoded so I don’t think it’s configurable
s
Ah well, if it's client-side then I guess I can just edit the code haha
There's a server-side check too, damn. Okay, I'll figure out some other way of testing
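In the meantime, one way to test the handler without a sub-minute schedule is to create a few backdated flow runs through the API, so the agent sees an artificial backlog. A sketch, assuming your Prefect 1.x Client exposes create_flow_run with flow_id and scheduled_start_time arguments (the flow ID below is a placeholder):

from datetime import timedelta, datetime as dt

import pytz
from prefect.client import Client

client = Client()
flow_id = "replace-with-your-flow-id"  # placeholder: take this from the UI or a GraphQL query

# Queue several runs whose scheduled start times are already in the past,
# mimicking the backlog an agent sees after an outage.
now = dt.now(tz=pytz.UTC)
for minutes_ago in (15, 10, 5):
    client.create_flow_run(
        flow_id=flow_id,
        scheduled_start_time=now - timedelta(minutes=minutes_ago),
    )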
@Kevin Mullins @Kevin Kho reporting back with a working state handler.
import datetime
from datetime import timedelta, datetime as dt
import json
import os
import gc
from typing import Optional

import pytz
import requests
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier
from prefect.engine.signals import SKIP
from prefect.engine.state import Cancelled, State
from prefect.client import Client


def concurrent_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():

        client = Client()
        now = dt.now(tz=pytz.UTC).replace(microsecond=0) + timedelta(seconds=1)
        # Replacing microseconds because the GraphQL API can't always handle that many decimals
        result = client.graphql(
            """{
    flow(where: {
        archived: {_eq: false},
        name: {_eq: "%s"}
    }) {
        name
        archived
        flow_runs (where: {
        state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]}, 
        scheduled_start_time:{_lte: "%s"}
        }) {
        scheduled_start_time
        start_time
        name
        state
        id
        }
    }
}"""
            % (flow.name, now.isoformat())  # Sorry for % operator, but those {} make it a pain
        )
        # These flow runs will be everything that's scheduled to start in the past and
        # might have built up.
        logger = prefect.context.get("logger")
        # This might fail if the GraphQL can't find anything, but I haven't seen this in practice
        flow_runs = result["data"]["flow"][0]["flow_runs"]

        # I don't want to run another task if there's already more than one flow running.
        # For me, I'm happy to have two running at once, as API issues mean we can get timeouts and
        # hangs that don't terminate easily. For other use cases, I'd generally say to cancel if there's
        # any running
        num_running = sum([1 if f["state"] in ("Running", "Retrying") else 0 for f in flow_runs])
        if num_running > 1:
            msg = "Existing tasks are already running"
            logger.info(msg)
            return Cancelled(msg)

        # And if there are multiple scheduled, only the latest one should be run
        scheduled = [
            f for f in flow_runs if f["state"] in ("Pending", "Scheduled", "Queued", "Submitted")
        ]
        if len(scheduled) > 1:
            last_scheduled_time = max(
                [dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled]
            )
            this_flow_run_id = prefect.context.get("flow_run_id")
            matching_runs = [f for f in scheduled if f["id"] == this_flow_run_id]
            if not matching_runs:
                <http://logger.info|logger.info>(f"Current id is {this_flow_run_id}")
                <http://logger.info|logger.info>(f"Flow runs are: {scheduled}")
                return Cancelled("Nope")
            this_run = matching_runs[0]
            this_run_time = dt.fromisoformat(this_run["scheduled_start_time"])
            if this_run_time != last_scheduled_time:
                msg = "Multiple scheduled tasks, this is not the last one"
                logger.info(msg)
                return Cancelled(msg)

    return new_state
This will allow a max of two concurrent running jobs (you can easily change this to one), and if multiple jobs are scheduled (i.e. your agent was down for a while and is now back up), only the last one will execute and the others will log and cancel. Not all the imports are necessary; I haven't filtered out the imports from the other handlers and common functions in the file.
🙏 3
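For anyone lifting this, the handler is attached when the flow is defined. A minimal usage sketch, where the flow name and 5-minute interval are placeholders and concurrent_handler is the function above:

from datetime import timedelta

from prefect import task, Flow
from prefect.schedules import IntervalSchedule


@task
def do_work():
    pass


# Every run of this flow passes through concurrent_handler (defined above)
# on its Scheduled/Pending -> Running transition.
with Flow(
    "my-flow",  # placeholder name
    schedule=IntervalSchedule(interval=timedelta(minutes=5)),
    state_handlers=[concurrent_handler],
) as flow:
    do_work()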
k
Awesome! Thank you @Samuel Hinton very much for this.
k
Nice. This is worth archiving in the server repo. Want me to do that or do you want to post it?
s
Feel free to do it however you’d like @Kevin Kho 🙂
k