Arnulfo Soriano
01/12/2021, 7:50 PM
DaskKubernetesEnvironment(metadata={'image': 'someImage'},
                          worker_spec_file=os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                                        f'./{worker_spec}'),
                          min_workers=min_workers,
                          max_workers=max_workers,
                          labels=['DASK'])
However, when using DaskExecutor:
cluster = KubeCluster(make_pod_spec(image = 'someImage')).from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)),f'{worker_spec}'))
# some things I've tried
#cluster = KubeCluster.from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)),f'{worker_spec}'))
DaskExecutor(cluster_class=lambda: cluster,adapt_kwargs={"minimum": min_workers, "maximum": max_workers},)
I keep getting this error at the registration step:
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/core.py", line 512, in _start
    await ClusterAuth.load_first(self.auth)
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 85, in load_first
    raise auth_exc
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 71, in load_first
    await auth_instance.load()
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 125, in load
    kubernetes.config.load_kube_config(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 661, in load_kube_config
    loader = _get_kube_config_loader_for_yaml_file(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 624, in _get_kube_config_loader_for_yaml_file
    raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
Vincent
01/12/2021, 7:59 PM
The cluster_class callable in DaskExecutor(cluster_class=lambda: cluster ...)
gets executed on the flow's pod. The lambda makes it a deferred call (this is also why prefect.context.image
works, as seen on the "migration" page).
In your snippet, though, the KubeCluster itself is built outside the lambda, so it still gets created during registration, where no kube config is available.
One thing you could try is to not instantiate the KubeCluster during registration, and instead put that in a function.
I have something like this:
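# Imports assumed for Prefect 0.14 and the classic dask_kubernetes API
import dask
import prefect
from dask_kubernetes import KubeCluster
from dask_kubernetes.objects import make_pod_from_dict
from prefect.executors import DaskExecutor

def load_pod_spec_from_yaml(yaml_path, image):
    try:
        import yaml
    except ImportError:
        raise ImportError(
            "PyYaml is required to use yaml functionality, please install it!"
        )
    with open(yaml_path) as f:
        d = yaml.safe_load(f)
        d = dask.config.expand_environment_variables(d)
        pod_spec = make_pod_from_dict(d)
        # Override the image from the YAML with the one passed in
        pod_spec.spec.containers[0].image = image
        return pod_spec

# The lambda body (including KubeCluster) only runs at flow run time
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(load_pod_spec_from_yaml(worker_spec_file, prefect.context.image)),
    adapt_kwargs={"minimum": args.min_workers, "maximum": args.max_workers},
)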
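To make the eager vs. deferred difference concrete, here is a minimal sketch that runs without Kubernetes. FakeCluster and FakeExecutor are hypothetical stand-ins (not part of Prefect or dask_kubernetes); they only mimic the idea that the executor stores a callable and calls it later, when the flow runs.

```python
# Hypothetical stand-ins so the example runs anywhere; not real Prefect/dask_kubernetes classes.
events = []


class FakeCluster:
    """Records the moment it is constructed (where KubeCluster would look for a kube config)."""

    def __init__(self, image):
        events.append(f"cluster created for {image}")
        self.image = image


class FakeExecutor:
    """Stores a zero-argument callable and only calls it in start()."""

    def __init__(self, cluster_class):
        self.cluster_class = cluster_class

    def start(self):
        events.append("flow run started")
        return self.cluster_class()


# Eager: the cluster is built right here, at "registration" time.
# Wrapping the finished object in a lambda does not delay its construction.
eager_cluster = FakeCluster("eager-image")
eager = FakeExecutor(cluster_class=lambda: eager_cluster)

# Deferred: the constructor call sits inside the lambda, so nothing is built yet.
deferred = FakeExecutor(cluster_class=lambda: FakeCluster("deferred-image"))

# Snapshot of what has happened by the end of "registration".
events_at_registration = list(events)

# Only now, at "flow run" time, is the deferred cluster created.
deferred.start()
```

Only the eager cluster shows up in events_at_registration; the deferred one is created after "flow run started", which is the behaviour you want when the registering machine has no kube config.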
Arnulfo Soriano
01/12/2021, 9:33 PM
Skip Breidbach
01/12/2021, 10:36 PM