    Сергей Романкевич

    1 year ago
    Hello friends! I'm trying to run Prefect against a Dask cluster of 20 to 30 workers, and it isn't working. I'd like to know the best practice. My setup: a Debian server running Prefect version 15.5, with dask-2021.9.0 and distributed-2021.9.0 installed. I run dask-scheduler on the Debian server and one dask-worker on another computer. Details are in my messages.
    Kevin Kho

    1 year ago
    Hey, what are the versions of Dask on your executor? It seems like you may have a different version there. Could you move the traceback to the thread so we don’t crowd the main channel when you get the chance?
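One way to answer the version question on any one machine is to ask Python which copy of each package it would import. This is a minimal sketch, assuming Python 3.8+ for `importlib.metadata`; the helper name `describe_package` is made up for illustration. The reported version comes from installed package metadata, so it may not match the imported copy when several installations coexist.

```python
import importlib.util
from importlib import metadata  # available from Python 3.8


def describe_package(name):
    """Return (version, location) for a package, or (None, None) if not importable."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return None, None
    try:
        # Version as recorded by the installer (pip, apt, ...).
        version = metadata.version(name)
    except metadata.PackageNotFoundError:
        version = None
    # spec.origin is the file Python would load for `import name`.
    return version, spec.origin


if __name__ == "__main__":
    for pkg in ("dask", "distributed", "prefect"):
        version, location = describe_package(pkg)
        print(f"{pkg}: version={version} location={location}")
```

Running it on the Prefect server and on each Dask machine, with the same interpreter that runs the flow, shows whether they agree.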
    Сергей Романкевич

    1 year ago
    I launch this test flow:
    from prefect.executors import DaskExecutor
    from prefect import task, Flow
    import random
    from time import sleep

    @task
    def inc(x):
        sleep(random.random() / 10)
        return x + 1

    @task
    def dec(x):
        sleep(random.random() / 10)
        return x - 1

    @task
    def add(x, y):
        sleep(random.random() / 10)
        return x + y

    @task(name="sum")
    def list_sum(arr):
        return sum(arr)

    # Map each task over 100 inputs, then reduce the results to one total.
    with Flow("dask-example") as flow:
        incs = inc.map(x=range(100))
        decs = dec.map(x=range(100))
        adds = add.map(x=incs, y=decs)
        total = list_sum(adds)

    # Connect to the already-running dask-scheduler.
    executor = DaskExecutor(address="tcp://10.1.5.176:8786")
    flow.run(executor=executor)
    And I get an error.
    rsv@srv-etl:~/flow$ /bin/python /home/rsv@comandor.local/flow/dask.py
    [2021-09-02 10:21:40+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
    [2021-09-02 10:21:40+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
    [2021-09-02 10:21:40+0700] ERROR - prefect.FlowRunner | Unexpected error: ImportError("cannot import name 'Client' from 'distributed' (/usr/local/lib/python3.7/dist-packages/distributed/__init__.py)")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/dist-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/dist-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.7/dist-packages/prefect/executors/dask.py", line 201, in start
        from distributed import Client
    ImportError: cannot import name 'Client' from 'distributed' (/usr/local/lib/python3.7/dist-packages/distributed/__init__.py)
    [2021-09-02 10:21:40+0700] ERROR - prefect.dask-example | Unexpected error occured in FlowRunner: ImportError("cannot import name 'Client' from 'distributed' (/usr/local/lib/python3.7/dist-packages/distributed/__init__.py)")
    [2021-09-02 10:21:40+0700] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("module 'dask' has no attribute 'config'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/dist-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/dist-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.7/dist-packages/prefect/executors/dask.py", line 201, in start
        from distributed import Client
      File "/usr/local/lib/python3.7/dist-packages/distributed/__init__.py", line 1, in <module>
        from . import config  # isort:skip; load distributed configuration first
      File "/usr/local/lib/python3.7/dist-packages/distributed/config.py", line 12, in <module>
        config = dask.config.config
    AttributeError: module 'dask' has no attribute 'config'
    [2021-09-02 10:21:40+0700] ERROR - prefect.dask-example | Unexpected error occured in FlowRunner: AttributeError("module 'dask' has no attribute 'config'")
    Kevin Kho

    1 year ago
    Thanks for moving this!
    Сергей Романкевич

    1 year ago
    @Kevin Kho The versions are dask-2021.9.0 and distributed-2021.9.0. There seems to be a collision with a pip package:
    import dask.config
    dask.config.config
    Error
    rsv@srv-etl:~/flow$ /bin/python /home/rsv@comandor.local/flow/dask_conf.py
    [2021-09-07 21:48:55+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
    [2021-09-07 21:48:55+0700] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("module 'dask' has no attribute 'config'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/dist-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/dist-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/lib/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.7/dist-packages/prefect/executors/dask.py", line 201, in start
        from distributed import Client
      File "/usr/local/lib/python3.7/dist-packages/distributed/__init__.py", line 1, in <module>
        from . import config  # isort:skip; load distributed configuration first
      File "/usr/local/lib/python3.7/dist-packages/distributed/config.py", line 12, in <module>
        config = dask.config.config
    AttributeError: module 'dask' has no attribute 'config'
    [2021-09-07 21:48:55+0700] ERROR - prefect.dask-example | Unexpected error occured in FlowRunner: AttributeError("module 'dask' has no attribute 'config'")
    Traceback (most recent call last):
      File "/home/rsv@comandor.local/flow/dask_conf.py", line 9, in <module>
        import dask.config
    Kevin Kho

    1 year ago
    This issue says it’s multiple installations of Dask. What is your Dask setup on the executor (cluster)?
    Сергей Романкевич

    1 year ago
    The first was dask-scheduler, the second was dask-worker.
    Kevin Kho

    1 year ago
    Sorry, I meant to ask what the versions of Dask are for each of those. I don’t think I’m seeing them.
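To compare versions across the scheduler, the workers, and the client in one go, `distributed.Client.get_versions()` returns a nested dictionary that can be checked programmatically. The sketch below assumes that dictionary has `client` and `scheduler` entries, each with a `packages` dict, plus a `workers` dict keyed by worker address; the helper name `find_version_mismatches` is hypothetical.

```python
def find_version_mismatches(versions, packages=("dask", "distributed")):
    """Return {package: {node: version}} for packages whose versions differ.

    `versions` is assumed to look like the result of
    distributed.Client.get_versions().
    """
    # Collect every node that reported version information.
    nodes = {}
    for role in ("client", "scheduler"):
        if role in versions:
            nodes[role] = versions[role]
    for address, info in versions.get("workers", {}).items():
        nodes[f"worker {address}"] = info

    mismatches = {}
    for pkg in packages:
        seen = {node: info.get("packages", {}).get(pkg) for node, info in nodes.items()}
        # More than one distinct value means at least one node disagrees.
        if len(set(seen.values())) > 1:
            mismatches[pkg] = seen
    return mismatches
```

With a live cluster, something like `find_version_mismatches(Client("tcp://10.1.5.176:8786").get_versions())` would list any package that differs between nodes. An empty result means the checked packages match everywhere.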
    Сергей Романкевич

    1 year ago
    @Kevin Kho You were right! I had several versions of Dask installed. Thank you so much for asking the right question!
    /home/romankevich sergej@comandor.local/.local/lib/python3.7/site-packages/dask/
    /usr/local/lib/python3.7/dist-packages/dask/
    Kevin Kho

    1 year ago
    Nice!
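For anyone hitting the same `module 'dask' has no attribute 'config'` error, duplicate installations like the two paths above can be found by scanning the import path for every location that could satisfy `import dask`. This is a minimal sketch using only the standard library; the helper name `find_candidates` is made up for illustration. It would also flag a local file named `dask.py`, such as the test script in the first traceback, which may shadow the real package too (that is a possibility, not something confirmed in this thread).

```python
import os
import sys


def find_candidates(name, search_path=None):
    """List every file on the import path that could satisfy `import name`.

    Each directory is checked for a package (name/__init__.py) and for a
    plain module (name.py). More than one hit means the first shadows the rest.
    """
    hits = []
    for entry in (sys.path if search_path is None else search_path):
        directory = entry or os.getcwd()  # '' on sys.path means the current directory
        package_init = os.path.join(directory, name, "__init__.py")
        module_file = os.path.join(directory, name + ".py")
        for candidate in (package_init, module_file):
            if os.path.isfile(candidate) and candidate not in hits:
                hits.append(candidate)
    return hits


if __name__ == "__main__":
    for path in find_candidates("dask"):
        print(path)
```

The first path printed is the one Python imports. Removing or renaming the extra copies leaves a single installation.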