Joe Schmid

09/10/2019, 8:19 PM
Question on using an external Dask cluster (DaskExecutor) running in adaptive mode -- we notice that running a Flow that maps over elements (say 100 elements) only seems to use the number of Dask workers that existed at the time the flow run started. We would have expected the Dask cluster to begin scaling up workers (since it's in adaptive mode) and start running more Prefect tasks in parallel. I'm not sure we've fully characterized this behavior, but we've observed it enough that it seems to be a pattern. (We have a notebook that, using the k8s APIs, plots the number of requested workers, running workers, & nodes, and could show some of what we're seeing if it would help.)
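For reference, a minimal sketch of the kind of setup being described, assuming the Prefect 0.x DaskExecutor API with an `address` argument; the scheduler address and the task are placeholders:

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def process(x):
    # stand-in for a real unit of work
    return x * 2

with Flow("mapped-flow") as flow:
    # map over ~100 elements, as in the scenario above
    process.map(list(range(100)))

# point the executor at the external (adaptive) Dask cluster's scheduler
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))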

josh

09/10/2019, 8:26 PM
Just so I understand: you're running into a scenario where you have a Dask cluster in adaptive mode and (for example) you have N workers, you start the flow run, the cluster scales up to M workers, but it's still only using the original N?

Brett Naul

09/10/2019, 8:32 PM
could this be related to work stealing being disabled? the Prefect docs say to disable it, but it does seem like that limits the effectiveness of an adaptive cluster
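For context, a sketch of how work stealing is typically turned off via Dask's config (this is the setting in question; it has to be set in the scheduler's environment to take effect there):

import dask

# equivalent to exporting DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING="False"
# before the scheduler process starts
dask.config.set({"distributed.scheduler.work-stealing": False})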

Joe Schmid

09/10/2019, 8:37 PM
@josh what we see is that the cluster doesn't even try to scale up beyond the original N workers. There are a lot of available (Pending) Prefect Tasks but in our case only 2 dask workers running 4 Tasks (2 vCPUs for each worker).

josh

09/10/2019, 8:37 PM
Interesting theory @Brett Naul. I think I've seen an issue similar to this in the past, with not every dask worker being used in an adaptive setting. I was using the `dask-kubernetes` library and I can't say for sure if I had work stealing disabled.
Ah @Joe Schmid, so you're seeing a scenario where dask isn't scaling up its number of workers at all. I have not seen that before!

Joe Schmid

09/10/2019, 8:40 PM
@Brett Naul We did recently disable work stealing, but saw the same behavior prior to that.

josh

09/10/2019, 8:40 PM
Are you using dask-kubernetes for your cluster?

Joe Schmid

09/10/2019, 8:40 PM
Yup, running on AWS EKS.
Specifically, using KubeCluster from dask-kubernetes for the adaptive part.
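Roughly the kind of adaptive KubeCluster setup being described; the worker spec path and the scaling bounds are placeholders:

from dask_kubernetes import KubeCluster
from distributed import Client

cluster = KubeCluster.from_yaml("worker-spec.yaml")

# adaptive mode: the scheduler requests workers between these bounds
cluster.adapt(minimum=2, maximum=10)

client = Client(cluster)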

josh

09/10/2019, 9:00 PM
Interesting, I wonder if the dask scheduler thinks it doesn’t need to create more workers for itself. I’m trying to recreate this on a cluster to see if I get the same issue

Joe Schmid

09/10/2019, 9:04 PM
In case it helps, one aspect of the behavior we observed is that N Prefect tasks start running initially, where N = w * c (w = number of initial Dask workers, c = vCPUs per worker, which is the same as the nthreads we start the workers with)
e.g. 4 tasks for 2 workers with 2 vCPUs each

josh

09/10/2019, 9:11 PM
There are a couple of things I think could be happening:
1. It's starting with the number of processes available (2 workers w/ 2 CPUs, so 4 total) and sticking with that, so it doesn't think it needs to create more workers (i.e. only ever trying to do 4 things at once).
2. The mapped tasks aren't being executed in parallel but are instead running sequentially, and therefore dask isn't creating more workers because it doesn't think it needs to.
cc @Chris White
I think I was able to replicate what you’re experiencing. I made a flow:
# import paths assumed for the Prefect version in use at the time
from prefect import Flow, task
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker


@task
def first_task():
    return [100] * 100


@task
def compute(x):
    return x * 100


with Flow(
    "dktest",
    environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3),
    storage=Docker(registry_url="my_registry", image_name="flows"),
) as flow:
    one = first_task()
    result = compute.map(one)
This flow uses the `DaskKubernetesEnvironment`, which simply uses `dask-kubernetes` to execute the flow on my cluster. When I ran this (with a single map) I never witnessed a second or third worker being created: it chewed through each mapped task, and they were being executed sequentially. However, I changed the flow to contain some parallel branches from `first_task`:
one = first_task()
result = compute.map(one)
result2 = compute.map(one)
result3 = compute.map(one)
Then I saw more than one dask worker being dynamically created because I believe it was now trying to chew through the maps simultaneously.

Joe Schmid

09/10/2019, 9:45 PM
@josh thanks for the test case! We can give this a shot. Much appreciated.

Chris White

09/10/2019, 9:50 PM
Just catching up - this is interesting; we could also explore replicating this with Dask directly and file a bug report. Under the hood, Prefect makes a call to `Client.map` (https://distributed.dask.org/en/latest/api.html#distributed.Client.map); in Prefect, this call is actually made with a `worker_client`, which could also be relevant
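Illustratively (not Prefect's actual internals), this is the shape of the call being described: from inside a task already running on a Dask worker, a `worker_client` is opened and the children are submitted via `Client.map`. Names here are assumptions for the sketch:

from distributed import worker_client

def run_children(fn, items):
    # only valid inside a function executing on a dask worker
    with worker_client() as client:
        futures = client.map(fn, items)  # one future per mapped element
        return client.gather(futures)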

Joe Schmid

09/10/2019, 10:18 PM
@Chris White it does seem like a potential bug/issue. At a minimum, it would certainly be desirable behavior to scale up workers when mapping.
If we can help provide more details, definitely let us know but it sounds like you already have a nice minimal example that demonstrates the issue.
👍 1

Chris White

09/10/2019, 10:20 PM
yup yup! I’ll sync up with @josh and try to recreate with Dask - we’ll keep you in the loop!
👍 2
Hey @Joe Schmid and @Jie Lou — just wanted to update you on this: @josh and I spent some time going down the rabbit hole, and we could sort-of recreate the issue, but not make much progress as to the cause. If the goal is resource conservation, at this moment my recommendation would be to use `DaskKubernetesEnvironment` but set the minimum number of workers to the amount you really want at the highest capacity; at least the cluster will disappear when your run is complete. We're still trying to figure out why the large number of pending mapped tasks isn't triggering a scale-up, though, and will let you know when we learn something new!

Jie Lou

09/12/2019, 10:23 PM
@Chris White Sounds good. Thanks for the update, Chris.
👍 1

Chris White

09/12/2019, 10:48 PM
@Joe Schmid / @Jie Lou are the Tasks that you are mapping very fast tasks? What’s a typical duration of the run for these tasks?

Joe Schmid

09/12/2019, 10:48 PM
@Chris White they're quite slow -- often minutes.

Chris White

09/12/2019, 10:53 PM
Interesting. OK so that is not what I was expecting; just to give you some insight, while your tasks are running, you can do the following:
from distributed import Client

# connect to the Dask scheduler (placeholder address)
c = Client("<tcp://scheduler-address>")

# ask the scheduler how many workers it currently thinks it needs
c.run_on_scheduler(lambda scheduler: scheduler.adaptive_target())
and call this function over and over as your Tasks progress (note: you might need to exec into one of your pods to have access to the scheduler). This is the endpoint dask-k8s calls to determine whether scaling should occur — we found that for tasks whose runtime was on the same order as the network communication cost, this function would return 1, but for longer tasks it would very quickly begin returning large values. Not a solution, but just so you know the approach we were taking

Joe Schmid

09/12/2019, 10:54 PM
The update from our side is that we put our external Dask cluster in non-adaptive mode this morning and saw significantly better reliability today. (We did some work to make it easy to dynamically adjust our Dask cluster, e.g. adaptive on/off, set min/max # workers, set target # workers for non-adaptive, etc.)
👍 1
@Chris White that's helpful on running `scheduler.adaptive_target()`; we can definitely do that. What we'll likely do in the short term is avoid using adaptive mode for now, to remove that as a variable while we do lots of flow runs mapping over many elements.
👍 1
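A minimal sketch of the kind of toggling described above, assuming the KubeCluster API; the worker spec path and numbers are placeholders:

from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml("worker-spec.yaml")

# non-adaptive: pin the cluster at a fixed target number of workers
cluster.scale(10)

# adaptive: let the scheduler scale between the given bounds
cluster.adapt(minimum=2, maximum=20)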

Chris White

09/12/2019, 10:58 PM
That sounds good!