Sander
11/23/2022, 5:42 PM
Ryan Peden
11/23/2022, 5:47 PM
`def` and not `async def`?
Sander
11/23/2022, 6:00 PM
```python
from dask_kubernetes.operator import KubeCluster
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from prefect_aws.s3 import S3Bucket
from prefect.infrastructure.kubernetes import KubernetesJob
import logging

logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
logger = logging.getLogger(__name__)

task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        name='prefect_dask',
        image='ghcr.io/dask/dask:latest',
        n_workers=1,
    ),
)

# Define some tasks for us to run in our flow
@task
def extract() -> list:
    return [1, 2, 3, 4, 5, 6]

@task(retries=3, retry_delay_seconds=15)
def transform(number: int) -> int:
    return number * 2

@task()
def load(numbers: list) -> list:
    return [i for i in numbers if i]

@flow(
    name="Dask Kubernetes Flow",
    task_runner=task_runner,
)
def main():
    numbers = extract()
    transformed_numbers = transform.map(numbers)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)
```
Ryan Peden
11/23/2022, 9:49 PM
Sander
11/23/2022, 9:49 PM
```python
from prefect.filesystems import S3
from prefect.deployments import Deployment
from dask_kubernetes.operator import KubeCluster
from prefect import task, flow
from prefect_dask import DaskTaskRunner
from prefect_aws.s3 import S3Bucket
from prefect.infrastructure.kubernetes import KubernetesJob
import ephemeral_dask
import logging

logging.basicConfig(encoding='utf-8', level=logging.DEBUG)
logger = logging.getLogger(__name__)

storage = S3Bucket.load("k8s-minio")  # load a pre-defined block
infra = KubernetesJob.load('k8s')

deployment = Deployment.build_from_flow(
    flow=ephemeral_dask.main,
    name="k8s-test",
    work_queue_name="kubernetes",
    storage=storage,
    infrastructure=infra,
)
deployment.apply()
```
```python
from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesImagePullPolicy
from prefect_aws.s3 import S3Bucket, MinIOCredentials

minio_cred = MinIOCredentials(
    minio_root_user='minioadmin',
    minio_root_password='minioadmin'
)
minio_cred.save('minio-cred', overwrite=True)

bucket = S3Bucket(
    bucket_name='k8s',
    minio_credentials=minio_cred,
    endpoint_url="https://minio:9000",
)
bucket.save('k8s-minio', overwrite=True)

k8_job = KubernetesJob(
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    env={
        "EXTRA_PIP_PACKAGES": "prefect_aws",
    },
)
k8_job.save('k8s', overwrite=True)
```
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
subjects:
- kind: ServiceAccount
  # Reference to upper's `metadata.name`
  name: default
  # Reference to upper's `metadata.namespace`
  namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
```
Ryan Peden
11/23/2022, 9:58 PM
Sander
11/23/2022, 10:07 PM
Ryan Peden
11/26/2022, 1:04 AM
`kubectl` port forwarding to see what the Dask workers were doing.
To sum things up: it looks like `prefect-dask` is not always wrapping Task exceptions properly (which is something we can fix), but I think this specific exception has an easy workaround. When I ran your code and checked the Dask worker logs, I saw exceptions because the workers were trying to unpickle the task Prefect sent over, which requires importing `prefect`, but the `prefect` package was not installed on the Dask worker node.
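The failure comes down to how pickling works: functions and classes are pickled as references (module path plus name), so whatever unpickles them must be able to import that module. A minimal stdlib sketch of that failure mode, using plain `pickle` and a made-up in-memory module `fake_tasks` standing in for `prefect` (Dask actually uses cloudpickle, which can serialize some functions by value, but objects from installed packages such as Prefect's task machinery are still sent by reference):

```python
import pickle
import sys
import types

# Hypothetical in-memory module standing in for a package (such as `prefect`)
# that exists on the client but not on the worker.
fake = types.ModuleType("fake_tasks")

def double(number: int) -> int:
    return number * 2

# Make `double` look like it lives in fake_tasks, then register the module.
double.__module__ = "fake_tasks"
fake.double = double
sys.modules["fake_tasks"] = fake

# "Client" side: pickle stores only the reference fake_tasks.double, not the code.
payload = pickle.dumps(double)

# "Worker" side: the module is not installed, so unpickling has to import it and fails.
del sys.modules["fake_tasks"]
try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = exc

print(type(error).__name__, error.name)
```

Installing the missing package on the worker is what makes that import, and therefore the unpickling, succeed.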
The Dask image can install extra dependencies when starting up: it checks the `EXTRA_PIP_PACKAGES` and `EXTRA_CONDA_PACKAGES` environment variables, and Prefect can be installed via either of them. You can pass either of these to `KubeCluster` through its `env` argument:
```python
task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        name='prefect-dask',
        image='ghcr.io/dask/dask:latest',
        n_workers=1,
        env={"EXTRA_PIP_PACKAGES": "prefect prefect-aws"},
    ),
)
```
This may or may not work depending on your execution environment. It seems like the Dask operator gives up and restarts the scheduler or worker containers if they aren't ready within 60 seconds.
I think in many cases 60 seconds would be enough, but I'm running x64 containers under emulation on an M1 Mac, so I could never get a K8s Dask cluster to start if it needed to install extra dependencies on startup.
So as a quick test, I used the following `Dockerfile`:
```dockerfile
FROM ghcr.io/dask/dask:2022.11.1
RUN pip install prefect prefect-aws
```
I built and tagged the image and pushed it to Docker Hub:
```shell
docker build -t rpeden/dask-prefect-aws:2.6.9 .
docker push rpeden/dask-prefect-aws:2.6.9
```
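Before pointing `KubeCluster` at an image like this, it can help to confirm inside the container that the packages the tasks need actually import. A minimal sketch, assuming you save it as a hypothetical `check_imports.py` and run it with the image's own Python interpreter (for example `docker run --rm -v "$PWD":/check rpeden/dask-prefect-aws:2.6.9 python /check/check_imports.py`); the module list is an assumption and should match what your tasks import:

```python
import importlib.util

def missing_packages(modules):
    """Return the top-level module names in `modules` that cannot be found here."""
    # find_spec returns None when no installed package provides the module.
    return [name for name in modules if importlib.util.find_spec(name) is None]

if __name__ == "__main__":
    # Use import names, not pip names: `prefect-aws` installs the `prefect_aws` module.
    missing = missing_packages(["prefect", "prefect_aws"])
    print("missing:", missing or "none")
```

`find_spec` only locates top-level modules without executing them, so this catches packages that were never installed but not errors raised while a package imports.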
And then when I used that as the image for `KubeCluster`, the Dask cluster started up and ran the tasks as expected.
I think a similar approach would work for you. You might need to include extra pip or Conda packages for your production code depending on what your tasks need when they run.
Sander
11/28/2022, 8:44 AM
Ryan Peden
11/30/2022, 5:46 PM
Sander
11/30/2022, 5:54 PM
Ryan Peden
11/30/2022, 5:55 PM
Sander
11/30/2022, 5:58 PM
Ryan Peden
11/30/2022, 6:01 PM
Sander
11/30/2022, 6:22 PM
Ryan Peden
11/30/2022, 6:23 PM
Sander
11/30/2022, 6:24 PM
```
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods is forbidden: User \"system:serviceaccount:default:default\" cannot create resource \"pods\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"kind":"pods"},"code":403}
```
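The 403 body is a Kubernetes `Status` object, and its `message` names the identity that was denied, the verb, and the resource. A small sketch for pulling those fields out, assuming the message always uses the wording shown here (the regex and the `parse_forbidden` helper are hypothetical, not part of any Kubernetes client):

```python
import json
import re

# The 403 body from the message above, copied verbatim.
BODY = r'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods is forbidden: User \"system:serviceaccount:default:default\" cannot create resource \"pods\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"kind":"pods"},"code":403}'

# Assumed wording of an RBAC denial; other API errors are phrased differently.
FORBIDDEN_RE = re.compile(
    r'User "(?P<user>[^"]+)" cannot (?P<verb>\S+) resource "(?P<resource>[^"]*)"'
    r' in API group "(?P<group>[^"]*)" in the namespace "(?P<namespace>[^"]*)"'
)

def parse_forbidden(body: str) -> dict:
    """Extract the denied identity, verb, and resource from a Forbidden Status body."""
    status = json.loads(body)
    match = FORBIDDEN_RE.search(status["message"])
    if match is None:
        raise ValueError(f"unrecognized message: {status['message']!r}")
    info = match.groupdict()
    info["code"] = status["code"]
    # Service accounts authenticate as system:serviceaccount:<namespace>:<name>.
    prefix = "system:serviceaccount:"
    if info["user"].startswith(prefix):
        sa_namespace, sa_name = info["user"][len(prefix):].split(":", 1)
        info["service_account"] = {"namespace": sa_namespace, "name": sa_name}
    return info

print(parse_forbidden(BODY))
```

Here it reports that the `default` service account in the `default` namespace may not `create` `pods` in `default`, which is the subject, verb, and resource that a Role or ClusterRole bound to that account has to allow. `kubectl auth can-i create pods --as=system:serviceaccount:default:default -n default` asks the API server the same question directly.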