
Kirill Egorov

05/15/2023, 3:42 PM
Hi, I'm using the Dask executor on a relatively large flow with a local agent, and over the last few days it stops updating the status after an hour or so. The flow stays in the Running state, but no tasks are running; some are pending, and nothing new appears in the logs. Has anyone experienced this? Is there perhaps a keepalive timeout setting I need to change?

Bianca Hoch

05/15/2023, 7:22 PM
Hey Kirill, what you're experiencing may be related to this issue. I believe some fixes addressing it have since been released. What version of Prefect are you using?

Kirill Egorov

05/15/2023, 7:25 PM
Hi, thanks for the link. I've tried several versions, from 2.10.4 to 2.10.8. How can I get the error message shown in the example?
One of my problems is that there's no single place where I can pick up an error log. Should I look in Dask, the agent, or the flow?

Bianca Hoch

05/15/2023, 8:00 PM
You're referring to the error that contains:
await getter
asyncio.exceptions.CancelledError
correct? I believe that appears in the flow run logs.
That said, it's worth checking whether anything surfaces in the agent logs as well.
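For context on that error: a coroutine that is cancelled while blocked on an empty asyncio.Queue raises asyncio.CancelledError, and CPython's Queue.get() is one place in the standard library where the blocking line is literally `await getter`. The minimal stdlib sketch below reproduces that pattern. It only illustrates the error type; it is not Prefect's or Dask's code, and the function names are hypothetical.

```python
import asyncio


async def wait_for_item(queue: asyncio.Queue) -> object:
    # Queue.get() parks on an internal future while the queue is empty;
    # in CPython that wait is written as `await getter`.
    return await queue.get()


async def cancel_waiting_consumer() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.create_task(wait_for_item(queue))
    await asyncio.sleep(0)  # let the consumer start waiting on the empty queue
    consumer.cancel()       # e.g. a surrounding scope or timeout gives up on it
    try:
        await consumer
    except asyncio.CancelledError:
        # Same exception type as the one reported in the flow run logs
        return "cancelled while waiting in Queue.get()"
    return "received an item"


outcome = asyncio.run(cancel_waiting_consumer())
print(outcome)
```

Nothing is ever put on the queue, so the consumer can only finish by being cancelled.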

Kirill Egorov

05/15/2023, 8:01 PM
To be honest, I'd love to get an error at least somewhere. I've checked the agent logs, but I can't see any information from the Dask worker.

Bianca Hoch

05/15/2023, 8:01 PM
Any chance you have DEBUG level logs enabled?

Kirill Egorov

05/15/2023, 8:02 PM
You mean for the agent itself?
I can't see any errors. Do you think there should be any errors on the Dask side?

Bianca Hoch

05/15/2023, 10:07 PM
> You mean for the agent itself?
Correct, by setting:
prefect config set PREFECT_LOGGING_LEVEL=DEBUG
> I can't see any errors. Do you think there should be any errors on the Dask side?
I'd say it's worth looking into the Dask worker logs if you haven't already.
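A minimal sketch for pulling those logs, assuming the local Dask cluster runs as a separate process with a known scheduler address. If the task runner instead starts a temporary cluster inside the flow run, there is no external address to connect to. It uses distributed's `Client.get_worker_logs()` and `Client.get_scheduler_logs()`. The entry shape (worker address mapped to a list of `(level, message)` pairs, newest first) is an assumption, and the helper names are hypothetical.

```python
from typing import Dict, List, Tuple

# Assumed shape of distributed's Client.get_worker_logs() result:
# worker address -> list of (level, message) pairs, newest first.
WorkerLogs = Dict[str, List[Tuple[str, str]]]

LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}


def format_worker_logs(logs: WorkerLogs, min_level: str = "WARNING") -> List[str]:
    """Flatten per-worker logs into printable lines at or above min_level."""
    threshold = LEVELS[min_level]
    lines = []
    for address, entries in sorted(logs.items()):
        # Reverse so each worker's entries read oldest first
        for level, message in reversed(entries):
            if LEVELS.get(level, 0) >= threshold:
                lines.append(f"[{address}] {level}: {message}")
    return lines


def dump_cluster_logs(scheduler_address: str, min_level: str = "INFO") -> None:
    """Print worker and scheduler logs from a running Dask cluster."""
    from distributed import Client  # imported lazily; needs dask.distributed

    with Client(scheduler_address) as client:
        for line in format_worker_logs(client.get_worker_logs(), min_level):
            print(line)
        # Scheduler logs are assumed to use the same (level, message) shape
        scheduler_logs = {"scheduler": client.get_scheduler_logs()}
        for line in format_worker_logs(scheduler_logs, min_level):
            print(line)
```

For example, `dump_cluster_logs("tcp://127.0.0.1:8786")`. The address is a placeholder for whatever your scheduler reports (8786 is Dask's default scheduler port).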

Kirill Egorov

05/16/2023, 5:41 AM
I haven't, as I'm using a local Dask server, but I'll find them.
This is the only error we've managed to get. We're not sure whether it's relevant:
15:45:11.383 | DEBUG   | prefect.client - Encountered retryable exception during request. Another attempt will be made in 1.9827914098585266s. This is attempt 1/6.
Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.RECV_PING: 14>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 112, in handle_async_request
    status, headers = await self._receive_response(
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 229, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 260, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 281, in _receive_events
    events = await self._read_incoming_data(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 343, in _read_incoming_data
    events: typing.List[h2.events.Event] = self._h2_state.receive_data(data)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 1463, in receive_data
    events.extend(self._receive_frame(frame))
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 1487, in _receive_frame
    frames, events = self._frame_dispatch_table[frame.__class__](frame)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 1760, in _receive_ping_frame
    events = self.state_machine.process_input(
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 142, in handle_async_request
    raise LocalProtocolError(exc)  # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/prefect/client/base.py", line 194, in _send_with_retry
    response = await request()
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED
15:45:13.382 | DEBUG   | prefect.client - Encountered retryable exception during request. Another attempt will be made in 4.562970883434397s. This is attempt 2/6.
Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 106, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 203, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpcore/_async/http2.py", line 142, in handle_async_request
    raise LocalProtocolError(exc)  # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/prefect/client/base.py", line 194, in _send_with_retry
    response = await request()
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
15:45:16.820 | DEBUG   | prefect.agent - Checking for scheduled flow runs...
15:45:16.823 | DEBUG   | prefect.client - Encountered retryable exception during request. Another attempt will be made in 1.8359333292919553s. This is attempt 1/6.
Traceback (most recent call last):
  File "/home/user/Projects/SilverTide/env/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)
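Each traceback above starts inside h2's connection state machine. The library looks up the `(state, input)` pair in a transition table, and the lookup fails with a KeyError because the connection is already CLOSED. h2 re-raises that as `ProtocolError`, httpcore and then httpx wrap it as `LocalProtocolError`, and Prefect's client schedules a retry. The sketch below is a simplified stdlib model of that pattern, not h2's actual implementation. Its states, inputs, and transition table are reduced to the names in the log plus a hypothetical GOAWAY transition that closes the connection.

```python
from enum import Enum, auto


class ConnectionState(Enum):
    IDLE = auto()
    OPEN = auto()
    CLOSED = auto()


class ConnectionInputs(Enum):
    SEND_HEADERS = auto()
    RECV_PING = auto()
    RECV_GOAWAY = auto()


class ProtocolError(Exception):
    """Stand-in for h2.exceptions.ProtocolError."""


# Reduced, illustrative transition table. There are deliberately no
# (CLOSED, ...) entries: once closed, every further input is invalid.
TRANSITIONS = {
    (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS): ConnectionState.OPEN,
    (ConnectionState.OPEN, ConnectionInputs.SEND_HEADERS): ConnectionState.OPEN,
    (ConnectionState.OPEN, ConnectionInputs.RECV_PING): ConnectionState.OPEN,
    (ConnectionState.OPEN, ConnectionInputs.RECV_GOAWAY): ConnectionState.CLOSED,
}


class ConnectionStateMachine:
    def __init__(self) -> None:
        self.state = ConnectionState.IDLE

    def process_input(self, input_: ConnectionInputs) -> ConnectionState:
        try:
            target_state = TRANSITIONS[(self.state, input_)]
        except KeyError:
            # Re-raised while handling the KeyError, which produces the
            # "During handling of the above exception" chain in the log
            raise ProtocolError(f"Invalid input {input_} in state {self.state}")
        self.state = target_state
        return target_state


machine = ConnectionStateMachine()
machine.process_input(ConnectionInputs.SEND_HEADERS)  # IDLE -> OPEN
machine.process_input(ConnectionInputs.RECV_GOAWAY)   # peer closes: OPEN -> CLOSED
error_message = ""
try:
    machine.process_input(ConnectionInputs.RECV_PING)  # late PING on a closed connection
except ProtocolError as exc:
    error_message = str(exc)
print(error_message)
```

The printed message has the same form as the one in the log: the reused connection is already closed, so any further frame is rejected.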

Bianca Hoch

05/16/2023, 2:12 PM
Ah, that error stems from an issue in an upstream library. We have a workaround for that error in particular:
prefect config set PREFECT_API_ENABLE_HTTP2=False
Try setting this variable where the agent runs to see if that helps.
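If you'd rather not change the active profile with `prefect config set`, the same settings can be supplied as environment variables, which Prefect 2 reads under the same names. A minimal sketch, assuming the agent is started from a small Python launcher. The work queue name `default` and the helper names are hypothetical.

```python
import os
import subprocess
from typing import Dict, List, Mapping, Optional

# Settings discussed in this thread, applied only to the agent process.
AGENT_SETTINGS = {
    "PREFECT_API_ENABLE_HTTP2": "False",  # HTTP/2 workaround
    "PREFECT_LOGGING_LEVEL": "DEBUG",     # surfaces the client retry tracebacks
}


def agent_environment(base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return a copy of `base` (default: os.environ) with AGENT_SETTINGS applied."""
    env = dict(os.environ if base is None else base)
    env.update(AGENT_SETTINGS)
    return env


def agent_command(work_queue: str) -> List[str]:
    """CLI invocation for a Prefect 2 agent polling one work queue."""
    return ["prefect", "agent", "start", "-q", work_queue]


def start_agent(work_queue: str = "default") -> subprocess.Popen:
    """Launch the agent as a child process with the adjusted environment."""
    return subprocess.Popen(agent_command(work_queue), env=agent_environment())
```

Copying the environment instead of mutating `os.environ` keeps the launcher's own settings unchanged.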

Kirill Egorov

05/16/2023, 2:13 PM
Yes, we're testing this now, thanks. Note that this test is on a new dev Linux machine rather than the usual WSL setup we run in production. I'll let you know if we catch anything else.
Hi Bianca, we're still investigating this issue. After turning off HTTP/2, the flow still stops progressing through tasks. One thing we've noticed is that some tasks are still in progress on the Dask side while the flow marks them as completed. We're not sure we can build a reproducible example, but maybe you have some thoughts on it.
Maybe it's worth setting up a call with someone from your side so we can show you what we have.
Hi Bianca, hope you had a nice weekend. We've been testing the ConcurrentTaskRunner, and the main challenge we've encountered is the concurrency limit. Correct me if I'm wrong, but at this stage the only way to manage the limit is to add a tag to every task we want to limit. Since we have a lot of DB writes, the only way to keep the database from being overloaded is to tag all of them. Is there any way to set a flow-level concurrency limit?
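Tag-based concurrency limits are Prefect's own mechanism here. A generic alternative, not a Prefect feature, is to bound the writes inside the task code with a shared semaphore. As far as I understand, the ConcurrentTaskRunner runs synchronous tasks in worker threads within the flow run's process. If so, a module-level `threading.BoundedSemaphore` would cap concurrent DB writes for that one flow run, though not across separate flow runs or machines. In the sketch, a `ThreadPoolExecutor` stands in for the task runner. `write_row`, `db_write_slots`, and the limit of 3 are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

# Hypothetical cap shared by every DB-writing task in this process.
MAX_CONCURRENT_DB_WRITES = 3
db_write_slots = threading.BoundedSemaphore(MAX_CONCURRENT_DB_WRITES)

# Bookkeeping only, to show the cap holds.
_in_flight = 0
peak_in_flight = 0
_counter_lock = threading.Lock()


def write_row(row_id: int) -> int:
    """Stand-in for a DB write; blocks while all slots are taken."""
    global _in_flight, peak_in_flight
    with db_write_slots:
        with _counter_lock:
            _in_flight += 1
            peak_in_flight = max(peak_in_flight, _in_flight)
        time.sleep(0.01)  # simulated write latency
        with _counter_lock:
            _in_flight -= 1
    return row_id


# The executor stands in for the task runner submitting many tasks at once.
with ThreadPoolExecutor(max_workers=10) as pool:
    written: List[int] = list(pool.map(write_row, range(20)))

print(written == list(range(20)), peak_in_flight)
```

Ten threads compete for three slots here, so at most three simulated writes are ever in flight. All twenty still complete in order.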