I'm trying to use KubeCluster with DaskExecutor in...
# prefect-community
n
I'm trying to use KubeCluster with DaskExecutor in a simple flow on GCP:
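from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

@task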
def get_mean():
    import dask.array as da
    array = da.ones((1000, 1000, 1000))
    return array.mean().compute()

@task
def output_value(value):
    print(value)

with Flow("Static Dask Cluster Example") as flow:
    res = get_mean()
    output_value(res)

flow.run_config = KubernetesRun(labels=["dev"], image="my-image")

def get_cluster(n):
    pod_spec = make_pod_spec(image="my-image",
                              memory_limit='1G', memory_request='1G',
                              cpu_limit=1, cpu_request=1)
    return KubeCluster(pod_spec, n_workers=n)

flow.executor = DaskExecutor(cluster_class=get_cluster, cluster_kwargs={'n': 2})
flow.storage = GCS(bucket="my-bucket")
flow.register(project_name="tutorial")
However, I get a strange error:
[2021-01-28 12:11:41+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: AttributeError("can't set attribute")
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/opt/conda/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/opt/conda/lib/python3.8/site-packages/prefect/executors/dask.py", line 211, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster:  # type: ignore
  File "k8_flow.py", line 33, in get_cluster
  File "/opt/conda/lib/python3.8/site-packages/dask_kubernetes/core.py", line 414, in __init__
    super().__init__(**self.kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 274, in __init__
    super().__init__(
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 69, in __init__
    self.name = str(uuid.uuid4())[:8]
AttributeError: can't set attribute
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 651, in close_clusters
    if cluster.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
Exception ignored in: <function Cluster.__del__ at 0x7f99f6b89e50>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 113, in __del__
    if self.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
This is running on a KubernetesAgent on GKE with the RBAC permissions mentioned in the documentation. The Docker images I'm using are running dask-kubernetes==0.11.0, distributed==2020.12.0, prefect==0.14.5. Thank you in advance.
j
Hi Nikul, from inspecting the traceback, it looks like you're using distributed 2021.01.1 in your client (not 2020.12.0): the code setting a name on the cluster was only part of the latest release. Unfortunately, this introduced an incompatibility with dask-kubernetes. Until this is fixed, you'll need to ensure you're using distributed 2021.01.0 or earlier.
I went to open an issue upstream, but it looks like there already is one. See https://github.com/dask/dask-kubernetes/issues/293 if you're interested in tracking the issue further.
(Note that the error message that user got is different, but the underlying cause is the same.)
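For anyone curious how a line like `self.name = ...` can fail with an AttributeError, here is a minimal plain-Python sketch of one way it can happen. It assumes the subclass exposes `name` as a getter-only property while the base class assigns `self.name` in `__init__`; that is a guess at the mechanism, not code taken from distributed or dask-kubernetes. The class names `Base` and `ReadOnlyName` are hypothetical stand-ins.

```python
import uuid


class Base:
    """Hypothetical stand-in for a base class whose __init__ assigns self.name."""

    def __init__(self):
        # Mirrors the assignment shown in the traceback.
        self.name = str(uuid.uuid4())[:8]


class ReadOnlyName(Base):
    """Hypothetical stand-in for a subclass with a getter-only name property."""

    @property
    def name(self):
        return "fixed-name"


def try_construct():
    """Return the exception type name raised during construction, or None."""
    try:
        ReadOnlyName()
    except AttributeError as exc:
        # The message text differs between Python versions, so only the
        # exception type is reported here.
        return type(exc).__name__
    return None


print(try_construct())
```

If construction aborts this early, attributes that would have been set later in `__init__` never exist, which would be consistent with the follow-on "no attribute 'status'" errors raised during cleanup.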
n
Thank you for your reply! I can confirm that downgrading to distributed 2020.12.1 solves the issue.
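Since the reported version and the version actually installed in the image differed here, a small check at startup can catch that kind of drift. The sketch below only encodes the threshold mentioned in this thread (2021.01.1 as the first affected release); it knows nothing about later releases that may contain a fix. The helper names are hypothetical, and the parser assumes purely numeric dotted version strings.

```python
# First distributed release reported in this thread as incompatible with
# dask-kubernetes 0.11.0. Later releases may have fixed this; that is not
# encoded here.
FIRST_AFFECTED = (2021, 1, 1)


def parse_version(version):
    """Turn '2021.01.1' into (2021, 1, 1). Assumes numeric dotted parts only."""
    return tuple(int(part) for part in version.split("."))


def at_or_after_first_affected(version):
    """True if the version is at or after the first release reported affected."""
    return parse_version(version) >= FIRST_AFFECTED


for candidate in ("2020.12.0", "2021.01.0", "2021.01.1"):
    print(candidate, at_or_after_first_affected(candidate))
```

Inside the image, the string to check can be read from `distributed.__version__`, which shows what is actually installed rather than what the requirements file was meant to pin.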