Nivi Mukka
09/21/2021, 6:49 PM
map
Nivi Mukka
09/21/2021, 6:50 PM
Nivi Mukka
09/21/2021, 6:53 PM
Kevin Kho
Nivi Mukka
09/21/2021, 6:57 PM
Kevin Kho
Nivi Mukka
09/21/2021, 6:58 PM
Nivi Mukka
09/21/2021, 6:58 PM
Kevin Kho
Kevin Kho
Nivi Mukka
09/21/2021, 7:23 PMDaskExecutordef create_cluster(min_workers, max_workers, docker_image, proxy_address):
        secret_client = secretmanager.SecretManagerServiceClient.from_service_account_json(
            'secrets/xxxxxxx.json')
        response = secret_client.access_secret_version(
            name="projects/xray-dev-dev-839i/secrets/dask-gateway/versions/latest")
        secrets = json.loads(response.payload.data.decode('UTF-8'))
        # connect to dask-gateway proxy
        auth = BasicAuth(password=secrets['dask_gateway_secret'])
        gateway = Gateway(address=proxy_address, auth=auth)
        options = gateway.cluster_options()
        options.environment = {
            "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3h",
            "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "3h",
            "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "3",
            "DASK_DISTRIBUTED__ADMIN__TICK__LIMIT": "3h",
            "DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT": "1h",
        }
        options.image = docker_image
        options.worker_memory = 80  
        options.worker_cores = 8 
        cluster = gateway.new_cluster(options)
        cluster.adapt(minimum=min_workers, maximum=max_workers)
        return clusterKevin Kho
Nivi Mukka
09/21/2021, 10:53 PM
Kevin Kho
Kevin Kho
