Hi, I'm trying to migrate some code from using Pref...
# prefect-community
a
Hi, I'm trying to migrate some code from Prefect environments to Prefect executors, since environments have been deprecated. With DaskKubernetesEnvironment, the code below registers fine and works the way we want.
DaskKubernetesEnvironment(
    metadata={'image': 'someImage'},
    worker_spec_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), f'./{worker_spec}'),
    min_workers=min_workers,
    max_workers=max_workers,
    labels=['DASK'],
)
However, when using DaskExecutor:
cluster = KubeCluster(make_pod_spec(image = 'someImage')).from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)),f'{worker_spec}'))
# Some things I've tried:
#cluster = KubeCluster.from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)),f'{worker_spec}'))
DaskExecutor(cluster_class=lambda: cluster,adapt_kwargs={"minimum": min_workers, "maximum": max_workers},)
I keep getting this error at the registration step:
File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/core.py", line 512, in _start
    await ClusterAuth.load_first(self.auth)
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 85, in load_first
    raise auth_exc
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 71, in load_first
    await auth_instance.load()
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 125, in load
    kubernetes.config.load_kube_config(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 661, in load_kube_config
    loader = _get_kube_config_loader_for_yaml_file(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 624, in _get_kube_config_loader_for_yaml_file
    raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
v
I recently switched over from the DaskKubernetesEnvironment as well. I found that the line
DaskExecutor(cluster_class=lambda: cluster ...)
gets executed on the flow's pod at run time, not at registration; the lambda defers the call. (This is also why
prefect.context.image
works, as shown on the "migration" page.) One thing you could try is to not instantiate the KubeCluster during registration, and instead create it inside a function. I have something like this:
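import dask
import prefect
from dask_kubernetes import KubeCluster
from dask_kubernetes.objects import make_pod_from_dict
from prefect.executors import DaskExecutor  # older releases: prefect.engine.executors


def load_pod_spec_from_yaml(yaml_path, image):
    """Build a worker pod spec from a YAML file, overriding the container image."""
    try:
        import yaml
    except ImportError:
        raise ImportError(
            "PyYaml is required to use yaml functionality, please install it!"
        )
    with open(yaml_path) as f:
        d = yaml.safe_load(f)
    d = dask.config.expand_environment_variables(d)
    pod_spec = make_pod_from_dict(d)
    pod_spec.spec.containers[0].image = image
    return pod_spec


# The lambda defers KubeCluster creation until the flow actually runs.
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(load_pod_spec_from_yaml(worker_spec_file, prefect.context.image)),
    adapt_kwargs={"minimum": args.min_workers, "maximum": args.max_workers},
)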
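For anyone reading later, here is a dependency-free sketch of why wrapping the constructor in a lambda avoids the registration-time error. It assumes, as described above, that the executor only calls `cluster_class` when the flow runs. `FakeCluster` and `FakeExecutor` are hypothetical stand-ins for illustration, not Prefect or dask-kubernetes classes:

```python
class FakeCluster:
    """Hypothetical stand-in for a cluster whose constructor needs credentials."""

    instances = 0  # counts how many clusters have been constructed

    def __init__(self, pod_spec):
        # A real cluster would load a kube config and contact Kubernetes here.
        FakeCluster.instances += 1
        self.pod_spec = pod_spec


class FakeExecutor:
    """Hypothetical stand-in for an executor that stores a cluster factory."""

    def __init__(self, cluster_class):
        # The factory is stored, not called.
        self.cluster_class = cluster_class

    def start(self):
        # Flow-run time: only now is the cluster actually built.
        return self.cluster_class()


# "Registration": the lambda is stored, so no cluster is constructed yet.
executor = FakeExecutor(cluster_class=lambda: FakeCluster({"image": "someImage"}))
created_at_registration = FakeCluster.instances

# "Flow run": the lambda is invoked and the cluster is created.
cluster = executor.start()
created_at_run = FakeCluster.instances
```

By contrast, building the cluster first and passing `lambda: cluster` runs the constructor on the registering machine, which is where the kube-config lookup in the traceback fails.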
👍 1
a
@Vincent Thank you very much. That worked nicely.
@Skip Breidbach
😎 1
s
Awesome!