# prefect-community
I'm trying to use KubeCluster with DaskExecutor in a simple flow on GCP:
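```python
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from dask_kubernetes import KubeCluster, make_pod_spec


@task
def get_mean():
    import dask.array as da
    array = da.ones((1000, 1000, 1000))
    return array.mean().compute()

@task
def output_value(value):
    # Body not included in the original post; printing is only a placeholder
    print(value)

with Flow("Static Dask Cluster Example") as flow:
    res = get_mean()

flow.run_config = KubernetesRun(labels=["dev"], image="my-image")

def get_cluster(n):
    # Each Dask worker pod is limited to 1 CPU and 1 GB of memory
    pod_spec = make_pod_spec(image="my-image",
                             memory_limit='1G', memory_request='1G',
                             cpu_limit=1, cpu_request=1)
    return KubeCluster(pod_spec, n_workers=n)

# The executor calls get_cluster(n=2) when the flow run starts
flow.executor = DaskExecutor(cluster_class=get_cluster, cluster_kwargs={'n': 2})
flow.storage = GCS(bucket="my-bucket")
```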
However, I get a strange error:
```
[2021-01-28 12:11:41+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: AttributeError("can't set attribute")
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/opt/conda/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/opt/conda/lib/python3.8/site-packages/prefect/executors/dask.py", line 211, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:  # type: ignore
  File "k8_flow.py", line 33, in get_cluster
  File "/opt/conda/lib/python3.8/site-packages/dask_kubernetes/core.py", line 414, in __init__
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 274, in __init__
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 69, in __init__
    self.name = str(uuid.uuid4())[:8]
AttributeError: can't set attribute
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 651, in close_clusters
    if cluster.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
Exception ignored in: <function Cluster.__del__ at 0x7f99f6b89e50>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 113, in __del__
    if self.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
```
This is running on a KubernetesAgent on GKE, with the RBAC permissions described in the documentation. The Docker images I'm using have dask-kubernetes==0.11.0, distributed==2020.12.0 and prefect==0.14.5 installed. Thank you in advance!
Hi Nikul, from inspecting the traceback, it looks like your client is running distributed 2021.01.1 (not 2020.12.0): the code that sets a name on the cluster (`self.name = str(uuid.uuid4())[:8]` in `distributed/deploy/cluster.py`) was only added in that latest release. Unfortunately, this change introduced an incompatibility with dask-kubernetes. Until it is fixed, you'll need to make sure you're using distributed 2021.01.0 or earlier.
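The `can't set attribute` failure can be reproduced without Kubernetes. This is a minimal sketch, assuming (the thread does not state it) that `KubeCluster` in dask-kubernetes 0.11.0 exposes `name` as a read-only property, so the new `self.name = ...` assignment in the base class's `__init__` has nowhere to go. `BaseCluster` and `PodCluster` are hypothetical stand-ins, not the real classes. The sketch also suggests why the follow-on `no attribute 'status'` errors appear: `__init__` aborts before `status` is ever assigned, and the cleanup hooks then trip over the half-built object.

```python
import uuid


class BaseCluster:
    """Hypothetical stand-in for distributed's Cluster base class."""

    def __init__(self):
        # Mirrors the line in the traceback: a name is assigned first ...
        self.name = str(uuid.uuid4())[:8]
        # ... and the status is only set afterwards.
        self.status = "created"


class PodCluster(BaseCluster):
    """Hypothetical stand-in for a KubeCluster whose `name` is read-only (assumption)."""

    @property
    def name(self):
        # A property with no setter, so `self.name = ...` raises AttributeError.
        return "dask-worker-"


def construct_partially():
    """Run __init__ on a fresh instance and return it with any AttributeError raised."""
    cluster = object.__new__(PodCluster)
    try:
        cluster.__init__()
    except AttributeError as exc:
        return cluster, exc
    return cluster, None


cluster, error = construct_partially()
print(repr(error))                 # exact wording differs between Python versions
print(hasattr(cluster, "status"))  # False: __init__ stopped before `status` was set
```

On Python 3.8 the message is `can't set attribute`, matching the traceback; newer Python versions word it as a missing setter, but the exception type is the same.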
I went to open an issue upstream, but it looks like there's already one. See https://github.com/dask/dask-kubernetes/issues/293 if you're interested in tracking the issue further.
(Note that the error message reported in that issue is different, but the underlying cause is the same.)
Thank you for your reply! I can confirm that downgrading to distributed 2020.12.1 solves the issue.
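To fail fast with a clearer message than `AttributeError: can't set attribute`, a version check along these lines could run at the top of `get_cluster`, in the same process that builds the cluster (the environment the reply found to be on 2021.01.1). It is a minimal sketch: `parse_calver`, `distributed_may_break_kubecluster` and `check_installed_distributed` are hypothetical helpers, the 2021.01.1 threshold comes from the reply in this thread, and because the release that fixes the incompatibility isn't known here, the check only flags versions at or past that threshold.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# First distributed release reported in this thread to break KubeCluster from
# dask-kubernetes 0.11.0. No fixed release is known here, so the check is one-sided.
FIRST_BAD_DISTRIBUTED = (2021, 1, 1)


def parse_calver(text):
    """Hypothetical helper: turn '2021.01.1' into (2021, 1, 1)."""
    numbers = []
    for piece in text.split(".")[:3]:
        match = re.match(r"\d+", piece)  # keep leading digits, drop suffixes like 'rc1'
        numbers.append(int(match.group()) if match else 0)
    numbers += [0] * (3 - len(numbers))  # pad short versions such as '2021.01'
    return tuple(numbers)


def distributed_may_break_kubecluster(distributed_version):
    """Hypothetical helper: True at or past the first release reported to break KubeCluster."""
    return parse_calver(distributed_version) >= FIRST_BAD_DISTRIBUTED


def check_installed_distributed():
    """Hypothetical helper: raise early if the local distributed looks incompatible."""
    try:
        installed = version("distributed")
    except PackageNotFoundError:
        return None  # distributed isn't installed in this environment
    if distributed_may_break_kubecluster(installed):
        raise RuntimeError(
            f"distributed {installed} is installed; KubeCluster from "
            "dask-kubernetes 0.11.0 needs distributed<=2021.01.0"
        )
    return installed
```

Pinning `distributed` in the image used by the flow run is still what resolves the problem; a check like this only makes the version mismatch visible before `KubeCluster` is constructed.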