# prefect-community
b
Hello Prefect! So I just noticed that the Dask K8s Parallelism issue should now be solved and working after checking this, but unfortunately I haven't been able to get it running properly so far. Can anyone give me a hand? Source code:
import os
import time

import prefect
from prefect import task, Flow
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker


@task
def hello_world():
    time.sleep(15)
    logger = prefect.context.get("logger")
    logger.info("I WOKE UP, GUYS!!!")


with Flow("strdata-test") as test:
    hello_world()
    hello_world()
    hello_world()
    hello_world()
    hello_world()

    for n in range(4):
        hello_world()  # loop body was cut off in the original snippet; assumed to add more task runs
test.storage = Docker(
    registry_url=os.environ["REGISTRY_URL"],
    dockerfile="deploy/Dockerfile",
    image_name="strdata-flow",
)

test.environment = DaskKubernetesEnvironment(max_workers=4)
test.register(project_name=os.environ["PROJECT_NAME"])
Output:
j
Hi @bruno.corucho this is probably due to the version of prefect inside your Docker storage! If you set
prefect_version='master'
on your Docker storage does it fix the issue?
Also noting, that if your base image or Dockerfile install prefect itself then that’s where you should specify the master build 🙂
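A minimal sketch of what josh suggests here, reusing the storage setup from the snippet above (values are illustrative, not from the thread):
import os
from prefect.environments.storage import Docker

# Build the storage image against the master build of prefect
# instead of the latest release.
test.storage = Docker(
    registry_url=os.environ["REGISTRY_URL"],
    image_name="strdata-flow",
    prefect_version="master",
)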
b
Hello @josh! Might very well be! I was using "latest", so I'll give it a try with master. Thanks!
j
Yeah, in the Docker image tagging scheme
prefecthq/prefect:latest
corresponds to the most recent tagged release and
prefecthq/prefect:master
corresponds to the current master branch
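If the image is built from a base image that already installs prefect (as josh notes above), the tag would be pinned there instead; a hedged sketch using the Docker storage base_image argument (registry and image names are hypothetical):
from prefect.environments.storage import Docker

storage = Docker(
    registry_url="my-registry.example.com",  # hypothetical registry
    image_name="strdata-flow",
    base_image="prefecthq/prefect:master",   # master build rather than :latest
)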
r
@Severin Ryberg [sevberg], let’s try out whether the parallelism now also works on our dask-kubernetes agent 🙂
b
With the master version, here's our new chart: is the time discrepancy because of the time the workers initially take to "fire up"? Trying this again with a bigger number of tasks.
@Robin we're using exactly the same setup as you guys. Please share your results if you can!
j
@bruno.corucho Yeah, there will be a bit of startup time related to Kubernetes pulling the images; hopefully it will be reduced in subsequent runs once the images are cached on the cluster
r
But why don’t all the pods start at the same time?
j
If you have it set to the above example where it only has
max_workers=4
then it will start with 1 worker and dask will dynamically scale up to those 4 as it sees fit. If you want more workers at the start then do something like
min_workers=4
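A minimal sketch of that suggestion, assuming the same flow object as in the first snippet:
from prefect.environments import DaskKubernetesEnvironment

# Start with 4 workers immediately instead of letting dask scale up from 1.
test.environment = DaskKubernetesEnvironment(min_workers=4, max_workers=4)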
b
@josh alright, thought so. Thank you so much Josh for the five-star support you've been giving us and everyone else in here. So much appreciated! Lastly, if you don't mind, do you have any documentation on how we can get the most out of Prefect's Dask backend parallelism? Is this done only at the function-call level? Or does it auto-partition data during functions like map(), etc.? If there's documentation that illustrates this better, I'm happy with that! 🙂 Thank you!
@Robin for my case I had the minimum set at 1.
j
Hmm maybe some relevant resources:

https://www.youtube.com/watch?v=50tmq7vMOl0

https://docs.prefect.io/core/concepts/engine.html#executors
https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html
https://examples.dask.org/applications/prefect-etl.html

Pretty much whenever you have tasks that can be executed in parallel, prefect lets dask handle all of that. As an example, if you have a flow with multiple branches that don't rely on each other, these branches will be executed in parallel using dask. Also, taking advantage of mapping whenever possible will for sure utilize dask to the fullest, because all of those tasks will exist at the same level of the flow and can be executed in parallel.
Another example: all of the "r" tasks can be executed in parallel once the upstream "t" tasks finish
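A small sketch of the mapping pattern josh describes (task names are hypothetical); with a Dask-backed executor or environment, each mapped child task can run on a separate worker:
from prefect import task, Flow

@task
def get_numbers():
    return list(range(30))

@task
def double(n):
    return n * 2

with Flow("mapping-example") as flow:
    numbers = get_numbers()
    # double.map creates one child task run per element of numbers;
    # with Dask these child runs can execute in parallel.
    doubled = double.map(numbers)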
b
Thank you @josh!
r
So we are currently running our dask-kubernetes cluster again and set the number of nodes to 10 on EKS. The flow is defined as follows:
from time import sleep

from prefect import task, Flow
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker


@task
def say_hello():
    print("Hello, world! Got a secret for you :)")
    sleep(10)


with Flow("simple-dask-kube-flow") as flow:
    for i in range(30):
        say_hello()

flow.environment = DaskKubernetesEnvironment(
    min_workers=10, max_workers=100, labels=["k8s"]
)

flow.storage = Docker(registry_url="secreturl", prefect_version="master")

flow.register(project_name="eks_test_01")
So we would expect the dask cluster to spin up many workers and execute the tasks in parallel. And indeed, dask creates many workers (see attached). However, they are somehow cancelled and the tasks don't run in parallel, as can be seen on the Gantt chart (see attached). Weirdly, in the beginning there were even 4 parallel runs, but afterwards only serial runs… Any thoughts on what could be the reason?
j
🤔 Interesting, could you open an issue on the repo for this? I’m wondering if it’s Dask prioritizing things in a weird way
r
Sure!
@Marvin open “dask-kubernetes cluster on EKS does not properly execute tasks in parallel”
@nicholas is it not intended to open issues like this via Marvin?
n
Hi @Robin Marvin is picky and only responds to Prefect people 😉
r
OK, fair enough! 😄 I will open an issue the old-fashioned way then 🙂
n
@Marvin open "Dask-Kubernetes cluster on EKS does not properly execute tasks in parallel"
n
No need!
r
Thanks! I am going to add our code for reproduction purposes 🙂
n
Perfect, thank you!
r
Oh, while writing the issue I just noticed that I still had
prefect 0.12.3+44.g0b3ef62bc.dirty
installed. Might this cause the problem? Unfortunately, my colleague tore down the EKS cluster; we might bring it up again later or tomorrow (it's 20:30 over here in Germany)
j
Yeah that could be the cause of it!
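One way to double-check which prefect build actually ends up inside the storage image (the image name is a placeholder, and this assumes docker is available locally):
import subprocess

# Run a throwaway container from the built flow image and print its prefect version.
subprocess.run(
    [
        "docker", "run", "--rm", "my-registry/strdata-flow:latest",
        "python", "-c", "import prefect; print(prefect.__version__)",
    ],
    check=True,
)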