
bruno.corucho

07/17/2020, 12:26 PM
Hello Prefect! So after checking this, I just noticed that the Dask K8s parallelism issue should now be solved and working, but unfortunately I couldn't get it running properly so far. Can anyone give me a hand? Source code:
import os
import time

import prefect
from prefect import task, Flow
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker


@task
def hello_world():
    time.sleep(15)
    logger = prefect.context.get("logger")
    logger.info("I WOKE UP, GUYS!!!")

with Flow("strdata-test") as test:
    hello_world()
    hello_world()
    hello_world()
    hello_world()
    hello_world()

test.storage = Docker(
    registry_url=os.environ["REGISTRY_URL"],
    dockerfile="deploy/Dockerfile",
    image_name="strdata-flow",
)

test.environment = DaskKubernetesEnvironment(max_workers=4)
test.register(project_name=os.environ["PROJECT_NAME"])
Output:

josh

07/17/2020, 12:27 PM
Hi @bruno.corucho this is probably due to the version of prefect inside your Docker storage! If you set prefect_version='master' on your Docker storage, does it fix the issue?
Also noting, that if your base image or Dockerfile install prefect itself then that’s where you should specify the master build 🙂
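A minimal sketch of that suggestion, applied to the storage definition from the first message (assuming the flow object is still called test):

import os

from prefect.environments.storage import Docker

test.storage = Docker(
    registry_url=os.environ["REGISTRY_URL"],
    dockerfile="deploy/Dockerfile",
    image_name="strdata-flow",
    prefect_version="master",  # build against the master branch instead of the latest release
)

As josh notes, if the custom Dockerfile installs prefect itself, the master build should be pinned there instead.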

bruno.corucho

07/17/2020, 12:28 PM
Hello @josh! Might very well be! I was using "latest"; gonna give it a try like so. Thanks!

josh

07/17/2020, 12:29 PM
Yeah, in the Docker image tagging scheme, prefecthq/prefect:latest corresponds to the most recent tagged release and prefecthq/prefect:master corresponds to the master branch.

Robin

07/17/2020, 1:23 PM
@Severin Ryberg [sevberg], let’s try out whether the parallelism now also works on our dask-kubernetes agent 🙂

bruno.corucho

07/17/2020, 1:24 PM
With the master version, our new chart: is the time discrepancy because of the time the workers initially take to "fire up"? Trying this again with a bigger number of tasks.
@Robin we're using exactly the same setup as you guys. Please share your results if you can!

josh

07/17/2020, 1:25 PM
@bruno.corucho Yeah, there will be a bit of startup time related to Kubernetes pulling the images, and it should be reduced in subsequent runs once the images are cached on the cluster.

Robin

07/17/2020, 1:26 PM
But why don’t all the pods start at the same time?

josh

07/17/2020, 1:28 PM
If you have it set like the above example, where it only has max_workers=4, then it will start with 1 worker and Dask will dynamically scale up to those 4 as it sees fit. If you want more workers at the start, then do something like min_workers=4.
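In other words, something like the following (a sketch of the environment from the first example, with min_workers added):

from prefect.environments import DaskKubernetesEnvironment

# Start with 4 workers right away instead of letting Dask scale up from 1.
test.environment = DaskKubernetesEnvironment(min_workers=4, max_workers=4)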

bruno.corucho

07/17/2020, 1:28 PM
@josh alright, thought so. Thank you so much, Josh, for the 5-star support you've been giving to us and everyone else in here. So much appreciated! Lastly, if you don't mind, do you have any Prefect documentation on how we can get the most out of the Prefect-Dask backend's parallelism? Is this done only at the function-call level? Or does it auto-partition data during functions like map(), etc.? If there's documentation that illustrates this better, I'm happy with that! 🙂 Thank you!
@Robin in my case I had the minimum set to 1.

josh

07/17/2020, 1:33 PM
Hmm maybe some relevant resources:

https://www.youtube.com/watch?v=50tmq7vMOl0

https://docs.prefect.io/core/concepts/engine.html#executors
https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html
https://examples.dask.org/applications/prefect-etl.html

Pretty much whenever you have tasks that can be executed in parallel, Prefect lets Dask handle all of that. As an example, if you have a flow with multiple branches that don't rely on each other, these branches will be executed in parallel using Dask. Also, taking advantage of mapping whenever possible will for sure utilize Dask to the fullest, because all of those tasks will exist at the same level of the flow and can be executed in parallel.
Another example: all of the r tasks can be executed in parallel once the upstream t tasks finish.
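As a rough illustration of the mapping pattern (hypothetical task names, not taken from the thread):

from prefect import task, Flow

@task
def get_numbers():
    return list(range(10))

@task
def double(x):
    return x * 2

with Flow("mapping-example") as flow:
    numbers = get_numbers()
    # Each mapped child task is independent, so a Dask-backed executor can run them in parallel.
    doubled = double.map(numbers)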

bruno.corucho

07/17/2020, 1:49 PM
Thank you @josh!

Robin

07/17/2020, 5:45 PM
So we are currently running our dask-kubernetes cluster again and set the number of nodes to 10 on EKS. The flow is defined as follows:
from time import sleep

from prefect import task, Flow
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker


@task
def say_hello():
    print("Hello, world! Got a secret for you :)")
    sleep(10)


with Flow("simple-dask-kube-flow") as flow:

    for i in range(30):
        say_hello()

flow.environment = DaskKubernetesEnvironment(
    min_workers=10, max_workers=100, labels=["k8s"]
)

flow.storage = Docker(registry_url="secreturl", prefect_version="master")


flow.register(project_name="eks_test_01")
So we would expect the Dask cluster to spin up many workers and execute the tasks in parallel. And indeed, Dask creates many workers (see attached). However, they are somehow cancelled and don't run in parallel, as can be seen on the Gantt chart (see attached). Weirdly, in the beginning there were even 4 parallel runs, but afterwards only serial runs… Any thoughts on what the reasons could be?
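One way to narrow this down (a sketch, not something suggested in the thread) would be to run the same flow locally against a plain DaskExecutor and check whether the tasks parallelize there, which separates Dask's own scheduling from the DaskKubernetesEnvironment setup:

from prefect.engine.executors import DaskExecutor

# Hypothetical local check: if the tasks run in parallel here, the issue is more
# likely in the Kubernetes environment or cluster than in the flow itself.
flow.run(executor=DaskExecutor())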

josh

07/17/2020, 5:58 PM
🤔 Interesting, could you open an issue on the repo for this? I’m wondering if it’s Dask prioritizing things in a weird way

Robin

07/17/2020, 6:04 PM
Sure!
@Marvin open “dask-kubernetes cluster on EKS does not properly execute tasks in parallel”
@nicholas is Marvin not intended for opening issues like this?

nicholas

07/17/2020, 6:17 PM
Hi @Robin Marvin is picky and only responds to Prefect people 😉

Robin

07/17/2020, 6:17 PM
OK, fair enough! 😄 I will open an issue the old-fashioned way then 🙂

nicholas

07/17/2020, 6:18 PM
@Marvin open "Dask-Kubernetes cluster on EKS does not properly execute tasks in parallel"

nicholas

07/17/2020, 6:18 PM
No need!

Robin

07/17/2020, 6:24 PM
Thanks! I am going to add our code for reproduction purposes 🙂

nicholas

07/17/2020, 6:24 PM
Perfect, thank you!

Robin

07/17/2020, 6:29 PM
Oh, while writing the issue I just noticed that I still had prefect 0.12.3+44.g0b3ef62bc.dirty. Might this cause the problem? Unfortunately, my colleague tore down the EKS cluster; we might spin it up again later, or tomorrow (it's 20:30 over here in Germany).

josh

07/17/2020, 6:30 PM
Yeah that could be the cause of it!