
Nivi Mukka

09/21/2021, 6:49 PM
Hi Team, using Prefect Server (version 0.14.1) with Dask Gateway. I am using the map method under Flow to run the same task in parallel for different inputs. It is working fine, but it is not using all the Dask workers, which causes Dask to run into resource/communication errors. How can I tell the Prefect flow to run only one mapped task per worker?
[Screenshot: error from Dask worker logs]
This mapped task has 15 runs, but they all run on the same worker, one or two at a time. After 12 runs, the worker shows the message above and the flow does not proceed any further.
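(For context, a minimal sketch of the mapping pattern being described, with hypothetical task and flow names, against the Prefect 0.x API:)

from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def process(item):
    # stand-in for the real per-input work
    return item * 2

with Flow("mapped-flow") as flow:
    # one task run per input; the executor fans these out to Dask
    results = process.map(list(range(15)))

# hypothetical scheduler address; any DaskExecutor configuration works here
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))

(As an aside, Prefect 0.x can also forward Dask worker-resource annotations through task tags of the form dask-resource:KEY=N, which is one way to cap concurrent mapped runs per worker, assuming the workers are started with matching resources.)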

Kevin Kho

09/21/2021, 6:54 PM
I am not sure. Does your cluster have the workers beforehand or does it autoscale up?

Nivi Mukka

09/21/2021, 6:57 PM
The GKE cluster has autoscaling enabled. The Dask cluster is spun up when the flow run starts.

Kevin Kho

09/21/2021, 6:57 PM
How many workers does it start with?

Nivi Mukka

09/21/2021, 6:58 PM
I see that the Dask cluster is spinning up all the workers I requested, but it is using only one of them to run all iterations of the mapped task, hence the errors.
I have set it to a minimum of 5 workers and tried up to 20 as well. It is able to start all those workers, it just doesn't use them.

Kevin Kho

09/21/2021, 7:10 PM
Can I see how you set up your Dask cluster?
Something does seem wrong with the scheduler. What behavior do you see without Prefect?

Nivi Mukka

09/21/2021, 7:23 PM
I'm using this function with the DaskExecutor, and this is how the cluster is set up:
import json

from dask_gateway import BasicAuth, Gateway
from google.cloud import secretmanager


def create_cluster(min_workers, max_workers, docker_image, proxy_address):
    # Pull the gateway password out of GCP Secret Manager
    secret_client = secretmanager.SecretManagerServiceClient.from_service_account_json(
        'secrets/xxxxxxx.json')

    response = secret_client.access_secret_version(
        name="projects/xray-dev-dev-839i/secrets/dask-gateway/versions/latest")
    secrets = json.loads(response.payload.data.decode('UTF-8'))

    # connect to the dask-gateway proxy
    auth = BasicAuth(password=secrets['dask_gateway_secret'])
    gateway = Gateway(address=proxy_address, auth=auth)

    options = gateway.cluster_options()

    # loosen distributed's timeouts so long-running tasks do not kill workers
    options.environment = {
        "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3h",
        "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "3h",
        "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "3",
        "DASK_DISTRIBUTED__ADMIN__TICK__LIMIT": "3h",
        "DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT": "1h",
    }

    options.image = docker_image
    options.worker_memory = 80  # GiB per worker
    options.worker_cores = 8

    cluster = gateway.new_cluster(options)
    cluster.adapt(minimum=min_workers, maximum=max_workers)
    return cluster
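(A sketch of how a factory like this is typically wired into a flow in Prefect 0.x; the cluster_kwargs values below are hypothetical:)

from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=create_cluster,  # called when the flow run starts
    cluster_kwargs={
        "min_workers": 5,
        "max_workers": 20,
        "docker_image": "my-image:latest",
        "proxy_address": "http://dask-gateway-proxy:80",
    },
)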

Kevin Kho

09/21/2021, 8:06 PM
Nothing stands out here. I think you'd need to try using the Dask cluster without Prefect to see the behavior. I haven't been able to dig up anything on this Dask issue where the workers are not being used by the scheduler.
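(A minimal sketch of that kind of Prefect-free check, reusing the create_cluster factory above with hypothetical argument values:)

import time

# spin up the same gateway cluster, then drive it with plain Dask
cluster = create_cluster(5, 20, "my-image:latest", "http://dask-gateway-proxy:80")
client = cluster.get_client()
client.wait_for_workers(5)

def work(x):
    time.sleep(5)  # simulate a slow task so scheduling is visible
    return x * 2

futures = client.map(work, range(15))
print(client.gather(futures))
# the Dask dashboard (cluster.dashboard_link) should show runs spread across workers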

Nivi Mukka

09/21/2021, 10:53 PM
I tried using Dask on its own and it's behaving as expected with mapping and distribution. Also, only one of the mapped Prefect tasks is using a single worker; all the other mapped tasks in the flow are using multiple workers.

Kevin Kho

09/21/2021, 10:54 PM
Ah okay, that sounds like it may have been treated as a reduce. Do you have a minimal example?
It likely has to do with chaining mapped tasks, right?
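(To illustrate the distinction being raised, with hypothetical task names in the Prefect 0.x API: mapping over mapped output stays fanned out, while a non-mapped consumer is treated as a reduce that gathers everything into a single task run:)

from prefect import Flow, task

@task
def extract():
    return list(range(15))

@task
def transform(x):
    return x * 2

@task
def combine(xs):
    return sum(xs)

with Flow("chained-maps") as flow:
    items = extract()
    a = transform.map(items)  # 15 parallel mapped runs
    b = transform.map(a)      # chained map: still 15 parallel runs
    total = combine(a)        # reduce: one run that waits for all of a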