Robin

07/12/2020, 10:56 PM
We just got our first DaskKubernetesEnvironment up and running, but it does not look very parallelized 🤔 Here is the code:
import prefect
from prefect import task, Flow
from prefect.environments.storage import Docker
from prefect.environments import DaskKubernetesEnvironment, LocalEnvironment
import time
import requests
# from prefect.tasks import kubernetes
from prefect.engine.executors import DaskExecutor
@task
def hello_task():
    time.sleep(10)
    logger = prefect.context.get("logger")
    logger.info("Hello, Kubernetes!")
@task
def get_value():
    time.sleep(10)
    return 5
@task
def output_value(value):
    print(value)
@task
def fetch_google():
    r = requests.get("http://www.google.com")
    print(r.content)
with Flow("hello-k8s", environment=DaskKubernetesEnvironment(min_workers=1, max_workers=5, labels=["yogasev"])) as flow:
    value = get_value()
    output_value(value)
    fetch_google()
    hello_task()
    hello_task()
    hello_task()
    hello_task()
    hello_task()
flow.storage = Docker(registry_url="<my-repo>")
flow.register(project_name="eks_test_01")
Did we miss anything? In kubectl it looked parallelized (5 Dask pods created in parallel), but Prefect's visualization suggests otherwise… 🤷‍♂️

Kingsley Blatter

07/13/2020, 3:07 PM
Hi @Robin, I am going to raise an issue for the min/max workers not being applied in the DaskKubernetesEnvironment. In the meantime, you can use a worker_spec.yaml; see the spec here
@Marvin open "DaskKubernetesEnvironment min max workers not being applied which is resulting in tasks not running in parallel"
@Marvin open "Dask in kubernetes environment not respecting min max workers"

Robin

07/13/2020, 4:21 PM
Thanks for the quick response and the raised issue! We will look into it 🙂

josh

07/13/2020, 6:13 PM
Hey @Robin, we're looking into it, as it appears to be a bug somewhere. In the meantime, you might be able to achieve what you're after with an environment like this:
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        adapt_kwargs={
            "minimum": 1,
            "maximum": 5,
        },
    ),
    labels=["yogasev"],
)
Running with this environment gets me the parallelism I expect 🙂

Robin

07/14/2020, 10:42 AM
Hey @josh, it worked indeed (see images). Thanks for the quick fix and for looking into it! However, I have two questions:
1. It seems like 8 tasks were executed although I set the limit to "maximum": 5. Why is that?
2. Does this workaround also work for an EKS Kubernetes cluster? (I will try to check this on my own now)

josh

07/14/2020, 11:13 AM
Depending on the size of the compute resources, a Dask worker can run more than one task at a time! For example, if I have 5 workers, each with 4 threads (4 CPUs), then 20 tasks can run at once. To control the number of threads each worker has, you could do something like this:
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        cluster_kwargs={
            "threads_per_worker": 1,
        },
        adapt_kwargs={
            "minimum": 1,
            "maximum": 5,
        },
    ),
    labels=["yogasev"],
)
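As a rough illustration of the arithmetic in this message (plain Python, not part of the Prefect or Dask API; the function name max_concurrent_tasks is made up for this sketch), the upper bound on simultaneously running tasks is the number of workers times the threads per worker:

```python
def max_concurrent_tasks(workers: int, threads_per_worker: int) -> int:
    """Upper bound on tasks running at once, assuming one task per worker thread."""
    if workers < 0 or threads_per_worker < 1:
        raise ValueError("workers must be >= 0 and threads_per_worker >= 1")
    return workers * threads_per_worker


# The example from the message: 5 workers with 4 threads each.
print(max_concurrent_tasks(5, 4))  # 20

# With threads_per_worker set to 1, the adaptive maximum of 5 workers
# also caps the number of concurrent tasks at 5.
print(max_concurrent_tasks(5, 1))  # 5
```

This is only a ceiling: the number of tasks actually running at any moment also depends on how many workers the adaptive scaling has started and on which tasks are ready to run.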

Luis Muniz

07/14/2020, 2:54 PM
We have the same issue here, after a lot of effort to get a Kubernetes execution environment to run. The issue with using a LocalEnvironment is that the Dask scheduler needs to be publicly available, and we would like to avoid that. This is why the Kubernetes Agent was such a nice fit for us.

bruno.corucho

07/14/2020, 3:16 PM
@josh, if you can, please keep us updated in case there's a workaround that works for a K8s cluster while this issue is being fixed! 🙂