# ask-community
a
Hi team, I'm running my flow with:
@flow(name="my_flow", task_runner=DaskTaskRunner())
The tasks run and provide the expected output, but for some reason I get this error:
Traceback (most recent call last):
  File "/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/comm/tcp.py", line 297, in write
    raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
  File "/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/comm/tcp.py", line 307, in write
    convert_stream_closed_error(self, e)
  File "/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Scheduler connection to worker local=<tcp://127.0.0.1:60686> remote=<tcp://127.0.0.1:60714>>: Stream is closed
{"message": "Batched Comm Closed <TCP (closed) Scheduler connection to worker local=<tcp://127.0.0.1:60686> remote=<tcp://127.0.0.1:60714>>", "exc_info": "Traceback (most recent call last):\n  File \"/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/comm/tcp.py\", line 297, in write\n    raise StreamClosedError()\ntornado.iostream.StreamClosedError: Stream is closed\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/batched.py\", line 115, in _background_send\n    nbytes = yield coro\n             ^^^^^^^^^^\n  File \"/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/tornado/gen.py\", line 766, in run\n    value = future.result()\n            ^^^^^^^^^^^^^^^\n  File \"/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/comm/tcp.py\", line 307, in write\n    convert_stream_closed_error(self, e)\n  File \"/Users/user/.pyenv/versions/3.11.1/envs/test-p2-env/lib/python3.11/site-packages/distributed/comm/tcp.py\", line 142, in convert_stream_closed_error\n    raise CommClosedError(f\"in {obj}: {exc}\") from exc\ndistributed.comm.core.CommClosedError: in <TCP (closed) Scheduler connection to worker local=<tcp://127.0.0.1:60686> remote=<tcp://127.0.0.1:60714>>: Stream is closed", "time": "10:47:30", "level": "INFO"}
Has anyone encountered something similar?
k
hi, Aviv! when you look at the Dask dashboard, do you see the workers dying? this is normal behavior from Dask when the computation is ending; Dask kills all the workers and closes the connection because the flow is already finishing ;D
a
Hi Kamilly! Thanks for answering 🙂 Yes, the flow completes successfully; it just seemed weird that Dask would raise an exception instead of shutting the workers down in a more "peaceful" way
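[Editor's note] Since the thread concludes these messages are benign teardown noise rather than real failures, one possible workaround is to filter them out with a standard-library logging filter. This is a sketch, not an official Prefect or Dask setting, and the `distributed` logger name is an assumption about where these records originate:

```python
import logging


class SuppressCommClosed(logging.Filter):
    """Drop the benign 'Stream is closed' messages that Dask's
    distributed library emits while tearing down workers after a
    flow has finished."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Keep every record that does NOT mention the closed stream.
        return "Stream is closed" not in record.getMessage()


# Filters attached to a *handler* apply to every record that reaches it,
# including records propagated up from child loggers (e.g. the assumed
# 'distributed.comm.tcp' logger). A filter attached only to the parent
# logger would not see propagated child records.
handler = logging.StreamHandler()
handler.addFilter(SuppressCommClosed())
logging.getLogger("distributed").addHandler(handler)
```

The filter matches on the message text rather than silencing the whole `distributed` logger, so genuine errors from the scheduler and workers still come through.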