Nivi Mukka
09/21/2021, 6:49 PM
map method under Flow to run the same task in parallel for different inputs. It is working fine, but it is not using all the Dask workers, which is causing Dask to run into resource/communication errors. How can I tell the Prefect flow to run only one mapped task per worker?
Kevin Kho
09/21/2021, 6:54 PM
Nivi Mukka
09/21/2021, 6:57 PM
Kevin Kho
09/21/2021, 6:57 PM
Nivi Mukka
09/21/2021, 6:58 PM
Kevin Kho
09/21/2021, 7:10 PM
Nivi Mukka
09/21/2021, 7:23 PM
DaskExecutor
and this is how the cluster is set up:
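import json

from dask_gateway import Gateway
from dask_gateway.auth import BasicAuth
from google.cloud import secretmanager


def create_cluster(min_workers, max_workers, docker_image, proxy_address):
    # read the dask-gateway password from Google Secret Manager
    secret_client = secretmanager.SecretManagerServiceClient.from_service_account_json(
        'secrets/xxxxxxx.json')
    response = secret_client.access_secret_version(
        name="projects/xray-dev-dev-839i/secrets/dask-gateway/versions/latest")
    secrets = json.loads(response.payload.data.decode('UTF-8'))

    # connect to dask-gateway proxy
    auth = BasicAuth(password=secrets['dask_gateway_secret'])
    gateway = Gateway(address=proxy_address, auth=auth)

    # per-cluster options exposed by the gateway server
    options = gateway.cluster_options()
    # Dask config passed to the cluster as env vars: long comm timeouts and retries
    options.environment = {
        "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3h",
        "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "3h",
        "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "3",
        "DASK_DISTRIBUTED__ADMIN__TICK__LIMIT": "3h",
        "DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT": "1h",
    }
    options.image = docker_image
    options.worker_memory = 80  # memory per worker
    # by default dask-gateway gives each worker this many threads,
    # so a single worker may run several mapped tasks at once
    options.worker_cores = 8
    cluster = gateway.new_cluster(options)
    # scale adaptively between min_workers and max_workers
    cluster.adapt(minimum=min_workers, maximum=max_workers)
    return cluster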
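One possible way to cap each worker at one mapped task at a time is Dask worker resources. This assumes Prefect 1.x's DaskExecutor, which turns task tags of the form `dask-resource:KEY=NUM` into Dask resource requirements, and also assumes this gateway forwards `options.environment` to the workers. Each worker advertises one unit of a custom resource, and the mapped task is tagged so every run claims that unit. The resource name `mapslot` and both helper functions are hypothetical names for illustration. This is a sketch under those assumptions, not a confirmed fix.

```python
# Sketch, assuming Prefect 1.x's DaskExecutor and a dask-gateway server that
# forwards options.environment to the workers. RESOURCE_NAME and both helpers
# are hypothetical names chosen for illustration.
RESOURCE_NAME = "mapslot"  # lowercase: Dask lowercases config keys read from env vars


def with_worker_resource(base_env, units_per_worker=1):
    """Return a copy of base_env that also advertises a custom worker resource.

    Dask maps DASK_DISTRIBUTED__WORKER__RESOURCES__MAPSLOT=1 onto the
    `distributed.worker.resources` config key, which each worker reads at startup.
    The input dict is not modified.
    """
    env = dict(base_env)
    env[f"DASK_DISTRIBUTED__WORKER__RESOURCES__{RESOURCE_NAME.upper()}"] = str(units_per_worker)
    return env


def resource_tag(units_per_task=1):
    """Prefect 1.x tag asking DaskExecutor to reserve resource units for each task run."""
    return f"dask-resource:{RESOURCE_NAME}={units_per_task}"


# Hypothetical usage inside create_cluster:
#     options.environment = with_worker_resource({...existing timeout settings...})
# and on the mapped task in the flow:
#     @task(tags=[resource_tag()])
#     def process(item): ...
#     with Flow("example") as flow:
#         process.map(items)
```

Tasks that carry the tag only run on workers advertising the resource, so if the environment variable does not reach the workers, the mapped runs would sit pending rather than fail. A simpler alternative is lowering `options.worker_cores`, which reduces how many tasks each worker runs concurrently but also gives each task fewer cores.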
Kevin Kho
09/21/2021, 8:06 PM
Nivi Mukka
09/21/2021, 10:53 PM
Kevin Kho
09/21/2021, 10:54 PM