# ask-community
Hi, I want to replace the local DaskTaskRunner with a remote Dask cluster. Here is a simple example:
import asyncio
from typing import List

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
async def test_task(numbers: List[int]):
    print(numbers)


@flow(task_runner=DaskTaskRunner('tcp://10.10.10.200:8786'))
async def my_parent():
    numbers = [i for i in range(2)]
    coros = test_task.map(numbers)
    await asyncio.gather(*[coros])


if __name__ == "__main__":
    asyncio.run(my_parent())
Can somebody tell me what I'm doing wrong? Without mapping test_task it works, and mapping with a local Dask cluster works too. But when I use the remote Dask cluster together with mapping, I get this error:
15:19:09.646 | INFO    | distributed.batched - Batched Comm Closed <TCP (closed) Client->Scheduler local=tcp://10.61.14.2:60674 remote=tcp://10.10.10.200:8786>
Traceback (most recent call last):
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
15:19:09.659 | INFO    | Task run 'test_task-0' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: test_task-0-4fd7fc89242e4471877c70e0b40362e9-1
15:19:09.688 | INFO    | Task run 'test_task-1' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: test_task-1-1249f0856b0a4a138e0ca263f104cdea-1
15:19:09.744 | ERROR   | Flow run 'brass-worm' - Finished in state Failed('2/2 states failed.')
Traceback (most recent call last):
  File "/home/robin/.config/JetBrains/PyCharmCE2023.1/scratches/tmp_dask.py", line 21, in <module>
    asyncio.run(my_parent())
  File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 298, in wait
    return await future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/client.py", line 305, in _result
    raise exception
concurrent.futures._base.CancelledError: test_task-0-4fd7fc89242e4471877c70e0b40362e9-1

Process finished with exit code 1
Any tips would be appreciated.
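
A side note on the `asyncio.gather(*[coros])` line in the snippet above, separate from the Dask error: if `test_task.map(numbers)` returns a single awaitable in an async flow (an assumption about Prefect's behavior, not confirmed here), then wrapping it in a list and unpacking it awaits that one object rather than one awaitable per item. The stdlib-only sketch below shows the difference. `fake_map` and `double` are hypothetical stand-ins, not Prefect APIs.

```python
import asyncio
from typing import List


async def fake_map(numbers: List[int]) -> List[int]:
    # Hypothetical stand-in for a call that returns ONE awaitable
    # resolving to a list of results (not Prefect's real API).
    await asyncio.sleep(0)
    return [n * 2 for n in numbers]


async def double(n: int) -> int:
    # Hypothetical per-item coroutine.
    await asyncio.sleep(0)
    return n * 2


async def main():
    numbers = list(range(2))

    # Pattern from the snippet: a single awaitable wrapped in a list,
    # then unpacked. gather() receives exactly one awaitable.
    wrapped = await asyncio.gather(*[fake_map(numbers)])

    # For comparison: one awaitable per item.
    per_item = await asyncio.gather(*[double(n) for n in numbers])
    return wrapped, per_item


wrapped, per_item = asyncio.run(main())
print(wrapped)   # [[0, 2]]
print(per_item)  # [0, 2]
```

This only illustrates what the gather call ends up awaiting. It may or may not be related to the CancelledError in the traceback.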