# ask-community
p
Is there anything written up on using core alone with dask-kubernetes?
j
Not that I am immediately aware of, but you could use Core alone with dask-kubernetes by using it to spin up a Dask cluster on k8s https://kubernetes.dask.org/en/latest/#quickstart Then you can add a DaskExecutor to your flow that points at the address of that Dask cluster’s scheduler https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor
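Roughly, a minimal sketch of that end-to-end setup could look like this (the worker-spec.yml file, task, and flow name are just placeholders, and the scheduler address comes from the KubeCluster you create):
Copy code
# Sketch: spin up a Dask cluster on k8s with dask-kubernetes, then point
# a Prefect DaskExecutor at that cluster's scheduler.
from dask_kubernetes import KubeCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def say_hello():
    print("hello from dask")

with Flow("core-on-dask-k8s") as flow:
    say_hello()

# worker-spec.yml is a placeholder pod spec for the Dask workers
cluster = KubeCluster.from_yaml("worker-spec.yml")
cluster.scale(3)  # or cluster.adapt(minimum=1, maximum=10)

# scheduler_address is the tcp://... address of the cluster's scheduler
flow.run(executor=DaskExecutor(address=cluster.scheduler_address))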
p
the thing that's not clear to me from looking at dask-kubernetes is what the actual address of the cluster is
for me to point prefect at it
looks like it makes a service
are you using this? I'm curious if the python in the linked example should be running in a deployment or what. This seems like the sort of thing a k8s controller would handle
j
Great question! From my understanding, dask-kubernetes currently only exposes the scheduler through a ClusterIP-style Service, i.e. it is only accessible from inside the cluster. This means that your flow would have to be running inside the cluster as well. Tbh I’m not entirely sure and maybe they have changed it recently 🤔 We have a deployment recipe for spinning up your own static cluster on k8s that gives you full control over everything https://docs.prefect.io/cloud/recipes/k8s_dask.html so with that approach you could expose the service on an external IP if desired. We are using dask-kubernetes, but only in conjunction with the DaskKubernetesEnvironment, which as of right now is largely geared toward Prefect Cloud execution. The way that operates is by spinning up a scheduler pod that runs both the Flow and the Dask scheduler, so there is no need for cross-service communication.
p
Gotcha, it looks like at least from this that the python code that spins up the native k8s workers should probably be running as a deployment. As far as the flows themselves, even if they are running inside of the cluster, it would still be a "remote" environment since it's connecting to a cluster?
Also let's say I have a job using a scheduler, does that flow need to be deployed as a deployment or something as well, or does it just need to run once, dispatch to dask, and dask will handle it?
j
Yeah a “remote” environment would work. The flow could be deployed as whatever you want (deployment, simple pod, job, etc.). We generally lean toward a Job since they run once and complete
p
Well, in the case of a flow with a schedule, what's not clear is whether the flow needs to stay running in order to get tasks generated from it; in that case a pod/job would not be appropriate. If it's just sending stuff over to Dask and it spins over there, then a Job would work. Can you clear that up?
j
Oh yeah I see what you’re saying. A job would work because the command would be something like:
flow.run(executor=…)
and then the Flow Runner will start and submit work to the Dask cluster running in your k8s cluster using the DaskExecutor. Once the flow finishes, the job will complete
The Flow Runner process stays alive until the flow is finished / an error occurs
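So the Job's container command could just be a small script along these lines (sketch only; the tasks and the tcp://dask-scheduler:8786 address are placeholders for however you exposed your scheduler):
Copy code
# Sketch of the Job's entrypoint: build the flow, run it once against the
# existing Dask cluster, and exit when the flow run finishes.
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def extract():
    return [1, 2, 3]

@task
def load(data):
    print(f"loaded {len(data)} records")

with Flow("run-once-on-dask") as flow:
    load(extract())

# placeholder address; point this at your Dask scheduler Service
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))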
p
But if it's a periodically scheduled flow, would it ever complete?
j
In that case it would stay alive until you acted upon it so perhaps something like a deployment would be more appropriate 🙂
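For example, if the flow has a schedule attached, flow.run() just keeps looping between runs, so the hosting process has to stay alive (sketch below; the interval, task, and scheduler address are placeholders):
Copy code
# Sketch: a flow with a schedule attached. flow.run() blocks and keeps
# kicking off runs on the schedule, so the hosting process must stay alive,
# which is why a Deployment fits better than a Job here.
from datetime import timedelta
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.schedules import IntervalSchedule

@task
def heartbeat():
    print("tick")

schedule = IntervalSchedule(interval=timedelta(minutes=5))  # placeholder interval

with Flow("scheduled-flow", schedule=schedule) as flow:
    heartbeat()

# runs indefinitely, one flow run per interval
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))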
p
ok cool, that's what I was thinking. Does this change with Cloud at all? Are you triggering execution from Cloud via an agent?
j
Yeah it does change with Cloud. When running with Cloud we treat 1 Flow Run = 1 Job, and the runs are scheduled/orchestrated by the Cloud Scheduler. The Kubernetes Agent is responsible for watching for Flow Runs that need to be executed and creates jobs for them on your cluster
p
so the big thing cloud seems to bring wrt this is that it handles the state of what needs to be executed and when vs me having to have that state somewhere else that requires more hand-holding, like k8s
j
Yep! It abstracts away a lot of the building / maintenance that teams usually have to undertake when working with workflow systems. And we take a general approach to it in an attempt to handle all use cases
p
Thanks
j
Anytime!
j
@Preston Marshall just to piggyback on what @josh mentioned, this is exactly how we have Prefect deployed -- executing on a long-running Dask cluster that we create using dask-kubernetes -- and it works very well. We set the environment for our Flows to be this:
Copy code
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"}
)
And our k8s Service for the dask scheduler is spec'ed as:
Copy code
apiVersion: v1
kind: Service
...
spec:
  ports:
    - name: dask-scheduler
      port: 8786
      targetPort: 8786
    - name: dask-webui
      port: 80
      targetPort: 8787
...
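And that environment just gets attached when the flow is built, along these lines (sketch only; the flow name and task are illustrative):
Copy code
# Sketch: attach the RemoteEnvironment so the flow executes against the
# long-running Dask cluster behind the dask-scheduler Service.
from prefect import Flow, task
from prefect.environments import RemoteEnvironment

env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"},
)

@task
def do_work():
    print("running on the shared Dask cluster")

with Flow("my-flow", environment=env) as flow:
    do_work()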
p
Thanks for the additional info. RE: the long running dask cluster. You just run that python code that spins up the cluster using dask-kubernetes once? It doesn't need to stay running as well?
j
Kubernetes will take care of keeping things running, e.g. for kind: Deployment, k8s keeps however many pods you specify in replicas running for you. (If a pod dies, k8s will restart it, etc.)
p
Right I get that, I'm asking about the code in the quickstart: https://kubernetes.dask.org/en/latest/
does that just need to run once? Stay running? Seems like something CI could do and restart the cluster or something?
j
Ah, I see what you mean! Yes, we made a k8s Deployment for the scheduler that calls that Python file from the quickstart (well, a similar one in our case). Like this:
Copy code
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: dask
    component: scheduler
    release: dask
  name: dask-scheduler
spec:
  replicas: 1
  selector:  # required for apps/v1 Deployments; must match the template labels
    matchLabels:
      app: dask
      component: scheduler
      release: dask
  strategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: dask
        component: scheduler
        release: dask
    spec:
      containers:
        - args:
            - python
            - /home/jovyan/srm-data-science/bin/srm_dask_apaptive_scheduler.py
          env:
            - name: DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING
              value: "False"
          image: <Docker Image that includes srm_dask_apaptive_scheduler.py>
          imagePullPolicy: IfNotPresent
          name: dask-scheduler
          ports:
            - containerPort: 8786
            - containerPort: 8787
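The script itself follows the dask-kubernetes quickstart pattern; a rough sketch of that kind of file (not our actual srm_dask_apaptive_scheduler.py, and with placeholder worker spec and scaling bounds) would be:
Copy code
# Rough sketch of the scheduler-side script (dask-kubernetes quickstart
# pattern): create the cluster, enable adaptive scaling, and keep the
# process, and with it the Dask scheduler, alive. The Deployment above
# restarts this pod if it dies.
import time
from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml("worker-spec.yml")  # placeholder worker pod spec
cluster.adapt(minimum=1, maximum=20)  # placeholder scaling bounds

while True:
    time.sleep(60)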
p
gotcha, so it does need to stay running. What happens if it dies for some reason, does the cluster it spins up die with it?
also I appreciate the help with all of this, I know I could test all this out myself but this really speeds things up 🙂
j
No worries at all, happy to help!