<@ULVA73B9P> Can I customize the kubernetes tolera...
# ask-marvin
v
@Marvin Can I customize the Kubernetes tolerations for pods scheduled by Dask-Gateway when using prefect_dask with cluster_class='dask_gateway.GatewayCluster' as my task_runner?
m
Ah, the wonders of technology. I found some information that might help you customize Kubernetes tolerations for Dask-Gateway pods when using Prefect with Dask-Gateway as the task runner. Unfortunately, I couldn't find a direct answer to your question in the Prefect documentation. However, you can customize the Dask-Gateway cluster by passing arguments to the DaskTaskRunner. For example, you can pass adapt_kwargs to control the adaptive scaling of the cluster:
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)
For more information on configuring the DaskTaskRunner, you can refer to the Prefect documentation. To customize Kubernetes tolerations specifically for Dask-Gateway, you might need to dive into the Dask-Gateway documentation or configuration files. I recommend checking the Dask-Gateway documentation for more details on customizing the Kubernetes setup. I hope that helps, even if it's just a little bit. Good luck with your Kubernetes adventures.
v
Thanks Marvin, that didn’t solve the problem, but it helped me understand that there is probably nothing on Prefect’s side that can help with it. I looked at the documentation for the Gateway and it mentions exposing configuration variables to clients, so I should check whether any Kubernetes-specific configurations can be exposed this way. Maybe this is the solution.
This page says I can expose configurations so that the client can customize them: https://gateway.dask.org/cluster-options.html. Later in the page, it tells me to refer to this documentation to see which options are configurable for each backend: https://gateway.dask.org/api-server.html#kubernetes. Among them there is worker_extra_pod_config, with an example that matches my exact use case!
c.KubeClusterConfig.worker_extra_pod_config = {
    "tolerations": [
        {
            "key": "key",
            "operator": "Equal",
            "value": "value",
            "effect": "NoSchedule",
        }
    ]
}
I’ll keep reading to see how this should be made configurable by the clients. Thanks, Marvin!
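For context on why the toleration above has to line up with the node pool's taint, here is a minimal sketch of the matching rule Kubernetes applies between a toleration and a taint. The `tolerates` helper is hypothetical (it is not part of Dask-Gateway or any Kubernetes client library), and it only models the key/operator/value/effect comparison, not the rest of the scheduler.

```python
def tolerates(toleration: dict, taint: dict) -> bool:
    """Hypothetical helper: does this toleration match this taint?"""
    # A toleration with no effect matches taints of any effect.
    effect = toleration.get("effect")
    if effect and effect != taint.get("effect"):
        return False

    key = toleration.get("key")
    operator = toleration.get("operator", "Equal")  # "Equal" is the default

    if operator == "Exists":
        # "Exists" ignores the value; an empty key matches every taint key.
        return not key or key == taint.get("key")

    # "Equal": both key and value must match.
    return (
        key == taint.get("key")
        and toleration.get("value", "") == taint.get("value", "")
    )


# The toleration from the Dask-Gateway documentation example above.
toleration = {
    "key": "key",
    "operator": "Equal",
    "value": "value",
    "effect": "NoSchedule",
}

matching_taint = {"key": "key", "value": "value", "effect": "NoSchedule"}
other_taint = {"key": "key", "value": "something-else", "effect": "NoSchedule"}

print(tolerates(toleration, matching_taint))
print(tolerates(toleration, other_taint))
```

Note that a toleration only allows a pod onto a tainted node; it does not by itself keep the pod off untainted nodes.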
Looks like the dask helm chart (as opposed to dask-gateway) has the concept of worker groups, where I can configure different groups with dedicated Kubernetes configs. I’ll try it out; that’s probably the solution, because the Options object doesn’t seem configurable from the dask-gateway installation. By specifying different worker groups I should be able to achieve what I want (running each with different tolerations on different node pools).
m
thanks to @Nate 🙂, this thread has been saved to Discourse. You can find it here: https://discourse.prefect.io/t/3025/1
v
Got it working: dask-gateway values.yaml:
gateway:
  extraConfig:
    options: |
      from dask_gateway_server.options import Options, Integer, Float, String
      def options_handler(options):
          return {
              "worker_cores": options.worker_cores,
              "worker_memory": int(options.worker_memory * 2 ** 30),
              "worker_extra_pod_config": {
                  "tolerations": [
                      {
                          "key": "nodepool",
                          "operator": "Equal",
                          "value": options.nodepool,
                          "effect": "NoSchedule",
                      }
                  ]
              }
          }

      c.Backend.cluster_options = Options(
          String("nodepool", default='default-pool', label="Worker NodePool"),
          Integer("worker_cores", default=1, min=1, max=16, label="Worker Cores"),
          Float("worker_memory", default=0.5, min=0.1, max=32, label="Worker Memory (GiB)"),
          handler=options_handler,
      )
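To see what the handler above produces for a given nodepool, here is a minimal sketch that runs the same function outside the gateway. The `SimpleNamespace` stand-in for the options object is an assumption made for illustration; on the server, dask-gateway supplies the real object built from the `Options` fields.

```python
from types import SimpleNamespace


def options_handler(options):
    # Same mapping as in values.yaml: user-facing options -> cluster config.
    return {
        "worker_cores": options.worker_cores,
        # Memory is requested in GiB and converted to bytes.
        "worker_memory": int(options.worker_memory * 2 ** 30),
        "worker_extra_pod_config": {
            "tolerations": [
                {
                    "key": "nodepool",
                    "operator": "Equal",
                    "value": options.nodepool,
                    "effect": "NoSchedule",
                }
            ]
        },
    }


# Stand-in for the options a client would send (hypothetical test values).
fake_options = SimpleNamespace(
    nodepool="t4-cpu-pool", worker_cores=1, worker_memory=1
)

config = options_handler(fake_options)
print(config["worker_memory"])  # 1073741824 (1 GiB in bytes)
print(config["worker_extra_pod_config"]["tolerations"][0]["value"])  # t4-cpu-pool
```

Checking the handler this way catches typos in option names before redeploying the Helm release.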
Python code on the client side:
import os
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
from dask_gateway import BasicAuth
from platform import node, platform

gateway_address = os.environ["DASK_GATEWAY_ADDRESS"]
gateway_password = os.environ["DASK_PASSWORD"]

auth = BasicAuth(password=gateway_password)

runner = DaskTaskRunner(
    cluster_class='dask_gateway.GatewayCluster',
    adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True},
    cluster_kwargs={
        'auth': auth,
        'address': gateway_address,
        'worker_cores': 1,
        'worker_memory': 1,
        'nodepool': 't4-cpu-pool'
    }
)

@task
def check():
    logger = get_run_logger()
    logger.info(f"Network: {node()}. ✅")
    logger.info(f"Instance: {platform()}. ✅")


@flow(task_runner=runner)
def poc_flow():
    check.submit()


if __name__ == "__main__":
    poc_flow()
Works really well! One gateway, and I create a GatewayCluster for each nodepool that I need to use
Updated the discourse thread as well
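The "one GatewayCluster per nodepool" pattern could be sketched roughly like this. The `gateway_cluster_kwargs` helper is hypothetical and only builds the `cluster_kwargs` dictionary; it assumes the gateway exposes the `nodepool`, `worker_cores` and `worker_memory` options from the values.yaml above.

```python
def gateway_cluster_kwargs(nodepool, worker_cores=1, worker_memory=1.0,
                           address=None, auth=None):
    """Hypothetical helper: build cluster_kwargs for one nodepool."""
    kwargs = {
        "worker_cores": worker_cores,
        "worker_memory": worker_memory,  # GiB, converted server-side
        "nodepool": nodepool,
    }
    # Only include connection settings when they are provided, so the
    # dask-gateway client can fall back to its own configuration.
    if address is not None:
        kwargs["address"] = address
    if auth is not None:
        kwargs["auth"] = auth
    return kwargs


cpu_kwargs = gateway_cluster_kwargs("t4-cpu-pool")
default_kwargs = gateway_cluster_kwargs("default-pool", worker_memory=0.5)

print(cpu_kwargs)
print(default_kwargs)

# Each dictionary would then go to its own task runner, for example:
# DaskTaskRunner(cluster_class="dask_gateway.GatewayCluster",
#                cluster_kwargs=cpu_kwargs)
```

Keeping the per-nodepool settings in one place makes it easier to give each flow a runner that targets the node pool it needs.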
n
thank you @Victor Churikov !!