Nikul
01/28/2021, 12:24 PM@task
def get_mean():
import dask.array as da
array = da.ones((1000, 1000, 1000))
return array.mean().compute()
@task
def output_value(value):
print(value)
with Flow("Static Dask Cluster Example") as flow:
res = get_mean()
output_value(res)
flow.run_config = KubernetesRun(labels=["dev"], image="my-image")
def get_cluster(n):
pod_spec = make_pod_spec(image="my-image",
memory_limit='1G', memory_request='1G',
cpu_limit=1, cpu_request=1)
return KubeCluster(pod_spec, n_workers=n)
flow.executor = DaskExecutor(cluster_class=get_cluster, cluster_kwargs={'n': 2})
flow.storage = GCS(bucket="my-bucket")
flow.register(project_name="tutorial")
However, I get a strange error:
[2021-01-28 12:11:41+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: AttributeError("can't set attribute")
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/opt/conda/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/opt/conda/lib/python3.8/site-packages/prefect/executors/dask.py", line 211, in start
with self.cluster_class(**self.cluster_kwargs) as cluster: # type: ignore
File "k8_flow.py", line 33, in get_cluster
File "/opt/conda/lib/python3.8/site-packages/dask_kubernetes/core.py", line 414, in __init__
super().__init__(**self.kwargs)
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 274, in __init__
super().__init__(
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 69, in __init__
self.name = str(uuid.uuid4())[:8]
AttributeError: can't set attribute
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 651, in close_clusters
if cluster.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
Exception ignored in: <function Cluster.__del__ at 0x7f99f6b89e50>
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 113, in __del__
if self.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
This is running on a KubernetesAgent on GKE with the RBAC permissions mentioned in the documentation.
The docker images I'm using are running dask-kubernetes==0.11.0, distributed==2020.12.0, prefect==0.14.5.
Thank you in advanceJim Crist-Harif
01/28/2021, 4:24 PMNikul
01/29/2021, 9:33 AM