Pedro Martins

01/06/2021, 6:17 PM
Hey everyone! I'd like some suggestions on how to make tasks run in parallel on a Dask cluster. I'm following the tutorial at https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html and here is the flow I want to run. Note that I'm connecting to a Dask cluster running on my k8s cluster.
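from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

# inc, dec, add and list_sum are @task functions defined as in the tutorial linked above

custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:dask-test",
        image_pull_secrets=["regcred"],
    ),
    "storage": S3(bucket="dr-prefect"),
    # connect to the existing Dask scheduler running inside the k8s cluster
    "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786"),
}

with Flow("dask-example", **custom_confs) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)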
Ideally I want incs and decs to run in parallel on different workers, but they are running asynchronously (one after the other), as you can see in the image below. I tried with 3, 5 and 10 dask-workers, but the tasks don't run in parallel. What do you suggest?
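What Pedro expects follows from the flow's dependency structure: no inc child depends on any dec child, so an executor is free to run both maps at once, and only each add has to wait for its two inputs. Below is a minimal stdlib sketch of that structure, assuming a ThreadPoolExecutor with 8 threads as a stand-in for the Dask workers. This is not how Prefect or Dask schedule work internally, and traced is a hypothetical helper that records how many tasks run at the same time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest number of tasks seen executing at once


def traced(fn, *args):
    """Run fn(*args) after a short sleep, tracking how many tasks overlap."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    try:
        time.sleep(0.01)  # stand-in for the sleep() inside the real tasks
        return fn(*args)
    finally:
        with lock:
            running -= 1


def inc(x):
    return x + 1


def dec(x):
    return x - 1


def add(x, y):
    return x + y


# Assumption: a pool of 8 threads plays the role of the Dask workers.
with ThreadPoolExecutor(max_workers=8) as pool:
    # inc and dec are independent, so every child is submitted up front.
    incs = [pool.submit(traced, inc, x) for x in range(100)]
    decs = [pool.submit(traced, dec, x) for x in range(100)]
    # Each add needs both upstream results before it can be submitted.
    adds = [pool.submit(traced, add, i.result(), d.result())
            for i, d in zip(incs, decs)]
    total = sum(a.result() for a in adds)  # plays the role of list_sum

print(total)     # 9900
print(peak > 1)  # True
```

peak can never exceed max_workers, because the pool never runs more threads than that.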

josh

01/06/2021, 6:25 PM
Hi @Pedro Martins, quick question: which version of Prefect is your agent running? I want to make sure it is using the run config (if it’s below 0.14.0, there is a chance that it is not respecting it).

Pedro Martins

01/06/2021, 6:30 PM
Hey @josh! It is 0.14.1.
$ kubectl exec -ti prefect-server-agent-654b7857f5-g2qt8 -- python -c "import prefect; print(prefect.__version__)"
0.14.1
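josh's check comes down to comparing the reported version against 0.14.0. Here is a minimal stdlib sketch. parse_version and supports_run_config are hypothetical helpers, and the parser assumes plain numeric releases, so pre-release tags such as rc1 are not handled. Comparing the raw strings instead is a common trap, because "0.9.7" sorts after "0.14.0".

```python
def parse_version(version):
    """Turn '0.14.1' (or a local build like '0.14.1+26.gabc1234') into (0, 14, 1)."""
    release = version.split("+")[0]  # drop any local build suffix
    return tuple(int(part) for part in release.split("."))


def supports_run_config(version):
    """Hypothetical helper: True for the 0.14.0+ range josh mentions."""
    return parse_version(version) >= (0, 14, 0)


print(supports_run_config("0.14.1"))   # True
print(supports_run_config("0.13.19"))  # False
print("0.9.7" >= "0.14.0")             # True: plain string comparison gets this wrong
print(supports_run_config("0.9.7"))    # False
```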

josh

01/06/2021, 6:40 PM
Could you add this to your KubernetesRun:
env = {
  "PREFECT__LOG__LEVEL": "DEBUG"
}
and then look for a line in your run’s logs that mentions something along the lines of “Using executor type …”

Pedro Martins

01/06/2021, 7:25 PM
It's PREFECT__LOGGING__LEVEL 🙂
[2021-01-06 19:23:48+0000] INFO - prefect.S3 | Downloading dask-example/2021-01-06t19-23-40-187375-00-00 from dr-prefect
[2021-01-06 19:23:49+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'dask-example'
[2021-01-06 19:23:49+0000] DEBUG - prefect.CloudFlowRunner | Using executor type DaskExecutor
[2021-01-06 19:23:49+0000] DEBUG - prefect.CloudFlowRunner | Flow 'dask-example': Handling state change from Scheduled to Running
[2021-01-06 19:24:05+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:24:20+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:24:26+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:24:26+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-06 19:24:26+0000] DEBUG - prefect.CloudFlowRunner | Flow 'dask-example': Handling state change from Running to Success
/usr/local/lib/python3.6/site-packages/distributed/client.py:1129: VersionMismatchWarning: Mismatched versions found

+-------------+----------------+---------------+---------------+
| Package     | client         | scheduler     | workers       |
+-------------+----------------+---------------+---------------+
| blosc       | None           | 1.9.2         | 1.9.2         |
| distributed | 2.30.0         | 2.30.0        | 2.30.1        |
| lz4         | None           | 3.1.0         | 3.1.1         |
| msgpack     | 1.0.2          | 1.0.0         | 1.0.0         |
| numpy       | None           | 1.18.1        | 1.18.1        |
| python      | 3.6.12.final.0 | 3.8.0.final.0 | 3.7.0.final.0 |
| tornado     | 6.1            | 6.0.4         | 6.1           |
+-------------+----------------+---------------+---------------+
Notes:
-  msgpack: Variation is ok, as long as everything is above 0.6
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Do you think the version mismatch could be the reason?

josh

01/06/2021, 7:26 PM
Oh yep, my bad, that is the correct env var 😅 Hmm, I don’t think the version mismatch would cause the issue, but it doesn’t hurt to fix it. There is a small chance that Dask doesn’t think this work needs to be parallelized (but that seems unlikely). What happens if you put a time.sleep(1) or something similar inside your inc/dec tasks to slow them down?
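Pedro's correction matters because Prefect reads configuration overrides from environment variables named PREFECT__SECTION__KEY, and the logger's level lives in the logging section. A name like PREFECT__LOG__LEVEL is still accepted, but it sets a different key. The sketch below is a simplified, stdlib-only illustration of that naming scheme, not Prefect's actual loader (which does more than this). config_from_env is a hypothetical helper.

```python
PREFIX = "PREFECT__"


def config_from_env(environ):
    """Group PREFECT__SECTION__KEY variables into nested dicts (simplified sketch)."""
    config = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # not a Prefect setting
        path = name[len(PREFIX):].lower().split("__")
        if len(path) < 2:
            continue  # needs at least a section and a key
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return config


env = {
    "PREFECT__LOGGING__LEVEL": "DEBUG",  # lands in the "logging" section
    "PREFECT__LOG__LEVEL": "DEBUG",      # lands in a separate "log" section
    "HOME": "/root",                     # ignored: no PREFECT__ prefix
}
print(config_from_env(env))
# {'logging': {'level': 'DEBUG'}, 'log': {'level': 'DEBUG'}}
```

If you pass os.environ instead of the literal dict, the output shows which Prefect overrides a pod actually has.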

Arnulfo Soriano

01/06/2021, 7:31 PM
I have the same issue: the tasks are not being executed in parallel on multiple pods. I verified I'm using Prefect v0.14.0. The "environment" version of the code below worked just fine: it spawns multiple pods and runs in parallel, but the run_config version doesn't. Here is the code I'm using:
import prefect
from prefect import task, Flow, Parameter
from prefect.environments.storage import Docker
from prefect.run_configs import KubernetesRun
import time
import requests
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

@task
def hello_task(delay):
    time.sleep(delay)
    logger = prefect.context.get("logger")
    logger.info("Hello, Kubernetes!")


@task
def get_value():
    time.sleep(10)
    return 5


@task
def output_value(value):
    print(value)

def register_flow():
    with Flow("kubernetesRun2") as flow:
        delay = Parameter('delay', default=30)
        value = get_value()
        output_value(value)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)
        hello_task(delay)

    flow.storage = Docker(image_name="prefect-sample-worker",
                          image_tag="demo_0.0.34",
                          local_image=True,
                          base_image='test/base:0.0.1',
                          registry_url="test")
    flow.storage.add_flow(flow)
    flow.storage.build(push=False)
    print("Registering containerdemo")
    flow.run_config = KubernetesRun(labels=["TEST"], env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
    flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(make_pod_spec(image="test/prefect-sample-worker:demo_0.0.34")), adapt_kwargs={"minimum": 2, "maximum": 5},)
    flow.register(project_name="test", build=False)

if __name__ == "__main__":
    register_flow()
debug logs:
[2021-01-06 19:29:15+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'kubernetesRun2'
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudFlowRunner | Flow 'kubernetesRun2': Handling state change from Scheduled to Running
[2021-01-06 19:29:15+0000] INFO - prefect.CloudTaskRunner | Task 'get_value': Starting task run...
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudTaskRunner | Task 'get_value': Handling state change from Pending to Running
[2021-01-06 19:29:15+0000] DEBUG - prefect.CloudTaskRunner | Task 'get_value': Calling task.run() method...
[2021-01-06 19:29:25+0000] DEBUG - prefect.CloudTaskRunner | Task 'get_value': Handling state change from Running to Success
[2021-01-06 19:29:25+0000] INFO - prefect.CloudTaskRunner | Task 'get_value': Finished task run for task with final state: 'Success'
[2021-01-06 19:29:26+0000] INFO - prefect.CloudTaskRunner | Task 'delay': Starting task run...
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'delay': Handling state change from Pending to Running
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'delay': Calling task.run() method...
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'delay': Handling state change from Running to Success
[2021-01-06 19:29:26+0000] INFO - prefect.CloudTaskRunner | Task 'delay': Finished task run for task with final state: 'Success'
[2021-01-06 19:29:26+0000] INFO - prefect.CloudTaskRunner | Task 'hello_task': Starting task run...
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Handling state change from Pending to Running
[2021-01-06 19:29:26+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Calling task.run() method...
[2021-01-06 19:29:30+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-01-06 19:29:36+0000] INFO - prefect.hello_task | Hello, Kubernetes!
[2021-01-06 19:29:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Handling state change from Running to Success
[2021-01-06 19:29:36+0000] INFO - prefect.CloudTaskRunner | Task 'hello_task': Finished task run for task with final state: 'Success'
[2021-01-06 19:29:36+0000] INFO - prefect.CloudTaskRunner | Task 'hello_task': Starting task run...
[2021-01-06 19:29:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Handling state change from Pending to Running
[2021-01-06 19:29:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'hello_task': Calling task.run() method...

josh

01/06/2021, 7:32 PM
@Arnulfo Soriano your logs say "Using executor type LocalExecutor", which means it isn’t using the DaskExecutor you set on your flow. Are both your flow version and your agent version 0.14+?

Pedro Martins

01/06/2021, 7:37 PM
@josh I fixed the version mismatch and, indeed, it didn't change anything. All the functions contain a sleep call: sleep(random.random() / 10)

Arnulfo Soriano

01/06/2021, 7:40 PM
@josh Yeah, I just verified: my version is 0.14.0 across all my Prefect components.

josh

01/06/2021, 7:41 PM
Hmm, could you both open an issue on the repo with this information? It’ll be easier to triage there than on Slack 🙂

Pedro Martins

01/06/2021, 7:41 PM
I thought it could be something with the UI, but the Dask task stream confirms they're running asynchronously.

josh

01/06/2021, 7:43 PM
Oh, that’s interesting @Pedro Martins, so Dask shows parallelism but the Prefect UI doesn’t?

Arnulfo Soriano

01/06/2021, 7:43 PM
@Pedro Martins yeah, in my case I can see Kubernetes, and I don't see any pods being spawned to execute the tasks in parallel, other than the one job pod.

Pedro Martins

01/06/2021, 7:47 PM
@josh Well, kind of! It shows parallelism within a single task: you can see multiple dec runs computed at the same time. However, I was expecting dec and inc to run in parallel, something like the green in the middle of the purple.

Arnulfo Soriano

01/06/2021, 7:49 PM
Oh damn, I got it working. I just moved
flow.run_config = KubernetesRun(labels=["TEST"], env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(make_pod_spec(image="test/prefect-sample-worker:demo_0.0.34")), adapt_kwargs={"minimum": 2, "maximum": 5},)
on top of flow.storage...

josh

01/06/2021, 7:51 PM
Ah, I didn’t notice this in your code! You were building your flow’s storage before adding the run config/executor to it, so your flow was being stored without that configuration. Nice catch 🙂
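The ordering bug josh describes comes down to when the flow is serialized. Storage captures the flow as it is at build time, so anything attached afterwards is missing from the stored copy. A run that loads that copy then falls back to the default executor, which matches the "Using executor type LocalExecutor" log line. This is a stdlib analogy, not Prefect code. FlowSketch, build_storage and executor_used are hypothetical, and pickle.dumps stands in for whatever serialization the real storage uses.

```python
import pickle


class FlowSketch:
    """Hypothetical stand-in for a Prefect Flow: just a name and an executor slot."""

    def __init__(self, name):
        self.name = name
        self.executor = None  # unset: the run falls back to the default executor


def build_storage(flow):
    """Hypothetical stand-in for storage.add_flow()/build(): snapshot the flow now."""
    return pickle.dumps(vars(flow))


def executor_used(stored):
    """What a runner loading the stored copy would see."""
    attrs = pickle.loads(stored)
    return attrs["executor"] or "LocalExecutor"


# Original order: storage built first, executor attached afterwards.
flow = FlowSketch("kubernetesRun2")
stored = build_storage(flow)
flow.executor = "DaskExecutor"  # too late: the snapshot has already been taken
print(executor_used(stored))    # LocalExecutor

# Fixed order: configure first, then build storage.
flow = FlowSketch("kubernetesRun2")
flow.executor = "DaskExecutor"
stored = build_storage(flow)
print(executor_used(stored))    # DaskExecutor
```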

Arnulfo Soriano

01/06/2021, 7:51 PM
@Pedro Martins sorry, I meant the Dask pods. Before, I didn't see any Dask pods and everything executed on the job pod, but now I can see multiple Dask pods.
@josh Ah OK, that makes sense. Thank you!

Pedro Martins

01/06/2021, 7:57 PM
That looks great @Arnulfo Soriano! I'm trying to replicate what you did.
Haven't had much luck yet. 😕
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

# inc, dec, add and list_sum are the same @task functions as before

with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:dask-test",
        image_pull_secrets=["regcred"],
        env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
    ),
    "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786"),
    "storage": S3(bucket="dr-prefect"),
}

# attach the configuration once the flow has been defined
flow.run_config = custom_confs["run_config"]
flow.executor = custom_confs["executor"]
flow.storage = custom_confs["storage"]

Arnulfo Soriano

01/06/2021, 8:10 PM
Our cluster setup might be different. In my case, I have this cluster on Kubernetes:

wiretrack

01/07/2021, 2:34 AM
@Pedro Martins isn’t it because of the number (range(100))? Meaning it can only parallelise so much, but not 100? I just ran a test with the following code:
from prefect import task
from prefect import Flow
from prefect.environments import LocalEnvironment
from prefect.executors import DaskExecutor
import time


@task
def inc(x):
    time.sleep(10)
    return x + 1

@task
def dec(x):
    time.sleep(10)
    return x - 1

@task
def add(x, y):
    time.sleep(10)
    return x + y


executor = DaskExecutor(address='tcp://192.168.68.103:8786')
environment = LocalEnvironment(executor)

with Flow("dask-example", environment=environment) as flow:
    incs = inc.map(x=range(3))
    decs = dec.map(x=range(3))
    adds = add.map(x=incs, y=decs)

flow.register(project_name='ETL')
This is with a local dask-scheduler and 3 dask-workers running locally. I don’t know if it’s a proper way to test your use case, but anyway…
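wiretrack's hypothesis can be made concrete with an idealized model. Two assumptions, neither confirmed in the thread: the scheduler hands tasks to free worker slots in submission order, with every inc child submitted before any dec child as in the flow, and every task takes exactly one time step. Here n_slots means the total number of worker threads, which may be larger than the number of dask-workers, since each worker can run several threads. overlap_steps is a hypothetical helper, and the model ignores add.map entirely.

```python
def overlap_steps(n_items, n_slots):
    """Count time steps in which an inc child and a dec child run side by side.

    Idealized model (assumption): tasks go to free slots in FIFO submission
    order and every task takes exactly one time step.
    """
    queue = ["inc"] * n_items + ["dec"] * n_items  # inc.map first, then dec.map
    steps = [queue[i:i + n_slots] for i in range(0, len(queue), n_slots)]
    return sum(1 for step in steps if {"inc", "dec"} <= set(step))


print(overlap_steps(3, 6))     # 1: a range(3) map fits in a single step
print(overlap_steps(100, 10))  # 0: every inc is handed out before the first dec
print(overlap_steps(100, 8))   # 1: only the boundary step mixes the two
```

Under these assumptions, a small map can look fully parallel while a large one runs almost all incs before any dec. With only 3 slots, even range(3) shows no overlap (overlap_steps(3, 3) is 0), so the thread count matters as much as the size of the map.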