# prefect-server
n
Hi Team, using Prefect Server (version 0.14.1) with Dask Gateway. I am using the `map` method under `Flow` to run the same task in parallel for different inputs. It is working fine but not using all the Dask workers, which is causing Dask to run into resource/communication errors. How can I tell the Prefect flow to run only one mapped task per worker?
[Error from Dask worker logs attached; content not captured]
This mapped task has 15 runs, but they are all running on the same worker, 1 or 2 at a time. After 12 runs, the worker shows the message above and the flow does not proceed any further.
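For reference, one way to cap mapped runs at one per worker is to pair Prefect's `dask-resource:` task tags with matching Dask worker resources. A minimal sketch, assuming a hypothetical worker resource named `TASKSLOTS` that each worker is configured to advertise:

```python
from prefect import task

# "TASKSLOTS" is a hypothetical resource name; each Dask worker must be
# started advertising it, e.g. via the cluster's environment options:
#   DASK_DISTRIBUTED__WORKER__RESOURCES__TASKSLOTS=1
@task(tags=["dask-resource:TASKSLOTS=1"])
def heavy_task(x):
    # With this tag, the DaskExecutor requests resources={"TASKSLOTS": 1}
    # for each run, so a worker offering TASKSLOTS=1 executes at most one
    # of these task runs at a time.
    return x * 2
```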
k
I am not sure. Does your cluster have the workers beforehand or does it autoscale up?
n
The GKE cluster has autoscaling enabled. The Dask cluster is spun up when the flow run starts.
k
How many workers does it start with?
n
I see that the Dask cluster is spinning up all the workers I requested, but it is using only one of them to run all iterations of the mapped task. Hence, it runs into errors.
I have set it to a minimum of 5, and tried up to 20 as well. It is able to start all those workers, it just doesn’t use them.
k
Can I see how you set up your Dask cluster?
Something does seem wrong with the scheduler. What behavior do you see without Prefect?
n
Using this function with the `DaskExecutor`, and this is how the cluster is set up:
```python
import json

from dask_gateway import BasicAuth, Gateway
from google.cloud import secretmanager


def create_cluster(min_workers, max_workers, docker_image, proxy_address):
    # Pull the dask-gateway password from GCP Secret Manager.
    secret_client = secretmanager.SecretManagerServiceClient.from_service_account_json(
        'secrets/xxxxxxx.json')
    response = secret_client.access_secret_version(
        name="projects/xray-dev-dev-839i/secrets/dask-gateway/versions/latest")
    secrets = json.loads(response.payload.data.decode('UTF-8'))

    # Connect to the dask-gateway proxy.
    auth = BasicAuth(password=secrets['dask_gateway_secret'])
    gateway = Gateway(address=proxy_address, auth=auth)

    options = gateway.cluster_options()

    # Loosen distributed's timeouts/limits for long-running tasks.
    options.environment = {
        "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3h",
        "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "3h",
        "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "3",
        "DASK_DISTRIBUTED__ADMIN__TICK__LIMIT": "3h",
        "DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT": "1h",
    }

    options.image = docker_image
    options.worker_memory = 80  # per-worker memory, in the units the gateway exposes
    options.worker_cores = 8

    cluster = gateway.new_cluster(options)
    cluster.adapt(minimum=min_workers, maximum=max_workers)
    return cluster
```
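For context, this cluster function is wired into the flow roughly like this; a sketch assuming Prefect 0.14's `DaskExecutor` with `cluster_class`/`cluster_kwargs`, where every value below is a placeholder:

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def process(x):
    return x + 1

with Flow("gateway-flow") as flow:
    results = process.map(list(range(15)))

# The executor calls create_cluster(**cluster_kwargs) when the flow run
# starts; all values here are placeholders.
flow.executor = DaskExecutor(
    cluster_class=create_cluster,
    cluster_kwargs={
        "min_workers": 5,
        "max_workers": 20,
        "docker_image": "my-registry/my-image:latest",
        "proxy_address": "https://dask-gateway.example.com",
    },
)
```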
k
Nothing stands out here. I think you’d need to try using the Dask cluster without Prefect to see the behavior. I haven’t been able to dig up anything on this Dask issue where the workers are not being used by the scheduler.
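A sanity check along those lines, reusing `create_cluster` from above with placeholder arguments, might look like:

```python
import time

# Reuses create_cluster() from above; argument values are placeholders.
cluster = create_cluster(
    min_workers=5,
    max_workers=20,
    docker_image="my-registry/my-image:latest",
    proxy_address="https://dask-gateway.example.com",
)
client = cluster.get_client()

def slow_inc(x):
    time.sleep(5)
    return x + 1

# If these 15 calls spread across workers, plain Dask mapping is healthy
# and the problem is specific to the Prefect flow.
futures = client.map(slow_inc, range(15))
print(client.gather(futures))
```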
n
I tried using Dask on its own and it’s behaving as expected with mapping and distribution. Also, only one of the mapped Prefect tasks is using a single worker; all the other mapped tasks in the flow are using multiple workers.
k
Ah okay, that sounds like it may have been treated as a reduce. Do you have a minimal example?
It likely has to do with chaining mapped tasks, right?
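A minimal example of that shape, two chained mapped tasks, would be something like:

```python
from prefect import Flow, task

@task
def inc(x):
    return x + 1

@task
def double(x):
    return x * 2

with Flow("chained-maps") as flow:
    first = inc.map(list(range(15)))
    # The second map consumes the first map's children one-to-one; if it
    # were treated as a reduce, all results would funnel through a single
    # task (and worker) instead of staying fanned out.
    second = double.map(first)
```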