Thread
#prefect-server
    Nivi Mukka
    1 year ago
    Hi Team, we're using Prefect Server (version 0.14.1) with Dask Gateway. I am using the map method under Flow to run the same task in parallel for different inputs. It works, but it is not using all the Dask workers, which causes Dask to run into resource/communication errors. How can I tell the Prefect flow to run only one mapped task per worker?
    [Error from Dask worker logs attached]
    This mapped task has 15 runs, but they all run on the same worker, only one or two at a time. After 12 runs, the worker shows the message above and the flow does not proceed any further.
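    (One possible approach, sketched under the assumption that the gateway's workers can be started with Dask worker resources: Prefect's DaskExecutor translates task tags of the form dask-resource:KEY=N into Dask resource annotations, so if each worker advertises a process=1 resource, the tag below should cap the mapped task at one run per worker at a time. heavy_task and the process resource name are illustrative.)

    from prefect import Flow, task

    # Assumes each Dask worker is started with a "process=1" resource limit.
    @task(tags=["dask-resource:process=1"])
    def heavy_task(x):
        return x * 2

    with Flow("one-task-per-worker") as flow:
        results = heavy_task.map(list(range(15)))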
    Kevin Kho
    1 year ago
    I am not sure. Does your cluster have the workers beforehand or does it autoscale up?
    Nivi Mukka
    1 year ago
    The GKE cluster has autoscaling enabled. The Dask cluster is spun up when the flow run starts.
    Kevin Kho
    1 year ago
    How many workers does it start with?
    Nivi Mukka
    1 year ago
    I see that the Dask cluster is spinning up all the workers I requested, but it is using only one of them to run all iterations of the mapped task; hence, it runs into errors.
    I have set it to a minimum of 5 workers and tried up to 20 as well. It is able to start all those workers, it just doesn't use them.
    Kevin Kho
    1 year ago
    Can I see how you set up your Dask cluster?
    Something does seem wrong with the scheduler. What behavior do you see without Prefect?
    Nivi Mukka
    1 year ago
    Using this function with the DaskExecutor; this is how the cluster is set up:
    import json

    from dask_gateway import BasicAuth, Gateway
    from google.cloud import secretmanager

    def create_cluster(min_workers, max_workers, docker_image, proxy_address):
        # Fetch the Dask Gateway password from GCP Secret Manager.
        secret_client = secretmanager.SecretManagerServiceClient.from_service_account_json(
            'secrets/xxxxxxx.json')
        response = secret_client.access_secret_version(
            name="projects/xray-dev-dev-839i/secrets/dask-gateway/versions/latest")
        secrets = json.loads(response.payload.data.decode('UTF-8'))

        # Connect to the dask-gateway proxy.
        auth = BasicAuth(password=secrets['dask_gateway_secret'])
        gateway = Gateway(address=proxy_address, auth=auth)

        options = gateway.cluster_options()

        # Loosen distributed's comm timeouts so long-running tasks don't
        # trip connection errors.
        options.environment = {
            "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3h",
            "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "3h",
            "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "3",
            "DASK_DISTRIBUTED__ADMIN__TICK__LIMIT": "3h",
            "DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT": "1h",
        }

        options.image = docker_image
        options.worker_memory = 80  # GiB per worker
        options.worker_cores = 8

        cluster = gateway.new_cluster(options)
        cluster.adapt(minimum=min_workers, maximum=max_workers)
        return cluster
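    (For context, a minimal sketch of how a factory like this is typically handed to Prefect 0.14, assuming flow.executor is set before registration; the image and proxy address below are placeholders.)

    from prefect.executors import DaskExecutor

    flow.executor = DaskExecutor(
        cluster_class=create_cluster,  # called when the flow run starts
        cluster_kwargs={
            "min_workers": 5,
            "max_workers": 20,
            "docker_image": "gcr.io/my-project/worker:latest",  # placeholder
            "proxy_address": "http://dask-gateway-proxy",       # placeholder
        },
    )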
    Kevin Kho
    1 year ago
    Nothing stands out here. I think you'd need to try using the Dask cluster without Prefect to see the behavior. I haven't been able to dig up anything on this Dask issue where the workers are not being used by the scheduler.
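    (A sketch of the kind of Prefect-free check meant here, reusing the create_cluster factory above with placeholder arguments: submit the same 15 independent calls through a plain Dask client and watch the dashboard to see whether they spread across workers.)

    cluster = create_cluster(
        min_workers=5, max_workers=20,
        docker_image="gcr.io/my-project/worker:latest",  # placeholder
        proxy_address="http://dask-gateway-proxy",       # placeholder
    )
    client = cluster.get_client()

    # 15 independent calls, mirroring the 15 mapped task runs.
    futures = client.map(lambda x: x + 1, range(15))
    print(client.gather(futures))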
    Nivi Mukka
    1 year ago
    I tried using Dask on its own and it's behaving as expected with mapping and distribution. Also, only one of the mapped Prefect tasks is stuck on a single worker; all the other mapped tasks in the flow are using multiple workers.
    Kevin Kho
    1 year ago
    Ah okay, that sounds like it may have been treated as a reduce. Do you have a minimal example?
    It likely has to do with chaining mapped tasks, right?
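    (A minimal sketch of a reproduction along those lines, with placeholder task bodies: two mapped tasks chained together, which is the pattern under suspicion.)

    from prefect import Flow, task

    @task
    def inc(x):
        return x + 1

    @task
    def double(x):
        return x * 2

    with Flow("chained-maps") as flow:
        a = inc.map(list(range(15)))
        b = double.map(a)  # chained map: each pair should run in parallel
        # By contrast, double(a) (no .map) would reduce all 15 mapped
        # results into a single task run on one worker.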