# ask-community
m
Hello! Hope you are doing well and are motivated enough to help with an issue/question. My workflow consists of both map and reduce tasks. The mapped tasks split the workload, so 2 GB of memory per worker is fine, but when I do a reduce, I would like the worker handling that particular task to have more memory than an average worker. I tried increasing the memory limits in Dask, but that applies to all workers, while I would like to set the increased limit only on the worker running a specific task. Is this possible? Thanks a lot!
a
@Matic Lubej based on the Dask documentation, you can set the memory limit for a specific worker this way:
```sh
$ dask-worker tcp://scheduler:port --memory-limit=auto     # TOTAL_MEMORY * min(1, nthreads / total_nthreads)
$ dask-worker tcp://scheduler:port --memory-limit="4 GiB"  # four gigabytes per worker process
```
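For reference, the same per-worker limit can also be set programmatically when the cluster is created; a minimal sketch in Python with illustrative sizes (note that, as Matic points out, this still applies to every worker in the cluster):
```python
from dask.distributed import Client, LocalCluster

# Every worker in this local cluster gets the same 4 GiB limit;
# the worker count and limit here are illustrative only.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="4GiB")
client = Client(cluster)
```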
But I wonder: even if you could allocate more resources to that specific Dask worker, how would you guarantee that this specific task is processed by that specific worker? I honestly don’t know how this could be accomplished, since it is the Dask scheduler that decides how to distribute work across workers. Also, my understanding is that when you do a reduce, it is not the worker but the driver node that collects the mapped tasks' results in memory, so the driver node would need enough memory to perform that.
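For what it's worth, Dask itself has a mechanism for steering a task to a particular worker: abstract worker resources. A hedged sketch below, assuming a long-running cluster where one larger worker is started by hand with a made-up resource label MEMORY; how well this composes with a Prefect flow is a separate question:
```python
# Start one worker with a higher memory limit and an abstract resource tag
# (the tag name "MEMORY", the address, and the numbers are illustrative):
#   $ dask-worker tcp://scheduler:8786 --memory-limit="16 GiB" --resources "MEMORY=16e9"

from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # assumed scheduler address

def reduce_results(parts):
    # hypothetical memory-hungry reduce step
    return sum(parts)

# The scheduler will only place this task on workers that advertise the
# MEMORY resource, i.e. the big worker started above.
future = client.submit(reduce_results, [1, 2, 3], resources={"MEMORY": 16e9})
result = future.result()
```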
m
Hi @Anna Geller, thanks so much for helping out. It's entirely possible that I don't have the proper understanding or vocabulary to describe the problem. I guess I don't really care about the workers as such, just that when a specific task is scheduled, more resources are allocated to whatever worker will be doing the job. If this is not the worker but, as you say, the driver node, is this node handled differently by Dask? Is it even handled by Dask? I'm not sure anymore, but I remember having issues in such situations, so I guess that means the driver node also didn't have enough resources. Can this be controlled? Thanks again!
a
What is your current Dask setup? Do you have a long-running cluster or do you define an on-demand cluster per flow?
Just to give you more examples: here is an example flow that uses the coiled.Cluster class passed to DaskExecutor, and on this cluster you can set both:
• scheduler_memory - defaults to 4 GiB
• worker_memory - defaults to 8 GiB
I'm no Dask expert, but I can point you to some resources that can perhaps help a bit more:
• This blog post explains more about Dask cluster configuration
• The Prefect docs on Dask deployment: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#deployment-dask
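To make that concrete, here is a rough sketch of what passing those options to the executor could look like, assuming Prefect 1.x and Coiled; the sizes and worker count are illustrative, so check the current Coiled/Prefect docs for the exact signatures:
```python
import coiled
from prefect.executors import DaskExecutor

# scheduler_memory / worker_memory are the coiled.Cluster options mentioned
# above; the values and worker count here are illustrative only.
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "n_workers": 4,
        "scheduler_memory": "8 GiB",
        "worker_memory": "16 GiB",
    },
)

# The executor is then attached to the flow as usual, e.g.
# with Flow("my-flow", executor=executor) as flow: ...
```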
m
Thanks @Anna Geller. Re: clusters, we are doing both, but mostly creating a local Dask cluster on the spot. I will read up on the material and examples you provided and hopefully learn something new. Thanks again!
🙌 1
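For the on-demand local cluster case Matic describes, the per-worker limit can be passed through the executor in much the same way; a minimal sketch, again assuming Prefect 1.x and illustrative values (with no cluster_class given, DaskExecutor spins up a temporary local cluster for the flow run):
```python
from prefect.executors import DaskExecutor

# Temporary local cluster created for the flow run; memory_limit applies
# to every worker in that cluster (illustrative values).
executor = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1, "memory_limit": "4GiB"},
)
```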
k
You should be able to allocate more resources to your scheduler, but I doubt you can change them during execution for a specific task. The scaling Dask provides is more horizontal than vertical.