Preston Marshall

02/07/2020, 11:14 PM
So I'm using the DaskExecutor with a task that has a map() over a list of files. I would expect this to be done in parallel but it runs serially. How do I allow a flow to run in parallel?

Chris White

02/07/2020, 11:15 PM
can you share your code? Also, how do you know the tasks are running serially?

Preston Marshall

02/07/2020, 11:18 PM
I'm just watching the logs, and it goes through one by one
can I dm you?
dm-ed you the code

Chris White

02/07/2020, 11:20 PM
so my guess is that start_cluster (which appears to be a custom function) is creating a single worker with only 1 thread and 1 process available

Preston Marshall

02/07/2020, 11:21 PM
I can share that, it's just the code from their docs wrapped in a function
from dask_kubernetes import KubeCluster


def start_cluster():
    cluster = KubeCluster.from_yaml('worker-spec.yml')
    cluster.scale_up(10)  # specify number of nodes explicitly

    cluster.adapt(minimum=1, maximum=100)
    return cluster
when it starts up I notice it makes 10 and then immediately tears them down
in k8s

Chris White

02/07/2020, 11:22 PM
hm strange; can you try removing the k8s piece and just use the following executor:
executor = DaskExecutor()

Preston Marshall

02/07/2020, 11:22 PM
sure, just try it locally?

Chris White

02/07/2020, 11:22 PM
yup yup

Preston Marshall

02/07/2020, 11:23 PM
that runs in parallel

Chris White

02/07/2020, 11:24 PM
gotcha; so there’s some issue with the kubernetes dask cluster you’re spinning up
does your worker spec specify nthreads and nprocs?

Preston Marshall

02/07/2020, 11:26 PM
so, if I take away the scale_up() call, it starts out with 1 and then eventually spins up what looks like 100 pods
the other weird thing is that those initial pods from the scale_up immediately die
I'm not specifying anything you don't see

Chris White

02/07/2020, 11:27 PM
Interesting - I recommend opening an issue on the dask-kubernetes repo with the behavior you’re seeing

Preston Marshall

02/07/2020, 11:27 PM
oh wait, sorry. I am specifying nthreads

Chris White

02/07/2020, 11:28 PM
are you fixing it to 1?

Preston Marshall

02/07/2020, 11:28 PM
2
maybe it just looks serial, and is actually 2 at a time. the scale_up thing is odd though

Chris White

02/07/2020, 11:29 PM
hm i’d expect to see 2 tasks running simultaneously in that case
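
Rather than judging from the logs, one way to check whether tasks really overlap is to record each task's start and end time and count the peak number running at once. The following is a minimal sketch using only the standard library: a ThreadPoolExecutor with max_workers=2 stands in for a worker with nthreads set to 2, and slow_task, peak_concurrency, and run_with_threads are hypothetical names, not part of Prefect or Dask.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(item):
    """Stand-in for a slow mapped task; returns its (start, end) timestamps."""
    start = time.monotonic()
    time.sleep(0.2)  # simulated work
    return start, time.monotonic()


def peak_concurrency(intervals):
    """Return the largest number of (start, end) intervals overlapping at any instant."""
    events = []
    for start, end in intervals:
        events.append((start, 1))   # a task begins
        events.append((end, -1))    # a task finishes
    # At equal timestamps, process finishes before starts so that
    # back-to-back tasks are not counted as overlapping.
    events.sort(key=lambda e: (e[0], e[1]))
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


def run_with_threads(n_threads, n_items=6):
    """Map slow_task over n_items inputs using a pool of n_threads threads."""
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        return list(pool.map(slow_task, range(n_items)))
```

With a single thread the peak is 1 (truly serial); with two threads it is 2, which in a log stream can still look like tasks finishing one by one when each task is slow.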

Preston Marshall

02/07/2020, 11:29 PM
yeah it may be, they are slow

Chris White

02/07/2020, 11:29 PM
gotcha
yea the scale_up issue you should open a dask-kubernetes issue for

Preston Marshall

02/07/2020, 11:30 PM
ideally I'd have some prewarmed pods so yeah I may do that
thanks!

Chris White

02/07/2020, 11:30 PM
anytime!

Preston Marshall

02/08/2020, 12:55 AM
it seems like, because I had adapt(minimum=1) and there wasn't any load yet, it was killing the extra pods from scale_up

Chris White

02/08/2020, 1:02 AM
Gotcha, that makes sense
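
The behavior described above can be illustrated with a toy model of adaptive scaling. This is a deliberately simplified sketch, not Dask's actual algorithm: it assumes the scaler simply clamps the number of workers the current load calls for into the range given by minimum and maximum. adaptive_target and simulate are hypothetical names.

```python
def adaptive_target(demand, minimum, maximum):
    """Clamp the worker count the current load calls for into [minimum, maximum]."""
    return max(minimum, min(maximum, demand))


def simulate(initial_workers, demand, minimum, maximum):
    """Return the worker count before and after one adaptive adjustment."""
    return initial_workers, adaptive_target(demand, minimum, maximum)


# Idle cluster (no load yet) right after an explicit scale-up to 10 workers:
print(simulate(10, demand=0, minimum=1, maximum=100))   # (10, 1): nine pods are torn down
print(simulate(10, demand=0, minimum=10, maximum=100))  # (10, 10): pods stay prewarmed
```

Under that assumption, keeping prewarmed pods in the start_cluster function from earlier in the thread would mean passing a higher floor to adapt, for example minimum=10 instead of minimum=1, rather than relying on scale_up alone.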