We just got our first `DaskKubernetesEnvironment` ...
# prefect-server
We just got our first `DaskKubernetesEnvironment` up and running, but it does not look very parallelized 🤔 Here is the code:
import prefect
from prefect import task, Flow
from prefect.environments.storage import Docker
from prefect.environments import DaskKubernetesEnvironment, LocalEnvironment
import time
import requests
# from prefect.tasks import kubernetes
from prefect.engine.executors import DaskExecutor
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Kubernetes!")
def get_value():
    return 5
def output_value(value):
def fetch_google():
    r = requests.get("http://www.google.com")
with Flow("hello-k8s", environment=DaskKubernetesEnvironment(min_workers=1, max_workers=5, labels=["yogasev"])) as flow:
    value = get_value()
flow.storage = Docker(registry_url="<my-repo>")
Did we miss anything? In kubectl it looked parallelized (5 Dask pods created in parallel), but Prefect's visualization suggests otherwise… 🤷‍♂️
Hi @Robin, I am going to raise an issue for the min/max workers not being applied in the DaskKubernetesEnvironment. In the meantime, you can use a worker_spec.yaml; see the spec here.
@Marvin open "DaskKubernetesEnvironment min max workers not being applied which is resulting in tasks not running in parallel"
@Marvin open "Dask in kubernetes environment not respecting min max workers"
Thanks for the quick response and the raised issue! We will look into it 🙂
Hey @Robin we’re looking into it as it appears to be a bug somewhere but in the meantime you might also be able to achieve what you’re after with an environment like this:
flow.environment = LocalEnvironment(
          "minimum": 1,
          "maximum": 5
Running with this environment gets me the parallelism I expect 🙂
Hey @josh, it worked indeed (see images). Thanks for the quick fix and for looking into it! However, I have two questions:
1. It seems like 8 tasks were executed although I set the limit to `"maximum": 5`. Why is that?
2. Does this workaround also work for an EKS Kubernetes cluster? (I will try to check this on my own now.)
So depending on the size of the compute resources, Dask workers can actually perform more than a single task at a time! For example, if I have 5 workers each with 4 threads (4 CPUs), then 20 tasks can run at a time. To control the number of threads each worker has, you could do something like this:
flow.environment = LocalEnvironment(
          "threads_per_worker": 1
          "minimum": 1,
          "maximum": 5
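The workers-times-threads arithmetic from the message above can be sketched in a few lines of plain Python. This is only an illustration of the upper bound, not Prefect or Dask code; the helper name `max_concurrent_tasks` is hypothetical and is not part of either library.

```python
def max_concurrent_tasks(workers: int, threads_per_worker: int) -> int:
    """Upper bound on tasks running at once, assuming one task per worker thread."""
    if workers < 0 or threads_per_worker < 0:
        raise ValueError("workers and threads_per_worker must be non-negative")
    return workers * threads_per_worker


# Example from the thread: 5 workers with 4 threads each.
print(max_concurrent_tasks(5, 4))

# With one thread per worker, the worker maximum is also the task maximum.
print(max_concurrent_tasks(5, 1))
```

Under this assumption, setting `"threads_per_worker": 1` together with `"maximum": 5` would cap concurrency at 5 tasks, which is why more than 5 tasks can appear to run at once when each worker has several threads.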
We have the same issue here, after a lot of effort to get a Kubernetes execution environment to run. The issue with using a LocalEnvironment is that the Dask scheduler needs to be publicly available, and we would like to avoid that. This is why the Kubernetes Agent was such a nice fit for us
@josh, if you can, keep us updated in case there's a workaround that works for a K8s cluster while this issue is being fixed! 🙂