Samuel Hinton

    Samuel Hinton

    10 months ago
    Hey all! Was wondering if someone could help me out in configuring a prefect agent to make it a bit more robust. After we had an outage this evening, a massive backlog of flows occurred. This would be fine, except that the agent we have (a local agent run as a service in the docker compose) immedietly tries to launch all the delayed jobs, which makes it run out of memory. We get the Lazarus errors, the agent restarts, and same thing happens. You can see in the screenshot of service memory the constant agents restarting, using all available memory, and eventually crashing. Are there any settings/env variables I can set to try and a) limit the number of concurrent jobs the agent sends out and b) stop whatever is using memory? What exactly would cause an almost instant multiple GB memory usage in the agent?
    Kevin Kho

    Kevin Kho

    10 months ago
    Hey, this is one of the reasons Automations was introduced so you can cancel flows that have not kicked off after X number of minutes. For flow level concurrency, you can check these docs. These are both Cloud only features
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Hey Kevin! We are not using prefect cloud, and one of the big product points when we set it up a while ago were the statements that server and cloud would share the same functionality. Are there no solutions for this for prefect server users?
    Ideally, if we have a flow that is scheduled every 5 minutes, I would want to say “Look, I dont care if youve missed the last 20 of them, dont try and run all 20 of then, just run the last one”
    Kevin Kho

    Kevin Kho

    10 months ago
    Kind of but a bit more complicated. You could try a flow level state handler to hit the GraphQL API to check how many of that flow is running, and then you could end the flow as success
    Samuel Hinton

    Samuel Hinton

    10 months ago
    I am open to any and all solutions as this just took down my prod env data source and I need to ensure it cant happen again if at all possible. Is there any chance youd be able to provide a scaffolding or a few pointers so that I can begin looking at this in the morning?
    Also, would you have any updates on https://github.com/PrefectHQ/server/issues/213, I notice the PR was closed due to difficult, but maybe theres someone else giving it a shot?
    I raised this quite a while ago in a similar issue, but would be great to see if its still on the roadmap?
    Kevin Kho

    Kevin Kho

    10 months ago
    I think Michael detailed it was hard to make performant here
    I don’t think there’s much left on the roadmap because as we move to Orion, the roadmap of the current Prefect core is more about stability and performance
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Thats fair enough, I do look forward to watching the update videos once theyre out and about. Regarding a flow handler with graphql mixed in, are there any examples in the current doco I can use whilst I pray for a speed delivery of orion?
    Also, will orion be offered as server and cloud like prefect? Data security contracts and networking issues prevent us from using a cloud based solution
    Kevin Kho

    Kevin Kho

    10 months ago
    Not quite but I can detail this out. You can use a flow level state handler that uses the
    Client().graphql()
    to hit the API. The state handler would be applied on the scheduled -> running transition. In this state handler, use the Client to query for the number of flows running.
    query {
      flow (where: 
        {name: {_eq: "..."},
         project: {name: {_eq: "bristech"}},
         archived: {_eq: false}}) {
        name
        project {
          name
        }
        archived
        flow_runs(where: {state: {_eq: "Running"}}){
          id
          state
        }
      }
    }
    And then use Python to count the number. If the number of flows is at the concurrency you want for that flow, then just return SUCCESS
    Orion is a server+agent in one. Cloud form will take a different form, but with Orion, Server and Cloud features will be very aligned
    Because everyone will be on Orion
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Awesome, Ill see if I can get something working. Ah, to be more explicit, if we have an existing swarm/etc will we be able to integrate orion into our swarm like can with prefect server?
    Kevin Kho

    Kevin Kho

    10 months ago
    I’d like to say less because server will all fit in one container now. It’s not a matter of spinning up a ton of containers as it’s backed by sqllite
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Ah, sounds promising. Thanks for the info, and Ill let you know how I get on with the flow handler 🙂
    Kevin Kho

    Kevin Kho

    10 months ago
    I have recommended this a couple of times and noone has come back to me about it so I assume it works
    Samuel Hinton

    Samuel Hinton

    10 months ago
    As a potentially secondary solution, is there also a way to signify the maximum time Delta for rescheduling. Where I could say "if you're more than an hour late, just fail and get out of the way"?
    Kevin Kho

    Kevin Kho

    10 months ago
    That...is the Automation in Prefect Cloud 😅. But it requires services running in our backend that oversee all of the flows not available in Server
    Anna Geller

    Anna Geller

    10 months ago
    btw @Samuel Hinton you mentioned:
    I would want to say “Look, I dont care if youve missed the last 20 of them, dont try and run all 20 of then, just run the last one”
    There is also a UI feature to clear late runs with one click.
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Yeah thats what I ended up using to get the environment back to runnable conditions, just trying to ensure that I dont have PagerDuty calling me again in the early morning to do it manually haha
    Kevin Mullins

    Kevin Mullins

    10 months ago
    I’m also interested in this. I have a flow that under ideal circumstances will run very quickly, but occasionally might have to “re-snapshot” a source and run long. I believe this would fall into the same category where I would like to setup scheduling to only allow a single concurrent flow and only run a single instance again on the next run if there were missed events.
    Kevin Kho

    Kevin Kho

    10 months ago
    I don’t think the agent concurrency will be revisited anytime soon as it was attempted but not performant. The workaround though is outlined above. Let’s see if Samuel reports back?
    Kevin Mullins

    Kevin Mullins

    10 months ago
    Sounds good. I’ll end up trying the same thing if I get to it first 🙂
    Samuel Hinton

    Samuel Hinton

    10 months ago
    It's at the top of our backlog for next sprint so hopefully I'll check back in early next week with a full query and python snippet
    Hey @Kevin Kho, would you know how to, in a state handler, get the flow run id? Im trying to figure out how exactly to check to see if the current flow run for the handler is the latest one scheduled (so I dont kill it, but can kill everything else). But figuring out the flow run ID (or its randomised name) inside the handler has stumped me, so I’m probably overlooking something simple. This is the code so far:
    def state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
        if old_state.is_pending() and new_state.is_running():
    
            client = Client()
            now = dt.now(tz=pytz.UTC)
            result = client.graphql(
                """{
        flow(where: {
            archived: {_eq: false},
            name: {_eq: "%s"}
        }) {
            name
            archived
            flow_runs (where: {
            state: {_in: ["Scheduled", "Retrying", "Running"]}, 
            scheduled_start_time:{_lte: "%s"}
            }) {
            scheduled_start_time
            start_time
            name
            state
            id
            }
        }"""
                % (flow.name, now.isoformat())
            )
            # These flow runs will be everything thats scheduled to start in the past and
            # might have built up.
            flow_runs = result["data"]["flow"]["flow_runs"]
    
            # I dont want to run another task if:
            # 1. Theres already a flow in the running state
            # 2. If there are multiple scheduled, only the latest one should be run
            any_running = any([f["state"] == "Running" for f in flow_runs])
            if any_running:
                return Cancelled("Existing tasks are already running")
            
            scheduled = [f for f in flow_runs if f["state"] in ("Pending", "Scheduled")]
            if len(scheduled) > 1:
                last_scheduled_time = max([dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled])
                this_flow_run = None 
                # How do I get the flow run id? It doesnt seem to be in the Flow, and not in the State either
    
        pass
    Kevin Kho

    Kevin Kho

    10 months ago
    Just do
    prefect.context.get("flow_run_id")
    inside it
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Ahhh theres a funky little context box, perfect, thanks mate
    Ah would
    flow.context.get(…)
    work too, or is
    prefect.context
    going to do some scope matching or similar to know whats going on?
    Kevin Kho

    Kevin Kho

    10 months ago
    I don’t think that will work because context is not attached to it. Is it?
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Youre right, I was confusing it with
    flow_run_context
    inside Flow itself, but its never persisted as an attribute, apologies
    Kevin Kho

    Kevin Kho

    10 months ago
    No worries!
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Hmm, trying to manually set a schedule to enable testing of this, but its hitting me with the old “Interval can not be less than one minute when deploying to Prefect Cloud.” I am not deploying to prefect cloud, but to prefect server on my own hardware. Is there a way to turn off this validation that you know of?
    Kevin Kho

    Kevin Kho

    10 months ago
    Will check
    Looks hardcoded so I don’t think it’s configurable
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Ah well, if its client side then I guess I can just edit the code haha
    Theres a server side check too, damn. Okay, Ill figure out some other way of testing
    @Kevin Mullins @Kevin Kho reporting back with a working state handler.
    import datetime
    from datetime import timedelta, datetime as dt
    import json
    import os
    import gc
    from typing import Optional
    
    import pytz
    import requests
    import pandas as pd
    import prefect
    from prefect import task, Flow
    from prefect.engine.state import Failed
    from prefect.utilities.notifications import slack_notifier
    from prefect.engine.signals import SKIP
    from prefect.engine.state import Cancelled, State
    from prefect.client import Client
    
    
    def concurrent_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
        if old_state.is_pending() and new_state.is_running():
    
            client = Client()
            now = dt.now(tz=pytz.UTC).replace(microsecond=0) + timedelta(seconds=1)
            # Replacing microseconds because graphql api cant always handle the number of decimals
            result = client.graphql(
                """{
        flow(where: {
            archived: {_eq: false},
            name: {_eq: "%s"}
        }) {
            name
            archived
            flow_runs (where: {
            state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]}, 
            scheduled_start_time:{_lte: "%s"}
            }) {
            scheduled_start_time
            start_time
            name
            state
            id
            }
        }
    }"""
                % (flow.name, now.isoformat())  # Sorry for % operator, but those {} make it a pain
            )
            # These flow runs will be everything thats scheduled to start in the past and
            # might have built up.
            logger = prefect.context.get("logger")
            # This might fail if the GraphQL cant find anything, but havent seen this in practise
            flow_runs = result["data"]["flow"][0]["flow_runs"]
    
            # I dont want to run another task if theres already more than one flow running
            # For me, Im happy to have two running at once, as API issues means we can get timeouts and
            # hangs that dont terminate easily. For other use cases, Id generally say to cancel if theres
            # any running
            num_running = sum([1 if f["state"] in ("Running", "Retrying") else 0 for f in flow_runs])
            if num_running > 1:
                msg = "Existing tasks are already running"
                <http://logger.info|logger.info>(msg)
                return Cancelled(msg)
    
            # And if there are multiple scheduled, only the latest one should be run
            scheduled = [
                f for f in flow_runs if f["state"] in ("Pending", "Scheduled", "Queued", "Submitted")
            ]
            if len(scheduled) > 1:
                last_scheduled_time = max(
                    [dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled]
                )
                this_flow_run_id = prefect.context.get("flow_run_id")
                matching_runs = [f for f in scheduled if f["id"] == this_flow_run_id]
                if not matching_runs:
                    <http://logger.info|logger.info>(f"Current id is {this_flow_run_id}")
                    <http://logger.info|logger.info>(f"Flow runs are: {scheduled}")
                    return Cancelled("Nope")
                this_run = matching_runs[0]
                this_run_time = dt.fromisoformat(this_run["scheduled_start_time"])
                if this_run_time != last_scheduled_time:
                    msg = "Multiple scheduled tasks, this is not the last one"
                    <http://logger.info|logger.info>(msg)
                    return Cancelled(msg)
    
        return new_state
    Will allow a max of two concurrent running jobs (can easily change this to one), and if multiple jobs are scheduled (ie your agent was down for a while and is now back up), only the last one will execute and the others will log and cancel. Not all the imports are necessary, havent filtered out the imports from the other handlers and common functions in the file.
    Kevin Mullins

    Kevin Mullins

    10 months ago
    Awesome! Thank you @Samuel Hinton very much for this.
    Kevin Kho

    Kevin Kho

    10 months ago
    Nice. This is worth archiving in the server repo. Want me to do that or do you want to post it?
    Samuel Hinton

    Samuel Hinton

    10 months ago
    Feel free to do it however you’d like @Kevin Kho 🙂
    Kevin Kho

    Kevin Kho

    10 months ago