Christian Vogel

07/20/2022, 3:51 PM
Hi Prefect community. I am currently trying to run a Prefect flow using the DaskTaskRunner on a local Kubernetes cluster. Unfortunately, the flow fails with the following error:
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 2742, in loads_function
    result = pickle.loads(bytes_object)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'prefect'
When I enter the container, prefect does seem to be available, though. Do you have any idea what could be the reason?
My first impression is that it might be related to this issue: https://stackoverflow.com/questions/39977635/dask-no-module-named-xxxx-error
I am using the following code from a repository that was mentioned in the "show-us-what-you-got" channel:
from typing import List
import dask_kubernetes
from dask_kubernetes import make_pod_spec
from kubernetes.client import V1Pod
import prefect
from prefect import flow, get_run_logger, task
from prefect_dask.task_runners import DaskTaskRunner
import time


@task
def say_hello(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"goodbye {name}")


def dask_pod_spec() -> V1Pod:
    return make_pod_spec(
        image="ghcr.io/dask/dask:latest",
        memory_limit="1G",
        memory_request="1G",
        cpu_limit=1,
        cpu_request=1,
        extra_pod_config={"serviceAccountName": "default"},
    )


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=dask_kubernetes.KubeCluster,
        cluster_kwargs={"pod_template": dask_pod_spec()},
        adapt_kwargs={"minimum": 1, "maximum": 2},
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello(name)
        say_goodbye(name)
    time.sleep(120)


if __name__ == "__main__":
    print(prefect.__version__)
    greetings(["arthur", "trillian", "ford", "marvin"])
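The traceback shows the error comes from distributed calling `pickle.loads` on the worker. The worker can raise it even when prefect is importable elsewhere. Pickle stores module-level functions and classes by reference (module name plus qualified name), not by value. The process that loads the payload must therefore be able to import that module itself.

The standard-library sketch below illustrates the mechanism, under two simplifying assumptions:

* The module `only_on_client` is a hypothetical stand-in for prefect.
* Plain `pickle` is used for simplicity. Distributed's serializer differs in details, but it still has to import installed packages such as prefect on the worker.

```python
import importlib
import os
import pickle
import subprocess
import sys
import tempfile
from typing import Tuple


def pickle_by_reference_demo() -> Tuple[str, str]:
    """Pickle a function from a module that only this process can import, then try
    to unpickle it in a fresh interpreter that cannot import that module."""
    with tempfile.TemporaryDirectory() as mod_dir:
        # Hypothetical module standing in for an installed package such as prefect.
        with open(os.path.join(mod_dir, "only_on_client.py"), "w") as f:
            f.write("def greet(name):\n    return 'hello ' + name\n")

        sys.path.insert(0, mod_dir)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("only_on_client")
            # pickle records only the reference "only_on_client.greet", not the code.
            payload = pickle.dumps(module.greet)
            # Loading succeeds here because this process can import the module.
            local_result = pickle.loads(payload)("arthur")
        finally:
            sys.path.remove(mod_dir)
            sys.modules.pop("only_on_client", None)

        # Isolated mode (-I) ignores PYTHONPATH and the current directory, so the
        # child cannot import only_on_client, much like a worker missing prefect.
        child = subprocess.run(
            [sys.executable, "-I", "-c",
             "import pickle, sys; pickle.loads(sys.stdin.buffer.read())"],
            input=payload,
            capture_output=True,
        )

    # The last stderr line of the child's traceback names the missing module.
    return local_result, child.stderr.decode().strip().splitlines()[-1]
```

The child process fails at the same point the worker did: inside `pickle.loads`, while importing the referenced module.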

Kevin Kho

07/20/2022, 4:17 PM
That looks like your Dask workers don’t have prefect installed?
Are you using the same image for flow and workers?

Christian Vogel

07/20/2022, 4:29 PM
I thought so, based on the documentation for pod_template and scheduler_pod_template here: https://kubernetes.dask.org/en/latest/kubecluster.html#dask_kubernetes.KubeCluster
I just realized that my check of the prefect version does not really make sense, since it runs before the flow is executed and therefore not in the container, right?

Kevin Kho

07/20/2022, 4:32 PM
Yes, I think that’s not part of the flow.
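The `print(prefect.__version__)` line runs in the client process, so it says nothing about the workers. A check has to execute on the workers themselves. It should also look the package up by name rather than reference the imported `prefect` module: that reference would have to be unpickled, and therefore imported, on the worker.

A minimal standard-library sketch follows. It assumes the distribution name matches the module name, as it does for `prefect`.

```python
import importlib.util
from importlib import metadata
from typing import Dict, Optional, Union


def package_report(module_name: str) -> Dict[str, Union[bool, Optional[str]]]:
    """Report whether `module_name` is importable in the current interpreter and,
    if so, which version of the distribution with the same name is installed."""
    # For a top-level name, find_spec only locates the module; it does not run it.
    spec = importlib.util.find_spec(module_name)
    try:
        # Assumes the distribution name equals the module name (true for prefect).
        version: Optional[str] = metadata.version(module_name)
    except metadata.PackageNotFoundError:
        version = None
    return {"importable": spec is not None, "version": version}
```

Suppose a `distributed.Client` connected to the same cluster is available. Then `client.run(package_report, "prefect")` executes the function on every worker and returns the results keyed by worker address.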

Christian Vogel

07/20/2022, 4:32 PM
On the other hand, I logged into a container of the image manually and found that prefect was available.
The error message just seemed extremely similar to the Stack Overflow issue, but I was not sure how to apply the solution proposed there to a Prefect flow.

Kevin Kho

07/20/2022, 4:33 PM
The log shows conda. Are you relying on a specific environment being activated?

Christian Vogel

07/20/2022, 4:39 PM
Yeah, good point. Unfortunately, I am not yet certain how Dask handles things behind the scenes. When working with the Dask base image, conda is indeed involved.
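Kevin's question can be checked by comparing two environments: the one the worker traceback points at (`/opt/conda/lib/python3.8/site-packages/...`) and the one in which prefect was found manually.

A minimal standard-library sketch follows. `CONDA_PREFIX` and `CONDA_DEFAULT_ENV` are set by `conda activate`, so they may be unset in a process that was started without activating an environment.

```python
import os
import sys
import sysconfig
from typing import Dict, Optional


def environment_report() -> Dict[str, Optional[str]]:
    """Describe the Python environment the current process is running in."""
    return {
        # Interpreter binary and installation prefix actually in use.
        "executable": sys.executable,
        "prefix": sys.prefix,
        # Default site-packages directory for this interpreter's install scheme.
        "site_packages": sysconfig.get_paths()["purelib"],
        # Set by `conda activate`; None when no conda environment was activated.
        "conda_prefix": os.environ.get("CONDA_PREFIX"),
        "conda_env": os.environ.get("CONDA_DEFAULT_ENV"),
    }
```

Run this in the shell where prefect was found. If `site_packages` there is not `/opt/conda/lib/python3.8/site-packages`, one plausible explanation is that prefect is installed in a different environment from the one the worker uses.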