Getting Prefect to work with Dask on Kubernetes
# prefect-kubernetes
Ryan:
This is a good place for it; the more people who see it, the better. I tried to reproduce the error you ran into when you posted about it last week, but didn't have much luck. I'll give it another try, though. Were the tasks you were trying to run in Dask synchronous, i.e. defined with `def` and not `async def`?
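(For context, a minimal sketch of the two task styles the question distinguishes; the task names here are only illustrative:)

```python
from prefect import task

# A synchronous task: defined with a plain `def`.
@task
def double_sync(x: int) -> int:
    return x * 2

# An asynchronous task: defined with `async def` and awaited by Prefect.
@task
async def double_async(x: int) -> int:
    return x * 2
```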
Sander:
Indeed synchronous
At a high level it's set up as follows: Prefect Orion running with the default deployment spec auto-generated by Prefect, and the same for the agent. I have a KubernetesJob as infrastructure. The KubernetesJob then spawns the Dask cluster in the same namespace.
There was a slight issue with RBAC on the KubernetesJob that I had to fix by granting some rights in the namespace for that service account. Once that is solved, you should run into the issue.
Will try to share some code in a bit.
```python
from dask_kubernetes.operator import KubeCluster
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from prefect_aws.s3 import S3Bucket
from prefect.infrastructure.kubernetes import KubernetesJob

import logging
logging.basicConfig(encoding='utf-8', level=logging.DEBUG)

logger = logging.getLogger(__name__)

task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        name='prefect_dask',
        image='ghcr.io/dask/dask:latest',
        n_workers=1,
    ),
)

# Define some tasks for us to run in our flow
@task
def extract() -> list:
    return [1, 2, 3, 4, 5, 6]

@task(retries=3, retry_delay_seconds=15)
def transform(number: int) -> int:
    return number * 2

@task()
def load(numbers: list) -> list:
    return [i for i in numbers if i]

@flow(
    name="Dask Kubernetes Flow",
    task_runner=task_runner,
)
def main():
    numbers = extract()
    transformed_numbers = transform.map(numbers)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)
```
Ryan:
Thanks - this is very helpful
Sander:
And the deployment
```python
from prefect.filesystems import S3
from prefect.deployments import Deployment
from dask_kubernetes.operator import KubeCluster
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from prefect_aws.s3 import S3Bucket
from prefect.infrastructure.kubernetes import KubernetesJob
import ephemeral_dask

import logging
logging.basicConfig(encoding='utf-8', level=logging.DEBUG)

logger = logging.getLogger(__name__)

storage = S3Bucket.load("k8s-minio")  # load a pre-defined block
infra = KubernetesJob.load('k8s')

deployment = Deployment.build_from_flow(
    flow=ephemeral_dask.main,
    name="k8s-test",
    work_queue_name="kubernetes",
    storage=storage,
    infrastructure=infra,
)

deployment.apply()
```
That was a bit easier to encode than the CLI.
And the infra blocks etc.:
```python
from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesImagePullPolicy
from prefect_aws.s3 import S3Bucket, MinIOCredentials

minio_cred = MinIOCredentials(
    minio_root_user='minioadmin',
    minio_root_password='minioadmin'
)
minio_cred.save('minio-cred', overwrite=True)

bucket = S3Bucket(
    bucket_name='k8s',
    minio_credentials=minio_cred,
    endpoint_url="https://minio:9000",
)
bucket.save('k8s-minio', overwrite=True)

k8_job = KubernetesJob(
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    env={
        "EXTRA_PIP_PACKAGES": "prefect_aws",
    },
)
k8_job.save('k8s', overwrite=True)
```
To avoid permission errors in the default namespace, I applied this RoleBinding as well.
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
subjects:
  - kind: ServiceAccount
    # Reference to upper's `metadata.name`
    name: default
    # Reference to upper's `metadata.namespace`
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
```
I hope you can replicate (or point out my bug :) )
Prefect pods are based on prefecthq/prefect:2.6.1-python3.10
Btw, the `ephemeral_dask` reference is my file with the flow code shared above.
Ryan:
This is great - thank you for providing such an excellent example
Should be much easier to replicate it now 😄
Sander:
Let me know if more info is helpful
@Ryan Peden is it working for you? (Don’t want to rush but excited to hear your findings)
Ryan:
Hi Sander, sorry for the slow reply! I was able to reproduce this, and I used `kubectl` port forwarding to see what the Dask workers were doing. To sum things up: it looks like `prefect-dask` is not always wrapping task exceptions properly (which is something we can fix), but I think this specific exception has an easy workaround. When I ran your code and checked the Dask worker logs, I saw exceptions because the workers were trying to import the pickled task Prefect sent over, but the `prefect` package was not installed on the Dask worker node. The Dask image can install extra dependencies when starting up; it checks the `EXTRA_PIP_PACKAGES` and `EXTRA_CONDA_PACKAGES` environment variables, and Prefect can be installed via either of them. You can pass either of these to `KubeCluster`:
```python
task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        name='prefect-dask',
        image='ghcr.io/dask/dask:latest',
        n_workers=1,
        env={"EXTRA_PIP_PACKAGES": "prefect prefect-aws"}
    ),
)
```
This may or may not work depending on your execution environment. It seems like the Dask operator gives up and restarts the scheduler or worker containers if they aren't ready within 60 seconds. I think in many cases that would be enough, but I'm running x64 containers under emulation on an m1 Mac, so I could never get a K8s Dask cluster to start if it needed to install extra dependencies on startup. So as a quick test, I used the following `Dockerfile`:
```dockerfile
FROM ghcr.io/dask/dask:2022.11.1
RUN pip install prefect prefect-aws
```
I built and tagged the image and pushed it to Docker Hub:
```bash
docker build -t rpeden/dask-prefect-aws:2.6.9 .
docker push rpeden/dask-prefect-aws:2.6.9
```
And then when I used that as the image for `KubeCluster`, the Dask cluster started up and ran the tasks as expected. I think a similar approach would work for you. You might need to include extra pip or Conda packages for your production code depending on what your tasks need when they run.
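(For concreteness, a task runner pointed at that pre-built image might look roughly like this; this is a sketch that simply reuses the image tag from the `docker build` step above:)

```python
from dask_kubernetes.operator import KubeCluster
from prefect_dask import DaskTaskRunner

# Sketch: use the pre-built image so the scheduler and workers don't need
# to install extra packages at container startup.
task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        name="prefect-dask",
        image="rpeden/dask-prefect-aws:2.6.9",
        n_workers=1,
    ),
)
```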
Sander:
Fantastic! Will check it out. Makes sense indeed.
Works like a charm!
Ryan:
Excellent! I'm happy to hear it worked; I think I will adapt this into a Discourse post to help anyone else who runs into the same issue
Sander:
Sounds good. I think we need a couple of improvements though. I guess one is better error handling on the Prefect side. Second, some nice way to handle the permissions for creating the Dask scheduler and workers.
Ryan:
Excellent points. I have a pull request open to improve the error handling: https://github.com/PrefectHQ/prefect-dask/pull/52
Sander:
Nice one!
Ryan:
The permissions aren't quite as easy to address because it seems like it depends on cluster setup. I didn't run into any permission errors, but you did, so others probably will as well. Where/how were you running your K8s cluster? I will try to reproduce what you encountered so I can start by documenting it, and then from there see if there are any ways to make things simpler for Prefect users who want to use Dask on K8s.
Sander:
I was running my cluster on DigitalOcean without anything additional.
Ryan:
Perfect - should be easy for me to reproduce that setup 😄
Sander:
I ran into this.
```
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods is forbidden: User \"system:serviceaccount:default:default\" cannot create resource \"pods\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"kind":"pods"},"code":403}
```
I think it should be resolved on the KubernetesJob manifest in Prefect, but I'm not really sure if that makes the most sense.
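(One possible direction, sketched under the assumption that a dedicated ServiceAccount, here called `dask-runner`, already exists in the namespace with a Role/RoleBinding that allows creating pods; it is attached to flow-run jobs via the `KubernetesJob` block's `service_account_name` field instead of binding `cluster-admin` to the default account:)

```python
from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesImagePullPolicy

# Sketch: run flow-run jobs under a dedicated service account.
# Assumes a "dask-runner" ServiceAccount with pod-creation rights
# already exists in the target namespace.
k8_job = KubernetesJob(
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    service_account_name="dask-runner",
    env={"EXTRA_PIP_PACKAGES": "prefect_aws"},
)
k8_job.save("k8s", overwrite=True)
```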