Robert Kowalski
06/22/2023, 1:22 PM
import asyncio
from typing import List
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
async def test_task(numbers: List[int]):
    """Print the value this task run was given.

    NOTE(review): ``my_parent`` invokes this through ``test_task.map`` over a
    list of ints, so each mapped run presumably receives a single element
    rather than the whole list -- confirm whether ``List[int]`` is intended.
    """
    print(numbers)
@flow(task_runner=DaskTaskRunner(address='tcp://10.10.10.200:8786'))
async def my_parent():
    """Map ``test_task`` over a short list of integers using a Dask task runner.

    The task runner is configured with the address of an already-running
    scheduler (``tcp://10.10.10.200:8786``). The address must be a bare
    ``tcp://host:port`` string; the ``<...>`` wrapper seen in the pasted
    snippet is chat link markup, not part of the address.

    Returns:
        None. The flow only triggers the mapped task runs.
    """
    numbers = list(range(2))
    # The pasted version awaited the object returned by ``test_task.map``
    # via ``asyncio.gather(*[coros])``. Wrapping one awaitable in a list and
    # unpacking it again adds nothing, so it is awaited directly here.
    await test_task.map(numbers)
if __name__ == "__main__":
    # Script entry point: call the async flow and let asyncio.run drive the
    # returned awaitable to completion.
    pending_flow_run = my_parent()
    asyncio.run(pending_flow_run)
Can somebody tell me what I'm doing wrong? Without mapping test_task it works, and mapping with a local Dask cluster works too. But when I use a remote Dask cluster together with mapping, I receive this error:
15:19:09.646 | INFO | distributed.batched - Batched Comm Closed <TCP (closed) Client->Scheduler local=tcp://10.61.14.2:60674 remote=tcp://10.10.10.200:8786>
Traceback (most recent call last):
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/comm/tcp.py", line 269, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
15:19:09.659 | INFO | Task run 'test_task-0' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: test_task-0-4fd7fc89242e4471877c70e0b40362e9-1
15:19:09.688 | INFO | Task run 'test_task-1' - Crash detected! Execution was interrupted by an unexpected exception: CancelledError: test_task-1-1249f0856b0a4a138e0ca263f104cdea-1
15:19:09.744 | ERROR | Flow run 'brass-worm' - Finished in state Failed('2/2 states failed.')
Traceback (most recent call last):
File "/home/robin/.config/JetBrains/PyCharmCE2023.1/scratches/tmp_dask.py", line 21, in <module>
asyncio.run(my_parent())
File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 298, in wait
return await future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/robin/.cache/pypoetry/virtualenvs/algoseek-adjustments-inserter-myRRPE6N-py3.11/lib/python3.11/site-packages/distributed/client.py", line 305, in _result
raise exception
concurrent.futures._base.CancelledError: test_task-0-4fd7fc89242e4471877c70e0b40362e9-1
Process finished with exit code 1
Any tips would be appreciated.