Victor Churikov
05/28/2023, 8:09 PM

Marvin
05/28/2023, 8:10 PM
`DaskTaskRunner`. For example, you can pass `adapt_kwargs` to control the adaptive scaling of the cluster:
```python
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)
```
For more information on configuring the `DaskTaskRunner`, you can refer to the Prefect documentation.

To customize Kubernetes tolerations specifically for Dask-Gateway, you might need to dive into the Dask-Gateway documentation or configuration files. I recommend checking the Dask-Gateway documentation for more details on customizing the Kubernetes setup.

I hope that helps, even if it's just a little bit. Good luck with your Kubernetes adventures.

Victor Churikov
05/28/2023, 8:16 PM

Victor Churikov
05/28/2023, 8:24 PM

```python
c.KubeClusterConfig.worker_extra_pod_config = {
    "tolerations": [
        {
            "key": "key",
            "operator": "Equal",
            "value": "value",
            "effect": "NoSchedule",
        }
    ]
}
```
I’ll keep reading to see how this should be made configurable by the clients. Thanks, Marvin!

Victor Churikov
05/28/2023, 9:04 PM
The `dask` Helm chart (as opposed to `dask-gateway`) has the concept of worker groups, where I can configure different groups with dedicated Kubernetes configs. I’ll try it out; that’s probably the solution, because the Options object doesn’t seem configurable from the dask-gateway installation. By specifying different worker groups I should be able to achieve what I want (running each with different tolerations on different node pools).
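If I’m reading the chart’s values right, it would look something like this (the `additional_worker_groups` key and the pool names are my guess at the right knobs; untested):

```yaml
# Sketch of dask Helm chart values; assumes the chart exposes
# an additional_worker_groups list alongside the default worker group
worker:
  replicas: 2
additional_worker_groups:
  - name: gpu-workers
    replicas: 1
    tolerations:
      - key: "nodepool"
        operator: "Equal"
        value: "gpu-pool"
        effect: "NoSchedule"
```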
Marvin
05/28/2023, 10:10 PM

Victor Churikov
05/29/2023, 11:44 AM
dask-gateway Helm chart values (server side):

```yaml
gateway:
  extraConfig:
    options: |
      from dask_gateway_server.options import Options, Integer, Float, String

      def options_handler(options):
          return {
              "worker_cores": options.worker_cores,
              "worker_memory": int(options.worker_memory * 2 ** 30),
              "worker_extra_pod_config": {
                  "tolerations": [
                      {
                          "key": "nodepool",
                          "operator": "Equal",
                          "value": options.nodepool,
                          "effect": "NoSchedule",
                      }
                  ]
              },
          }

      c.Backend.cluster_options = Options(
          String("nodepool", default="default-pool", label="Worker NodePool"),
          Integer("worker_cores", default=1, min=1, max=16, label="Worker Cores"),
          Float("worker_memory", default=0.5, min=0.1, max=32, label="Worker Memory (GiB)"),
          handler=options_handler,
      )
```
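As a sanity check, the options the gateway exposes can be listed from the client before wiring anything into Prefect (a quick sketch, reusing the same env vars as the client code below):

```python
import os
from dask_gateway import Gateway, BasicAuth

# Connect to the same gateway the task runner uses
gateway = Gateway(
    address=os.environ["DASK_GATEWAY_ADDRESS"],
    auth=BasicAuth(password=os.environ["DASK_PASSWORD"]),
)

# Prints the options defined by the server's options_handler:
# nodepool, worker_cores, worker_memory
print(gateway.cluster_options())
```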
Python code from the client side:
```python
import os
from platform import node, platform

from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
from dask_gateway import BasicAuth

gateway_address = os.environ["DASK_GATEWAY_ADDRESS"]
gateway_password = os.environ["DASK_PASSWORD"]
auth = BasicAuth(password=gateway_password)

runner = DaskTaskRunner(
    cluster_class="dask_gateway.GatewayCluster",
    adapt_kwargs={"minimum": 1, "maximum": 10, "active": True},
    cluster_kwargs={
        "auth": auth,
        "address": gateway_address,
        "worker_cores": 1,
        "worker_memory": 1,
        "nodepool": "t4-cpu-pool",
    },
)

@task
def check():
    logger = get_run_logger()
    logger.info(f"Network: {node()}. ✅")
    logger.info(f"Instance: {platform()}. ✅")

@flow(task_runner=runner)
def poc_flow():
    check.submit()

if __name__ == "__main__":
    poc_flow()
```
Works really well! One gateway, and I create a GatewayCluster for each nodepool that I need to use.
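For a second node pool it’s just another runner with different cluster_kwargs; a rough sketch (the gpu-pool name, cores, and memory are made up for illustration):

```python
# Hypothetical second runner targeting a different node pool;
# reuses auth, gateway_address, and the check task from the snippet above
gpu_runner = DaskTaskRunner(
    cluster_class="dask_gateway.GatewayCluster",
    adapt_kwargs={"minimum": 1, "maximum": 4, "active": True},
    cluster_kwargs={
        "auth": auth,
        "address": gateway_address,
        "worker_cores": 4,
        "worker_memory": 8,
        "nodepool": "gpu-pool",  # routed via the matching toleration
    },
)

@flow(task_runner=gpu_runner)
def gpu_flow():
    check.submit()
```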
Victor Churikov
05/29/2023, 11:49 AM

Nate
05/29/2023, 5:18 PM