Chris L.
08/10/2022, 1:24 PM
The `worker_client` context manager doesn't seem to work either. It raises a "no workers found" error, even though I can open the Dask dashboard on localhost and see the Prefect 2.0 tasks there.
Below is a small reproducible example. As the screenshot shows, none of the `inc`, `double`, or `add` delayed function calls are picked up in the dashboard.
Is there a generic way to access the Dask task runner's cluster from within tasks in Prefect 2.0?
import dask
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def parallel_sum(data):
    """Sum ``(x + 1) + (x * 2)`` over *data* through a ``dask.delayed`` graph.

    For each element three delayed calls are built: ``inc(x)``,
    ``double(x)`` and ``add`` of those two. A final delayed ``sum`` over
    all ``add`` results is then computed. After computing, the task sleeps
    for 30 seconds and returns the total. The sleep is presumably there to
    keep the task alive long enough to inspect the Dask dashboard
    (assumption).

    NOTE(review): ``.compute()`` is called with no explicit scheduler or
    client, so which Dask scheduler executes the graph depends on the
    calling context. The accompanying report is that these calls do not
    appear on the DaskTaskRunner cluster's dashboard.
    """

    # Helper names are kept as-is: Dask derives task key names (and thus
    # dashboard labels) from the wrapped function's name.
    def inc(x):
        return x + 1

    def double(x):
        return x * 2

    def add(x, y):
        time.sleep(5)  # artificial per-call delay so the work is slow
        return x + y

    lazy_parts = [
        dask.delayed(add)(dask.delayed(inc)(value), dask.delayed(double)(value))
        for value in data
    ]
    grand_total = dask.delayed(sum)(lazy_parts).compute()
    time.sleep(30)
    return grand_total
@flow(task_runner=DaskTaskRunner())
def run_parallel_sum(data):
    """Submit a single ``parallel_sum`` task run for *data*.

    The flow uses a ``DaskTaskRunner`` constructed with default arguments.
    The submitted future is neither kept nor returned by the function body.
    """
    parallel_sum.submit(data)


if __name__ == "__main__":
    # Same input as before: the integers 0..999 as a list.
    run_parallel_sum(data=[*range(1000)])
import dask
import time
from prefect import Flow, task, Parameter
from prefect.executors import DaskExecutor
@task
def parallel_sum(data):
    """Prefect 1-style (``Flow``/``DaskExecutor``) counterpart of the task.

    Builds the same ``dask.delayed`` graph: for each element ``x``,
    ``add(inc(x), double(x))``, followed by a delayed ``sum`` over all of
    them. It computes that graph, sleeps for 30 seconds and returns the
    total.
    """

    # Names matter for Dask key naming, so the helpers keep their names.
    def inc(x):
        return x + 1

    def double(x):
        return x * 2

    def add(x, y):
        time.sleep(5)  # artificial per-call delay
        return x + y

    pending = []
    for item in data:
        incremented = dask.delayed(inc)(item)
        doubled = dask.delayed(double)(item)
        pending.append(dask.delayed(add)(incremented, doubled))

    lazy_total = dask.delayed(sum)(pending)
    result = lazy_total.compute()
    time.sleep(30)
    return result
# Prefect 1 flow: a "data" parameter (default 0..999) feeding one
# parallel_sum task, executed with a DaskExecutor using default arguments.
flow = Flow("run_parallel_sum", executor=DaskExecutor())
with flow:
    # Parameters and task calls made inside this context are added to `flow`.
    data = Parameter("data", default=[*range(1000)])
    parallel_sum(data)

if __name__ == "__main__":
    flow.run()
Zheyuan
08/11/2022, 4:44 AM
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def run_parallel_sum(data):
    """Submit one ``parallel_sum`` task run for *data*; the future is discarded."""
    parallel_sum.submit(data)
    return None
Chris L.
08/11/2022, 9:40 AM