# ask-community
j
How do I ensure that a local agent will not attempt to run more than one flow at a time?
```
prefect agent local start {...}
```
k
There is no mechanism to do this at the agent level, only at the Flow level (a maximum of 1 concurrent flow run with label X). This is because Prefect is not "resource-aware" at the moment.
t
@Kevin Kho non-resource-aware, does that mean in k8s too? I presented the Prefect POC we plan to do to our devops team, and they asked whether there is any way of controlling or limiting the number of jobs, resources, etc. that it uses.
a
@Tom Klein What Kevin meant is that Prefect is not a cluster manager such as Kubernetes, Mesos, or a cloud-specific cluster service (AWS ECS, GCP Vertex); Prefect does not orchestrate resources on a given agent. However, if you pair Prefect with a cluster manager such as Kubernetes, then you can accomplish that. Specifically, with a KubernetesAgent you can use the KubernetesRun run configuration to specify the memory and CPU requests for the Kubernetes job that will be spun up to run a flow. Additionally, you usually assign a label to an agent, e.g.:
```
prefect agent local start --label xyz
```
Prefect Cloud has concurrency limits which allow you to limit the number of flow runs that will be created with a given label, effectively limiting the number of flow runs that will be executed on that agent. So there is a way of controlling the number of flow runs created, or specifying resource requests for a Kubernetes job.
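Roughly, a minimal sketch of the run-config side (untested; the image name, resource values, and label below are placeholders, so double-check the argument names against your Prefect 1.x version):
```
from prefect import Flow, task
from prefect.run_configs import KubernetesRun

@task
def say_hi():
    print("hi")

with Flow("k8s-example") as flow:
    say_hi()

# Run config for the Kubernetes job the agent creates for this flow.
# Values below are placeholders; adjust to your cluster's capacity.
flow.run_config = KubernetesRun(
    image="my-registry/my-flow-image:latest",  # hypothetical image
    cpu_request="500m",
    memory_request="1Gi",
    labels=["xyz"],  # must match a label on the Kubernetes agent
)
```
The flow-run concurrency limit for a label is then configured in Prefect Cloud, not in code.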
t
@Anna Geller cool, thanks! What about runs that aren't KubernetesRun (but are still launched from a Kubernetes agent)? E.g. DBT, Fivetran, Snowflake, etc. runs: are there some default resources defined for them? Or am I confusing the terms here, and those are task types that can still have a Kubernetes run configuration? Ah, actually I see that the run config is a property of the flow and not of the task at all 🤔 I think I got things mixed up with the Kubernetes task (which can launch a separate K8s job, if I understand correctly).
a
if you run all your flows on a Kubernetes agent, then all your flows end up deployed as Kubernetes jobs regardless of what your tasks are doing (Fivetran, DBT, Snowflake).
t
Yep, gotcha. Sort of a tangent question: are there any plans or thoughts to make it (or support it) so that tasks themselves are run as jobs rather than the flow as a whole? Just thinking about the fact that tasks (in the same flow) share resources this way, which could lead to some awkward situations with heavy tasks or fan-outs, no? I think Argo works this way?
a
There are no plans to do that afaik. But with Orion you can choose a different task runner for a subflow, e.g. allowing you to offload only specific tasks (a subflow) to Dask. Having said that, nothing stops you from spinning up a separate container for a specific task; we have plenty of tasks that support that, e.g. the Kubernetes and Docker tasks.
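For illustration, a rough sketch of offloading one heavy step to its own Kubernetes Job from within a flow using the 1.x task library (untested; the RunNamespacedJob import path and arguments here are assumptions to verify against the task library docs, and the job spec and image are placeholders):
```
from prefect import Flow
from prefect.tasks.kubernetes import RunNamespacedJob  # import path assumed

# Placeholder Kubernetes Job spec for the heavy step.
heavy_job_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "heavy-step"},
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "heavy-step",
                        "image": "my-registry/heavy-step:latest",  # hypothetical image
                        "resources": {"requests": {"cpu": "2", "memory": "4Gi"}},
                    }
                ],
                "restartPolicy": "Never",
            }
        }
    },
}

run_heavy_step = RunNamespacedJob(
    body=heavy_job_spec,
    namespace="default",
    delete_job_after_completion=True,  # assumed argument name
)

with Flow("offload-heavy-step") as flow:
    run_heavy_step()
```
The rest of the flow's tasks still run in the flow-run container; only the heavy step gets its own pod and resource requests.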
t
Hmm, right. I think the Docker task would still run in the same Kubernetes space as the other tasks (no?), but yeah, a Kubernetes task is probably the solution for such cases 👍
j
My situation is that I have 7 local agents with the same label. I want each to run no more than one flow at a time. Surely this is possible?
a
@John Muehlhausen no, it's not possible, unless you assign a unique label to each of those seven agents and make use of flow concurrency limits, as described on this page.
j
Why would Cloud execute a flow on an agent that is already running a flow when a different agent with the same label is idle? This is what I'm seeing.
a
A local agent doesn't monitor resource capacity. Say you have a local agent with a label called "prod". This agent then polls for flow runs that were deployed with a run config that also has the label "prod". When you have 7 agents with the label "prod", all 7 agents poll for the same flow runs. In the worst case, it could be that a flow run gets executed multiple times because more than one agent picked it up.

Based on what you are trying to accomplish, my recommendation would be to assign a unique label to each of your agents, say "prod1", "prod2", etc. Then you can assign various labels to various flows. But Prefect won't round-robin those, because Prefect is not a resource manager. If you need such functionality, you could instead build a Kubernetes cluster with those 7 VMs and let Kubernetes distribute your work across worker nodes based on the memory and CPU requests you specify for your flows.
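As a minimal sketch of that pinning approach (the flow, task, and label names are placeholders):
```
from prefect import Flow, task
from prefect.run_configs import LocalRun

@task
def extract():
    return [1, 2, 3]

with Flow("pinned-to-prod1") as flow:
    extract()

# Only the agent started with `prefect agent local start --label prod1`
# will pick up runs of this flow.
flow.run_config = LocalRun(labels=["prod1"])

# With a flow-run concurrency limit of 1 on the "prod1" label in Prefect
# Cloud, that agent effectively runs at most one flow at a time.
```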
cc @John Muehlhausen
j
@Anna Geller It almost sounds like giving more than one agent the same label is dangerous due to multiple pick-up (your "worst case" above)? If that is true, why is it so easy to do? I guess my feedback is that someone new to Prefect can easily conclude that the purpose of labels is to tag agents (plural) that are capable of running certain flows and that Prefect will ensure that only one of them actually runs it. What are valid reasons to give more than one agent the same label?
a
That's why I recommended defining distinct labels for each agent. It seems that what you need is some way to distribute work across nodes, is that correct? I don't know your use case, but setting up a Kubernetes cluster with those VMs could be an option worth considering. This way Kubernetes could manage distributing work based on resource requests such as CPU and memory.
j
So there is never a valid reason to give multiple agents the same label? If there is never a valid reason, shouldn't the Prefect Cloud UI warn if a "label collision" takes place? Honestly, this situation makes me sad. It really seems as if it should be "in scope" for Prefect to 1) ensure that only one agent picks up a flow and 2) allow an agent to be configured to stop looking for flows if it is already running more than N of them. With these features suddenly it is very useful rather than dangerous for multiple agents to have the same label. The links above don't seem to encourage or discourage multiple agents with the same label.
BTW, k8s doesn't seem to help anything if my agents (all eligible to run the same flows) are in different cities around the globe. Let me know if you think differently!
a
Why not? I think Kubernetes doesn't place any limitations in this regard. I'll have a look at how the docs could explain this more clearly.
j
Because a k8s cluster can't span datacenters... or at least, it is a disaster to try.
> In the worst case, it could be that a flow run gets executed multiple times because more than one agent picked it up.

Can you verify this statement? It really bothers me that this is possible.
a
j
We had some calls with Darren Pinder a while back. Can we schedule another such call to go over the race condition where multiple agents might execute the same scheduled flow instance?
We are not going to try to span timezones with a single cluster/agent. My k8s guy says it is a bad idea.
a
Ok, I cross-checked that for you. I started three local agents with the same label. Then I started 3 flow runs of the same flow. There was no race condition: each flow run got executed only once. Nevertheless, I still believe that it’s best to assign unique labels to each agent and I will look into how to make it clearer in the docs. If you wish to talk to Sales, you can reach out to sales@prefect.io.
k
Flows can be picked up by multiple agents, but Cloud can handle those race conditions with a locking mechanism when a Flow is picked up.
a
@John Muehlhausen for reference, this page explains what Kevin was referring to
j
"Prefect Cloud's opt-in version locking mechanism enforces the assertion that your work runs once and only once." ... any examples of how to use this?
```
from prefect import Flow
from prefect.executors import LocalDaskExecutor

# `schedule` is defined elsewhere in this script
with Flow("...", schedule, executor=LocalDaskExecutor(scheduler="processes", num_workers=6)) as flow:
    ...  # tasks...

res = flow.register(
    project_name="...",
    set_schedule_active=True,
    idempotency_key="v8",  # increment on each code change
    labels=["a_label"],
    add_default_labels=False,
    no_url=True,
)
```
a
It’s just an additional guarantee, you don’t need to set it anywhere in your flow. There is an option to toggle it in the UI as well. But to be fair, I ran the above experiment without toggling the version locking and still each flow run was picked up and executed only once despite having 3 agents with the same label.
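If you prefer toggling it programmatically rather than in the UI, something like the sketch below should work via the GraphQL API (the mutation name and input shape are assumptions on my part; verify them in the Interactive API tab first):
```
from prefect import Client

client = Client()

# Mutation name and input shape are assumptions based on the Cloud GraphQL
# schema; check the Interactive API before relying on this.
client.graphql(
    """
    mutation($flowId: UUID!) {
      enable_flow_version_lock(input: { flow_id: $flowId }) {
        success
      }
    }
    """,
    variables={"flowId": "your-flow-id"},  # placeholder flow ID
)
```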
k
That is a different thing, actually. Version locking is at the task level. If you submit 3 tasks to Dask like A -> B -> C, and A succeeds and B succeeds but C fails due to a Dask worker dying, Dask will restart the worker and attempt to run A -> B -> C again because of how its failover mechanism works. Version locking will block A and B from re-running and just run C. Version locking is enabled by default and can be found in the Flow settings. Regarding local agents not picking up the same flow run, we have another locking mechanism in Cloud.
j
@Anna Geller I appreciate the result of your experiment but race conditions only show up rarely, so an example where it worked doesn't prove that it will always work. In the flow.register above I need to know that for each flow generated by the schedule, it will run at most once when there are multiple agents polling for that label.
Assuming that I can know that multiple agents won't run the same instance of a flow schedule, what I will do next is dig into the local agent code to see if I can get it to stop polling for new work while it is running a flow. If I'm successful then my problems will be solved.
...so are you recommending that I use the UI feature screen-shotted above to turn on version locking?
a
Honestly, it's hard to recommend anything, because we prefer not to give opinionated recommendations. In general, the local agent docs say: "While the local agent is fully capable of executing flows in conjunction with the Prefect API, we generally recommend using one of the other agents to help with modularity and scale." But perhaps the solution you just described will work? I wouldn't discourage it. Feel free to dig deeper and LMK if you have any questions I can help with.
k
So while we have state locking in Cloud and have not seen reports of multiple local agents picking up the same Flow, it's hard to guarantee, because as you said, race conditions are rare, so there may be one that pops up. We introduced this state locking precisely because a user with 4 local agents had two of them picking up the same Flow, and it seemed to resolve his issues.

On the label collision: there is no load balancing, but having multiple agents with the same label provides fault tolerance, which some users take advantage of. The setup with 7 agents that can all pick up the same Flow is more of a distributed task queue paradigm like Celery, where there is a queue of work and it gets sent to the next available worker. Prefect is a batch orchestrator though, so the concern is more oriented towards running jobs on a schedule. Prefect will not know what the limitations of a local agent are because we do not keep track of resource consumption. That said, this is probably more stable and balanced (in the Prefect paradigm) if you are explicit with labels and attach certain Flows to certain agents that way.
j
"Prefect will not know what the limitations of a Local Agent are because we do not keep track of resource consumption." @Kevin Kho Yes, but the agent knows its own limitations, so I think the local agent should be configurable regarding max concurrent flows. It will simply not pick up more work if it has a certain number of flows running already, thus ensuring that some other agent picks up the work. Controlling concurrency at the label level doesn't help when the label is shared.
k
It was attempted here but it wasn’t performant for current Prefect Core. Prefect 2.0 may open doors to make this achievable, but of course we can’t make promises around it.
j
Upon reviewing those tickets it seems like it would have been easier to implement for LocalAgent but this was not done since the concept was harder to generalize to all agent types. The label-level concurrency limits represent "punting on 4th down" since they don't really address limiting the use of any particular resource when labels are shared between agents.
@Anna Geller @Kevin Kho this does what I want for now, since flows running under the LocalAgent are just child processes. I may improve it by making the children call `prctl(PR_SET_PDEATHSIG, ...)` so that they don't outlive their agent.
If you provide the Prefect community with the option to reap any flows started by an agent (upon agent death) then you could provide the option of agent-based concurrency control in only that configuration... it'd be a lot easier than trying to ask the server which flows are running!!!
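For reference, a rough Python equivalent of that prctl call (Linux-only; the PR_SET_PDEATHSIG constant value of 1 comes from <sys/prctl.h>, and calling libc via ctypes here is just a sketch of the idea rather than the LD_PRELOAD approach we actually use):
```
import ctypes
import signal

# PR_SET_PDEATHSIG = 1 on Linux (see <sys/prctl.h>); the kernel delivers the
# given signal to this process when its parent (here, the agent) dies.
PR_SET_PDEATHSIG = 1

def die_with_parent(sig: int = signal.SIGTERM) -> None:
    libc = ctypes.CDLL("libc.so.6", use_errno=True)
    if libc.prctl(PR_SET_PDEATHSIG, sig) != 0:
        raise OSError(ctypes.get_errno(), "prctl(PR_SET_PDEATHSIG) failed")

# Call early in the child process, e.g. at the top of the flow-run entrypoint,
# so a stopped or crashed agent never leaves orphaned flow-run processes behind.
die_with_parent()
```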
z
Hey John, that's a fun workaround! As you've noted, this won't persist beyond agent death, and we do not want all agent types to reap their flows on death. For example, your Kubernetes agent pod may restart, but that should not result in the termination of all of your Kubernetes flow runs. This may meet your expectations, but would not meet those of many of our users.

Even with a toggle, as you've noted, self-tracking agent concurrency does not work without it enabled. This also requires agents to track the state of the runs they create, which is not within the scope of the current design of agents.

Personally, I want to provide the feature you're looking for. However, I do not want to provide it in a way that is not robust and usable across our different execution paradigms. If we do, we will get new complaints that it is not working as expected. Since agents are client-side constructs, you can add local tracking for your own use case as you have above.
j
@Zanie Thanks for chiming in! I have a question... does some element of the Prefect ecosystem always have the ability to stop what it started? Or can it sometimes be a runaway train? What I mean is that if an agent can't stop something it started, and if server/cloud can't stop something an agent started (can it, reliably?), then it seems like a recipe for getting devops involved when something goes off the rails. What do you think? Should Prefect always be able to stop what it starts?
z
That's a great question, and a tricky one. Elegant cancellation is one of the harder problems for an orchestration tool to solve.

In Prefect <=1.0, agents are quite lightweight by design. They're not meant to handle much more than submitting flow runs as fast as they can. Instead of having the agent manage termination, the execution engine in Prefect is designed to tear down cleanly if the orchestration layer marks the run as cancelled. Cancelling a flow run by stopping the Kubernetes pod running it (as an agent might) isn't always ideal. For example, if your flow run creates additional resources (like a Dask cluster) and is killed by an external force, it may not have a chance to clean anything up. This is why it's easiest for the orchestration layer to give the tasks a terminal state and allow the execution engine to note that it has been told to tear down and begin that process from within the infrastructure.

That said, I'm currently working on agents in Orion and I think they will likely have a more active role in monitoring (and terminating) the infrastructure they create. This will allow us more control in cases where the execution engine is failing to exit in a timely manner. There are tradeoffs to this approach though, and combining several approaches adds complexity for developers and users.
j
If I understand correctly, you are saying that server/cloud, via an executor, is in control of flow termination. Doesn't that mean that agent-level concurrency control should be possible in server without any agent-side modifications other than making sure agents are named? I.e. instead of the agent needing to ask the server whether it should accept more work, the server can just not present a workload potential to that agent in the first place, if it knows the agent has started something that is still unterminated.

Thanks for your work on these issues. For now we will put our flows in the iron grip of the LocalAgent's lifespan via LD_PRELOAD (for the agent's children, grandchildren, etc.) of `prctl(PR_SET_PDEATHSIG, ...)`, such that a child process count is a reliable indication of whether to accept more work.
z
> I.e. instead of the agent needing to ask the server whether it should accept more work, the server can just not present a workload potential to that agent in the first place, if it knows the agent has started something that is still unterminated.

This is basically the same as the closed implementation in Server for agent-level concurrency. It just moves the concurrency check from a dedicated route to the route that retrieves ready flow runs. The issue here is actually not that the agent is polling the server to investigate concurrency; it is that aggregate count queries are not performant in Hasura. Once a certain scale is reached, the server will struggle to calculate the number of concurrent runs that a given agent has.

> Thanks for your work on these issues. For now we will put our flows in the iron grip of the LocalAgent's lifespan via LD_PRELOAD (for the agent's children, grandchildren, etc.) of `prctl(PR_SET_PDEATHSIG, ...)`, such that a child process count is a reliable indication of whether to accept more work.
You’re welcome! Sorry I don’t have a great solution for you yet 🙂 We’re definitely thinking about this a lot, we just want to deliver a feature that works consistently and is well integrated with the rest of our offering. I like your workaround quite a bit haha