# prefect-community

John Ramirez

05/08/2020, 1:10 PM
Hello everyone! If you use the `DaskKubernetesEnvironment` with a custom YAML spec, can you still specify min & max workers in the class?

josh

05/08/2020, 1:13 PM
Hey @John Ramirez you should still be able to set `min_workers` and `max_workers` if you provide custom worker YAML. Are you seeing something different?
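Something like this minimal sketch (the spec filename and flow name are hypothetical, and this assumes the Prefect 0.x API):
```python
from prefect import Flow
from prefect.environments import DaskKubernetesEnvironment

# min/max workers are passed to the environment alongside a custom
# worker spec; "worker_spec.yaml" is a hypothetical path
environment = DaskKubernetesEnvironment(
    min_workers=1,
    max_workers=3,
    worker_spec_file="worker_spec.yaml",
)

flow = Flow("my-flow", environment=environment)
```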

John Ramirez

05/08/2020, 1:16 PM
No, I was reading the Prefect documentation and it was not clear if setting the number of workers in a `spec.yaml` file would overwrite it

josh

05/08/2020, 1:18 PM
I don’t think it does! The default YAML that it’s overwriting is the default provided by the dask-kubernetes library
But I see what you’re saying from the documentation 🤔

John Ramirez

05/08/2020, 1:19 PM
ok, I’ll try it out

josh

05/08/2020, 1:20 PM
Great! Yeah, the cluster is always created with the call
```python
cluster.adapt(minimum=self.min_workers, maximum=self.max_workers)
```
and the YAML is provided prior, so it looks to me like the min/max get the final word, but I can’t say for certain without testing it. Let me know if it works for you!

John Ramirez

05/08/2020, 1:33 PM
Is there a way, without using the custom spec, to tell `DaskKubernetesEnvironment` to use a custom dask image? I’m using a number of extra packages

josh

05/08/2020, 1:39 PM
You should build those packages into your flow’s Docker storage, because by default all of the workers use your flow’s storage image, so dependencies match
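For example, a rough sketch with Prefect 0.x Docker storage (the registry URL and dependency names are hypothetical):
```python
from prefect import Flow
from prefect.environments.storage import Docker

# Extra packages get baked into the flow's image; the dask workers
# run from this same image, so dependencies stay in sync
storage = Docker(
    registry_url="registry.example.com",  # hypothetical registry
    python_dependencies=["pandas", "scikit-learn"],
)

flow = Flow("my-flow", storage=storage)
```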

John Ramirez

05/08/2020, 2:21 PM
do you get the same parallel runs on mapping tasks with the `DaskKubernetesEnvironment`?

josh

05/08/2020, 2:23 PM
Yeah, the main difference with using the `DaskKubernetesEnvironment` over an environment with a static dask cluster is that the cluster begins and ends with that single flow run

John Ramirez

05/08/2020, 2:24 PM
ok, I don’t see mapped tasks running in parallel in the UI

josh

05/08/2020, 2:24 PM
🤔 What are you setting for min/max workers?

John Ramirez

05/08/2020, 2:26 PM
In this case the max is 3, which might be too small. One other thing that is not clear is the baseline for the workers if you don’t use a custom spec

josh

05/08/2020, 2:29 PM
Do you see multiple workers in your cluster during your run? The default min is `1`, and IIRC dask is responsible for determining when to scale up the number of workers

John Ramirez

05/08/2020, 2:29 PM
yes, I see all three workers

josh

05/08/2020, 2:36 PM
Interesting, not sure why it wouldn’t be parallelizing the flow if that’s the intended behavior. Are you able to make a reproducible example?

John Ramirez

05/08/2020, 3:00 PM
I’m going to try one thing first: I’m going to increase the min workers, because right now it’s 1, and see if that does anything
ok, if you set the min workers to 1 then you do not get the parallel mapping

josh

05/08/2020, 3:34 PM
Oh, that’s really interesting. cc @Jim Crist-Harif in case he’s seen something like this before; otherwise I can attempt to reproduce

Jim Crist-Harif

05/08/2020, 3:57 PM
When using adaptive scaling (min and max workers), dask will internally decide if it seems worth adding another worker to speed up your workload. This is a weighted calculation: is the time it takes to add another worker worth any potential speedup, given the known set of to-do tasks? If the tasks you're testing with run fairly quickly and are few in number, you might not see a scale-up at all, since dask doesn't think it's worth it.
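To illustrate the difference, a plain dask-kubernetes sketch (not Prefect-specific; the spec path is hypothetical):
```python
from dask_kubernetes import KubeCluster

# "worker_spec.yaml" is a hypothetical worker pod spec
cluster = KubeCluster.from_yaml("worker_spec.yaml")

# Adaptive: dask weighs the cost of adding a worker against the
# expected speedup, so short/small workloads may never scale up
cluster.adapt(minimum=1, maximum=3)

# Fixed: pin the worker count up front, no cost/benefit decision
# cluster.scale(3)
```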

John Ramirez

05/08/2020, 4:07 PM
So in my testing I saw that the workers did spin up, but the parallel execution of the mapped tasks did not occur

Jim Crist-Harif

05/08/2020, 4:09 PM
How much work were you allocating, and how long did it take? Dask may also choose not to send tasks to a worker if reallocating work would take longer than the computation (this is referred to as "work stealing" in the dask docs).
If you're using task tags to pin work to certain workers, this also currently interferes with work stealing.
Are the new workers just sitting idle? If so, for how long?
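If you want to rule work stealing in or out while debugging, it can be toggled through dask's distributed config (an assumption about your setup, not something required):
```python
import dask
import dask.distributed  # ensures the distributed config is registered

# Read the current work-stealing setting (defaults to True)
print(dask.config.get("distributed.scheduler.work-stealing", default=True))

# Disable it for a debugging run:
# dask.config.set({"distributed.scheduler.work-stealing": False})
```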

John Ramirez

05/08/2020, 4:14 PM
In the original run I had min=1, max=3. When the mapping task was being “prepared”, the additional workers came up and were used. But the actual execution of the tasks was linear

Jim Crist-Harif

05/08/2020, 4:16 PM
And if you already had 2 (or more) workers up, you did see parallel execution with map?

John Ramirez

05/08/2020, 4:17 PM
I jumped to 10 on the next run and did see the parallel execution there

Jim Crist-Harif

05/08/2020, 4:21 PM
When you were seeing linear execution of the map tasks, were the workers working on anything else? Or were they sitting idle, with each worker periodically handling a map task? Dask has free rein to decide which tasks to run when, so an idle cluster is a better indication of a bug than changes in execution ordering/parallelization.