# prefect-community
a
Hey all! I've been experimenting with `KubernetesRun` (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/run_configs/kubernetes.py). My goal is to be able to set a different job template on a per-flow-run basis. It seems currently this is set up on a per-flow basis, since you have to do:
```python
flow.run_config = KubernetesRun(...)
flow.register(project_name=project_name)
```
So it appears you have to re-register if you want different resource limits per flow run. Any ideas on how to best achieve this?
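For reference, a minimal sketch of what per-flow resources look like today, assuming `KubernetesRun`'s resource keyword arguments (all names and values below are hypothetical):
```python
# Sketch: resource limits are fixed at registration time, so changing
# them means re-registering the flow.
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("ml-workflow") as flow:
    ...  # tasks go here

flow.run_config = KubernetesRun(
    cpu_request=1,
    cpu_limit=2,
    memory_request="4Gi",
    memory_limit="8Gi",
)
flow.register(project_name="my-project")
```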
j
Different resource requirements for the flow-runner pod per-flow-run aren't part of the design for run-configs, and aren't currently possible. If you expect wildly different resource requirements for different runs of the same flow, our current recommendation is to use a `DaskExecutor` with adaptive scaling (so the resources for the initial job can be static across all runs). Can you expand on your use case a bit?
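A rough sketch of that pattern, assuming a temporary Dask cluster created via `dask_kubernetes` (the import path varies by Prefect version, and the bounds here are illustrative):
```python
# Sketch: the flow-runner pod stays static; Dask workers scale adaptively
# between the given bounds as the run's workload demands.
from prefect import Flow
from prefect.executors import DaskExecutor  # prefect.engine.executors on older versions

with Flow("ml-workflow") as flow:
    ...  # tasks go here

flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    adapt_kwargs={"minimum": 1, "maximum": 10},  # optional upper bound
)
```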
a
Gotcha. The use case is a Flow that represents a machine learning workflow, where different runs may have vastly different sizes of input data. For bigger inputs, I might need much more memory, for example. I'd like my teammates to be able to specify the resources for their flow run depending on what they think is appropriate, considering the time/cost tradeoff and the size of their input data.
But perhaps it's also acceptable to have each resource-request level for a given workflow be a separate Prefect Flow.
j
How would you determine the resources for a specific flow run? Are you doing this manually when kicking off the flow run (e.g. looking at the data, then entering some new numbers before kicking off a run)? Or automated based on some input `Parameter`s?
Currently you'd need to have multiple versions of the flow, or use a `DaskExecutor` to scale up once the flow starts. We might add something like what you're looking for in the future - just trying to better understand how you'd use this feature if we were to add it.
a
Thanks for that. I'm thinking about how to best answer that and will get back to you
It would be more of:
> manually when kicking off the flow run (e.g. looking at the data, then entering some new numbers before kicking off a run)
Let's say a `Parameter` is an S3 path to input data for the flow's processing task. This could point to data of widely varying size. So it'd be nice if the user passes in that path along with how much memory/CPU they want to limit the run to. Creating a new flow for each memory/CPU resource limit level is an OK solution for now, I think.
The reason we care about controlling resource limits is that we're going to have an underlying k8s cluster with a good number of people submitting flows/flow runs, and we want people to be able to give their flow runs enough muscle (at their discretion) without eating up all the shared resources.
I need to learn more about the `DaskExecutor` and Dask in general, but it seems to me a similar problem would arise: how can we prevent a teammate's flow run from eating up a huge amount of compute without specifying limits at the flow-run level? If someone submits a big input dataset to a flow and Dask lets it scale up uninhibited, and a lot of people do that, our k8s cluster may get overwhelmed. At the same time, if we set a low limit on Dask, then some of the less frequent, more intensive flow runs won't work.
An interesting thought a colleague pointed out: since it's called `RunConfig`, you might initially think you could pass a `RunConfig` at the flow-run level, whereas really it's almost more of a `FlowConfig`, because it sets the config for all runs of a flow.
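A small sketch of the setup being described (all names hypothetical):
```python
# Sketch: an S3 path Parameter whose referenced data can vary widely in
# size between runs; the flow definition itself stays the same.
from prefect import Flow, Parameter, task

@task
def process(s3_path: str):
    ...  # load the data at s3_path and run the ML workflow

with Flow("ml-workflow") as flow:
    s3_path = Parameter("s3_path")
    process(s3_path)
```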
j
Hmmm, ok. One other (hacky) solution that might work for you: when using run-configs, things not specified at the flow level default to values configured at the agent level. So you could (sketched below):
• configure a single flow with an empty `KubernetesRun()`
• start e.g. 3 agents, each with increasing default resource capacities
• at runtime, specify a label to select which agent you want the flow to be picked up by (say `small`, `medium`, `large`)
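A sketch of how that could be wired up, assuming a Prefect version whose `Client.create_flow_run` accepts run-time labels (agent commands are shown as comments; the exact flags for setting agent-level resource defaults vary by version):
```python
# Agents started out-of-band, each advertising one label, e.g.:
#   prefect agent kubernetes start --label small   # modest default resources
#   prefect agent kubernetes start --label medium
#   prefect agent kubernetes start --label large   # generous default resources
from prefect import Client

client = Client()
client.create_flow_run(
    flow_id="<flow-id>",  # the single registered flow
    labels=["large"],     # route this run to the "large" agent
)
```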
a
Ah! Cool idea. I was thinking of something similar as well earlier. I'll keep this in mind as a potential solution
j
I need to learn more about the DaskExecutor and dask in general but it seems to me a similar problem would happen - how can we prevent a teammate's flow run from eating up a huge amount of compute without specifying some limits on the flow run level. If someone submits a big input dataset on a flow and Dask lets it scale up uninhibited, and a lot of people do that our k8s cluster may get overwhelemed. While at the same time if we set a low limit on Dask then some of of the less frequent, more intensive flow runs won't work
We generally recommend users run with adaptive scaling for Dask, which has an optional upper bound. Dask is then free to scale up and down as needed, and it generally does a good job of releasing unused resources. K8s also does a pretty good job of sharing resources among a team. If you want to get fancy, you can also configure things so the Dask workers run in an autoscaling node pool on preemptible nodes, so you get extra capacity as needed but use cheaper nodes for it (assuming you're using some kind of cloud-provided k8s).
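For example, a sketch of bounded adaptive scaling with workers pinned to a preemptible node pool, using `dask_kubernetes` (the node selector shown is GKE-specific, and the image name and bounds are hypothetical):
```python
# Sketch: Dask workers run on preemptible nodes, with a hard upper bound
# on worker count so one run can't consume the whole cluster.
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow
from prefect.executors import DaskExecutor

pod_spec = make_pod_spec(
    image="my-flow-image:latest",
    memory_limit="4G",
    cpu_limit=1,
    extra_pod_config={
        "nodeSelector": {"cloud.google.com/gke-preemptible": "true"},
    },
)

with Flow("ml-workflow") as flow:
    ...  # tasks go here

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(pod_spec),
    adapt_kwargs={"minimum": 0, "maximum": 20},  # cap total worker count
)
```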
Given the requirements stated above, we might make it possible to provide a run-config at runtime as part of the kick-off request. This would mean any automated decisions about what parameters to pass would have to happen externally to your flow logic (since the flow won't have started yet), but it would handle your use case. I'm not sure whether the provided config should act as per-field overrides (so only specified fields are overridden) or as a complete override (replacing whatever was configured on the flow). We'll keep an eye out for more uses for this and may add it down the road.
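Purely as a sketch of what that could look like if it were added (this API did not exist at the time of this thread; check whether your Prefect version's `Client.create_flow_run` accepts a run-config):
```python
# Hypothetical: passing a run-config with the kick-off request, overriding
# (some or all of) what was set at registration time.
from prefect import Client
from prefect.run_configs import KubernetesRun

Client().create_flow_run(
    flow_id="<flow-id>",
    run_config=KubernetesRun(memory_limit="16Gi"),
)
```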
a
Awesome, we will consider these ideas as we try things out. Thanks, Jim!