Hello friends! I'm trying to implement the scenari...
# ask-community
с
Hello friends! I'm trying to implement the scenario of the prefect's work with a dask cluster of 20 ~ 30 workers and it won't work. I'm asking for the best practice. I have the following tools: Debian server with a running prefect version 15.5 and an installed dask-2021.9.0 distributed-2021.9.0.  I run dask-scheduler on the Debian server and one of the dask-workers on another computer. Details are in my messages
k
Hey, what are the versions of Dask on your executor? It seems like you may have a different version there. Could you move the traceback to the thread so we don’t crowd the main channel when you get the chance?
с
I launch the test Flow.
Copy code
from prefect.executors import DaskExecutor
from prefect import task, Flow
import datetime
import random
from time import sleep

@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1
@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1
@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y
@task(name="sum")
def list_sum(arr):
    return sum(arr)
    
with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

executor = DaskExecutor(address="<tcp://10.1.5.176:8786>")
flow.run(executor=executor)
And I get an error.
Copy code
rsv@srv-etl:~/flow$ /bin/python /home/rsv@comandor.local/flow/dask.py
[2021-09-02 10:21:40+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2021-09-02 10:21:40+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2021-09-02 10:21:40+0700] ERROR - prefect.FlowRunner | Unexpected error: ImportError("cannot import name 'Client' from 'distributed' (/usr/local/lib/python3.7/dist-packages/distributed/__init__.py)")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/dist-packages/prefect/executors/dask.py", line 201, in start
    from distributed import Client
ImportError: cannot import name 'Client' from 'distributed' (/usr/local/lib/python3.7/dist-packages/distributed/__init__.py)
[2021-09-02 10:21:40+0700] ERROR - prefect.dask-example | Unexpected error occured in FlowRunner: ImportError("cannot import name 'Client' from 'distributed' (/usr/local/lib/python3.7/dist-packages/distributed/__init__.py)")
[2021-09-02 10:21:40+0700] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("module 'dask' has no attribute 'config'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/dist-packages/prefect/executors/dask.py", line 201, in start
    from distributed import Client
  File "/usr/local/lib/python3.7/dist-packages/distributed/__init__.py", line 1, in <module>
    from . import config  # isort:skip; load distributed configuration first
  File "/usr/local/lib/python3.7/dist-packages/distributed/config.py", line 12, in <module>
    config = dask.config.config
AttributeError: module 'dask' has no attribute 'config'
[2021-09-02 10:21:40+0700] ERROR - prefect.dask-example | Unexpected error occured in FlowRunner: AttributeError("module 'dask' has no attribute 'config'")
k
Thanks for moving this!
с
@Kevin Kho The version dask-2021.9.0 and distributed-2021.9.0 there is a collision with pip package
Copy code
import dask.config
dask.config.config
Error
Copy code
rsv@srv-etl:~/flow$ /bin/python /home/rsv@comandor.local/flow/dask_conf.py
[2021-09-07 21:48:55+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2021-09-07 21:48:55+0700] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("module 'dask' has no attribute 'config'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/dist-packages/prefect/executors/dask.py", line 201, in start
    from distributed import Client
  File "/usr/local/lib/python3.7/dist-packages/distributed/__init__.py", line 1, in <module>
    from . import config  # isort:skip; load distributed configuration first
  File "/usr/local/lib/python3.7/dist-packages/distributed/config.py", line 12, in <module>
    config = dask.config.config
AttributeError: module 'dask' has no attribute 'config'
[2021-09-07 21:48:55+0700] ERROR - prefect.dask-example | Unexpected error occured in FlowRunner: AttributeError("module 'dask' has no attribute 'config'")
Traceback (most recent call last):
  File "/home/rsv@comandor.local/flow/dask_conf.py", line 9, in <module>
    import dask.config
k
This issue says it’s multiple installations of Dask. What is your Dask setup on the executor (cluster)?
с
First was dask-scheduler second was dask-worker
k
Sorry, I meant to say what are the versions of Dask for each of those. I don’t thjnk I’m seeing it
с
@Kevin Kho you were right! I had a few versions of dask installed. thank you so much for the right question! /home/romankevich sergej@comandor.local/.local/lib/python3.7/site-packages/dask/ /usr/local/lib/python3.7/dist-packages/dask/
👍 1
k
Nice!