# ask-community
j
Hello everyone! If you use the `DaskKubernetesEnvironment` with a custom YAML spec, can you still specify min & max workers in the class?
j
Hey @John Ramirez you should still be able to set `min_workers` and `max_workers` if you provide custom worker YAML. Are you seeing something different?
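A minimal sketch of that combination, assuming the Prefect 0.x `DaskKubernetesEnvironment` API and a hypothetical `worker_spec.yaml` file:
```python
from prefect.environments import DaskKubernetesEnvironment

# Sketch only: worker_spec.yaml is a placeholder for your custom worker spec.
environment = DaskKubernetesEnvironment(
    min_workers=1,
    max_workers=3,
    worker_spec_file="worker_spec.yaml",  # custom worker YAML
)
```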
j
No, I was reading the Prefect documentation and it was not clear if setting the number of workers in a `spec.yaml` file would overwrite it.
j
I don’t think it does! The default YAML that it’s overwriting is the default provided by the dask-kubernetes library.
But I see what you’re saying from the documentation 🤔
j
ok, I’ll try it out
j
Great! Yeah, the cluster is always created with the call
```
cluster.adapt(minimum=self.min_workers, maximum=self.max_workers)
```
and the YAML is provided prior, so it looks to me like the min/max get the final word, but I can’t say for certain without testing it. Let me know if it works for you!
j
Is there a way, without using the custom spec, to tell `DaskKubernetesEnvironment` to use a custom dask image? I’m using a number of extra packages.
j
You should build those packages into your flow’s Docker storage, because by default all of the workers run your flow’s storage image, so dependencies match.
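For example, something along these lines, assuming Prefect 0.x Docker storage; the registry URL and package names are placeholders:
```python
from prefect.environments.storage import Docker

# Extra packages are baked into the flow image, which the dask workers also run.
flow.storage = Docker(
    registry_url="my-registry.example.com",
    python_dependencies=["dask-ml", "scikit-learn"],
)
```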
j
Do you get the same parallel runs on mapping tasks with the `DaskKubernetesEnvironment`?
j
Yeah, the main difference between the `DaskKubernetesEnvironment` and an environment with a static dask cluster is that the cluster begins and ends with that single flow run.
j
ok, I don’t see mapped tasks running in parallel in the web UI
j
🤔 What are you setting for min/max workers?
j
In this case the max is 3, which might be too small. One other thing that is not clear is the baseline for the workers if you don’t use a custom spec.
j
Do you see multiple workers in your cluster during your run? The default min is `1`, and IIRC dask is responsible for determining when to scale up the number of workers.
j
yes, I see all three workers
j
Interesting, not sure why it wouldn’t be parallelizing the flow if that’s the intended behavior. Are you able to make a reproducible example?
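If it helps, a rough sketch of a reproducible example, assuming the Prefect 0.x API (the sleep is only there so the mapped tasks run long enough for parallelism to be visible):
```python
import time

from prefect import Flow, task
from prefect.environments import DaskKubernetesEnvironment

@task
def slow_increment(x):
    time.sleep(10)  # long enough to observe whether mapped tasks overlap
    return x + 1

with Flow(
    "map-parallelism-test",
    environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3),
) as flow:
    slow_increment.map(list(range(20)))
```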
j
I’m going to try one thing first: I’m going to increase the min workers, because right now it’s 1, and see if that does anything.
ok, if you set the min workers to 1 then you do not get the parallel mapping
j
Oh that’s really interesting cc @Jim Crist-Harif in case he’s seen something like this before. Otherwise I can attempt to reproduce
j
When using adaptive scaling (min and max workers), dask will internally decide if it seems worth adding another worker to speedup your workload. This is a weighted calculation - is the time it takes to add another worker worth any potential speedups given the known set of todo tasks. If the tasks you're testing with run fairly quickly and are few in number, you might not see a scale up at all, since dask doesn't think it's worth it.
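A minimal local sketch of that behavior, not tied to Prefect or Kubernetes, using dask’s adaptive scaling directly (the sleep and task count are arbitrary):
```python
import time

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=0)
cluster.adapt(minimum=1, maximum=3)  # dask decides when scaling up is worth it
client = Client(cluster)

def work(x):
    time.sleep(5)  # long enough that adding workers looks worthwhile;
    return x * 2   # with very quick tasks the cluster may stay at the minimum

futures = client.map(work, range(20))
print(client.gather(futures))
```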
j
so in my testing, I saw that the workers did spin up, but the parallel execution of the mapped tasks did not occur
j
How much work were you allocating, and how long did it take? Dask may also choose not to send tasks to a worker if reallocating work may take longer than the computation (this is referred to as "work stealing" in the dask docs).
If you're using task tags to pin work to certain workers, this also currently interferes with work stealing.
Are the new workers just sitting idle? If so, for how long?
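On the task-tag point: assuming the documented `dask-resource:` tag convention in Prefect 0.x, pinning looks roughly like this (the `GPU` resource name is just an illustration, and the worker has to be started with a matching `--resources` flag):
```python
from prefect import task

# Tags of the form "dask-resource:<NAME>=<AMOUNT>" are treated as dask worker
# resource requirements, so this task only runs on workers advertising GPU >= 1.
@task(tags=["dask-resource:GPU=1"])
def train_on_gpu(data):
    ...
```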
j
In the original run I had min=1, max=3. When the mapping task was being “prepared”, the additional workers came up and were used, but the actual execution of the tasks was linear.
j
And if you already had 2 (or more) workers up you did see parallel execution with map?
j
I jumped to 10 on the next run and did see the parallel execution there
j
When you were seeing linear execution of the map tasks, were the workers working on anything else? Or were they sitting idle, with each worker periodically handling a map task? Dask has free rein to decide which tasks to run when, so an idle cluster is a better indication of a bug than changes in execution ordering/parallelization.