# data-tricks-and-tips
Hello there, question about Prefect 2.0 and Dask. In Prefect 1.0, any Dask delayed computation within a Prefect task is picked up by the same Dask cluster that executes the Prefect flow (with DaskExecutor). However, in Prefect 2.0, this no longer seems to be the case. Using the dask.distributed worker_client context manager doesn't work either and raises a "no workers found" error, even though I can open the Dask dashboard on localhost and see the Prefect 2.0 tasks there. Below is a small reproducible example. You can see in the screenshot that none of the inc, double, and add delayed function calls are picked up in the dashboard. Is there some generic way for me to access the Dask task runner's cluster from within tasks in Prefect 2.0?
import dask
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def parallel_sum(data):
    def inc(x):
        return x + 1

    def double(x):
        return x * 2

    def add(x, y):
        time.sleep(5)
        return x + y

    output = []
    for x in data:
        a = dask.delayed(inc)(x)
        b = dask.delayed(double)(x)
        c = dask.delayed(add)(a, b)
        output.append(c)

    total = dask.delayed(sum)(output).compute()
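    # NOTE: expected the compute() above to be picked up by the DaskTaskRunner's
    # cluster, but none of the inc/double/add calls appear on its dashboard.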
    time.sleep(30)
    return total


@flow(task_runner=DaskTaskRunner())
def run_parallel_sum(data):
    parallel_sum.submit(data)


if __name__ == "__main__":
    run_parallel_sum(list(range(1000)))
By contrast, in Prefect v1.2.3:
import dask
import time

from prefect import Flow, task, Parameter
from prefect.executors import DaskExecutor


@task
def parallel_sum(data):
    def inc(x):
        return x + 1

    def double(x):
        return x * 2

    def add(x, y):
        time.sleep(5)
        return x + y

    output = []
    for x in data:
        a = dask.delayed(inc)(x)
        b = dask.delayed(double)(x)
        c = dask.delayed(add)(a, b)
        output.append(c)

    total = dask.delayed(sum)(output).compute()
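    # Here the same compute() is picked up by the DaskExecutor's cluster, so the
    # inc/double/add calls do show up on the Dask dashboard.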
    time.sleep(30)
    return total


with Flow("run_parallel_sum", executor=DaskExecutor()) as flow:
    data = Parameter("data", default=list(range(1000)))
    parallel_sum(data)


if __name__ == "__main__":
    flow.run()
Zheyuan:
For Prefect 2, you may only need to modify the DaskTaskRunner parameters so it creates workers dynamically.
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def run_parallel_sum(data):
    parallel_sum.submit(data)
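If a Dask cluster is already running, the task runner can also be pointed at it instead of spawning a temporary one. A minimal sketch, assuming a scheduler is reachable at the placeholder address below:
from prefect import flow
from prefect_dask import DaskTaskRunner


# Placeholder scheduler address; replace with your running cluster's address.
@flow(task_runner=DaskTaskRunner(address="tcp://127.0.0.1:8786"))
def run_parallel_sum(data):
    parallel_sum.submit(data)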
Hi @Zheyuan, thanks for the response. Unfortunately I don't think it addresses my issue, as my question relates to connecting to the Prefect task's Dask worker from within the task itself.
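To make that concrete, here is roughly the worker_client variant I tried (a sketch, with the inc/double/add graph collapsed into a single delayed call per item for brevity); this is the version that raises the "no workers found" error:
import dask
from dask.distributed import worker_client
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def parallel_sum(data):
    # Same idea as the delayed graph above, collapsed into one call per item.
    output = [dask.delayed(lambda x: (x + 1) + (x * 2))(x) for x in data]

    # The hope is that this attaches to the Dask worker running this Prefect
    # task and submits the delayed graph to the same cluster, but instead it
    # raises a "no workers found" error.
    with worker_client() as client:
        total = client.compute(dask.delayed(sum)(output)).result()
    return total


@flow(task_runner=DaskTaskRunner())
def run_parallel_sum(data):
    parallel_sum.submit(data)


if __name__ == "__main__":
    run_parallel_sum(list(range(1000)))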
👀 1