# prefect-community
p
So I'm using the DaskExecutor with a task that has a map() over a list of files. I would expect this to be done in parallel but it runs serially. How do I allow a flow to run in parallel?
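For context, the setup is roughly this shape (the task body and file names here are placeholders, and this assumes the Prefect 0.x API):
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def process_file(path):
    ...  # placeholder for the real per-file work

with Flow("process-files") as flow:
    process_file.map(["a.csv", "b.csv", "c.csv"])

# map() creates one task run per file; whether those actually run in
# parallel is up to the executor the flow is run with
flow.run(executor=DaskExecutor())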
c
can you share your code? Also, how do you know the tasks are still running serially?
p
I'm just watching the logs, and it goes through one by one
can I dm you?
dm-ed you the code
c
so my guess is that start_cluster (which appears to be a custom function) is creating a single worker with only 1 thread and 1 process available
p
I can share that, it's just the code from their docs wrapped in a function
from dask_kubernetes import KubeCluster


def start_cluster():
    cluster = KubeCluster.from_yaml('worker-spec.yml')
    cluster.scale_up(10)  # specify number of nodes explicitly

    cluster.adapt(minimum=1, maximum=100)
    return cluster
when it starts up I notice it makes 10 pods and then immediately tears them down
in k8s
c
hm strange; can you try removing the k8s piece and just use the following executor:
from prefect.engine.executors import DaskExecutor

executor = DaskExecutor()
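With no address given, DaskExecutor spins up a temporary local Dask cluster, which takes kubernetes out of the picture entirely. A minimal sketch of wiring it in, assuming the Prefect 0.x API:
flow.run(executor=executor)  # mapped tasks fan out across the local Dask workers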
p
sure, just try it locally?
c
yup yup
p
that runs in parallel
c
gotcha; so there’s some issue with the kubernetes dask cluster you’re spinning up
does your worker spec specify nthreads and nprocs?
p
so, if I take away the scale_up() call, it starts out with 1 and then eventually spins up what looks like 100 pods
the other weird thing is that those initial pods from the scale_up immediately die
I'm not specifying anything you don't see
c
Interesting - I recommend opening an issue on the dask-kubernetes repo with the behavior you’re seeing
p
oh wait, sorry. I am specifying nthreads
c
are you fixing it to 1?
p
2
maybe it just looks serial, and is actually 2 at a time. the scale_up thing is odd though
c
hm i’d expect to see 2 tasks running simultaneously in that case
p
yeah it may be, they are slow
c
gotcha
yeah, I'd open a dask-kubernetes issue for the scale_up behavior
p
ideally I'd have some prewarmed pods so yeah I may do that
thanks!
c
anytime!
p
it seems like because I had adapt(minimum=1) and there wasn't any load yet, the adaptive scaling was killing the extra pods from scale_up
c
Gotcha, that makes sense
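Based on that diagnosis, a rough sketch of the fix for keeping prewarmed pods: drop the scale_up() call and raise the adaptive minimum instead (the 10 here is illustrative):
from dask_kubernetes import KubeCluster


def start_cluster():
    cluster = KubeCluster.from_yaml('worker-spec.yml')
    # adapt() takes over scaling once there's no load, so set its minimum
    # to the number of prewarmed pods that should stay alive
    cluster.adapt(minimum=10, maximum=100)
    return cluster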