Faheem Khan

11/06/2022, 11:34 PM
Prefect 2.6.6, local server: I am using run_deployment() to run two flows, Flow A and Flow B; the second flow starts after the first one finishes. Whenever I trigger the flows from the flow that calls run_deployment(), that flow fails with an error, although Flow A completes without an error.
Loading flow for deployment 'combine_flows'...
07:21:11 AM DEBUG Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
07:21:12 AM DEBUG Executing flow 'combine-flows' for flow run 'active-cormorant'...
07:21:12 AM DEBUG Executing combine_flows()
07:21:12 AM ERROR Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/asyncio/selector_events.py", line 854, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
    raise self._protocol.exception
anyio.BrokenResourceError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 30, in read
    with map_exceptions(exc_map):
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ReadError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/src/combine_flows.py", line 25, in combine_flows
    run_deployment(name="Main downloader/downloader_0.6.2")
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 197, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 127, in run_deployment
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orion.py", line 1439, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadError

Carlo

11/07/2022, 2:17 PM
We are seeing the same HTTP error (connection reset by peer) on the new server version. We didn't see it on 2.6.4.
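
Both reports above surface as transient connection failures (`ConnectionResetError`, wrapped by httpx as `httpx.ReadError`). Until the root cause is found, one possible stopgap is to retry the failing call. The following is a minimal sketch under stated assumptions, not something Prefect provides: `call_with_retries` and all of its parameters are hypothetical names, and by default it retries only on Python's built-in `ConnectionError` family.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Hypothetical helper: call fn(), retrying when a listed exception is raised.

    ConnectionError is the built-in parent of ConnectionResetError and
    BrokenPipeError, the two low-level errors shown in this thread.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of retries: surface the original error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

In this thread the exception that actually reaches the flow is `httpx.ReadError`, which does not derive from `ConnectionError`, so it would have to be passed explicitly, for example `retry_on=(httpx.ReadError,)`. Also note that the traceback shows the failure inside `run_deployment` while it polls `read_flow_run`, so retrying the whole `run_deployment(...)` call may start a second run of the deployment.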

Bianca Hoch

11/07/2022, 5:46 PM
Hi Faheem, thank you for raising this. I'll reach out to the team to see what could be going on here. Would you mind sharing a few example IDs of the affected flow runs? Also, would you mind sharing a minimal reproducible example of your code and describing how you created the deployment?
Hello team (@Carlo @Faheem Khan), just a heads up that we may have a potential solution for what could be happening here.

Faheem Khan

11/07/2022, 11:10 PM
@Bianca Hoch the error appears in the agent logs even when I am not running any flows, but it also affects running flows. Below are the agent logs with no flow running:
prefect-agent_1   | 23:07:49.438 | DEBUG   | prefect.agent - Checking for flow runs...
prefect-agent_1 | 23:07:49.447 | ERROR | prefect.agent -
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/asyncio/selector_events.py", line 854, in _read_ready__data_received
prefect-agent_1 |     data = self._sock.recv(self.max_size)
prefect-agent_1 | ConnectionResetError: [Errno 104] Connection reset by peer
prefect-agent_1 |
prefect-agent_1 | The above exception was the direct cause of the following exception:
prefect-agent_1 |
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
prefect-agent_1 |     yield
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
prefect-agent_1 |     return await self._stream.receive(max_bytes=max_bytes)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
prefect-agent_1 |     raise self._protocol.exception
prefect-agent_1 | anyio.BrokenResourceError
prefect-agent_1 |
prefect-agent_1 | During handling of the above exception, another exception occurred:
prefect-agent_1 |
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
prefect-agent_1 |     yield
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
prefect-agent_1 |     resp = await self._pool.handle_async_request(req)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
prefect-agent_1 |     raise exc
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
prefect-agent_1 |     response = await connection.handle_async_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
prefect-agent_1 |     return await self._connection.handle_async_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
prefect-agent_1 |     raise exc
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
prefect-agent_1 |     ) = await self._receive_response_headers(**kwargs)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
prefect-agent_1 |     event = await self._receive_event(timeout=timeout)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
prefect-agent_1 |     data = await self._network_stream.read(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 30, in read
prefect-agent_1 |     with map_exceptions(exc_map):
prefect-agent_1 |   File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
prefect-agent_1 |     self.gen.throw(typ, value, traceback)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
prefect-agent_1 |     raise to_exc(exc)
prefect-agent_1 | httpcore.ReadError
prefect-agent_1 |
prefect-agent_1 | The above exception was the direct cause of the following exception:
prefect-agent_1 |
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 154, in get_and_submit_flow_runs
prefect-agent_1 |     queue_runs = await self.client.get_runs_in_work_queue(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/prefect/client/orion.py", line 759, in get_runs_in_work_queue
prefect-agent_1 |     response = await self._client.post(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
prefect-agent_1 |     return await self.request(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
prefect-agent_1 |     return await self.send(request, auth=auth, follow_redirects=follow_redirects)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
prefect-agent_1 |     await super().send(*args, **kwargs)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
prefect-agent_1 |     response = await self._send_handling_auth(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
prefect-agent_1 |     response = await self._send_handling_redirects(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
prefect-agent_1 |     response = await self._send_single_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
prefect-agent_1 |     response = await transport.handle_async_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
prefect-agent_1 |     with map_httpcore_exceptions():
prefect-agent_1 |   File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
prefect-agent_1 |     self.gen.throw(typ, value, traceback)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
prefect-agent_1 |     raise mapped_exc(message) from exc
prefect-agent_1 | httpx.ReadError
prefect-agent_1 | 23:07:54.465 | DEBUG | prefect.agent - Checking for flow runs...
Then there's another error in the agent logs, again with no flows running. This one is a broken pipe:
prefect-agent_1   | 23:11:21.748 | DEBUG   | prefect.agent - Checking for flow runs...
prefect-agent_1 | 23:11:21.755 | ERROR | prefect.agent -
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/asyncio/selector_events.py", line 916, in write
prefect-agent_1 |     n = self._sock.send(data)
prefect-agent_1 | BrokenPipeError: [Errno 32] Broken pipe
prefect-agent_1 |
prefect-agent_1 | The above exception was the direct cause of the following exception:
prefect-agent_1 |
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
prefect-agent_1 |     yield
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
prefect-agent_1 |     return await self._stream.receive(max_bytes=max_bytes)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
prefect-agent_1 |     raise self._protocol.exception
prefect-agent_1 | anyio.BrokenResourceError
prefect-agent_1 |
prefect-agent_1 | During handling of the above exception, another exception occurred:
prefect-agent_1 |
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
prefect-agent_1 |     yield
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
prefect-agent_1 |     resp = await self._pool.handle_async_request(req)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
prefect-agent_1 |     raise exc
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
prefect-agent_1 |     response = await connection.handle_async_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
prefect-agent_1 |     return await self._connection.handle_async_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
prefect-agent_1 |     raise exc
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
prefect-agent_1 |     ) = await self._receive_response_headers(**kwargs)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
prefect-agent_1 |     event = await self._receive_event(timeout=timeout)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
prefect-agent_1 |     data = await self._network_stream.read(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 30, in read
prefect-agent_1 |     with map_exceptions(exc_map):
prefect-agent_1 |   File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
prefect-agent_1 |     self.gen.throw(typ, value, traceback)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
prefect-agent_1 |     raise to_exc(exc)
prefect-agent_1 | httpcore.ReadError
prefect-agent_1 |
prefect-agent_1 | The above exception was the direct cause of the following exception:
prefect-agent_1 |
prefect-agent_1 | Traceback (most recent call last):
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 154, in get_and_submit_flow_runs
prefect-agent_1 |     queue_runs = await self.client.get_runs_in_work_queue(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/prefect/client/orion.py", line 759, in get_runs_in_work_queue
prefect-agent_1 |     response = await self._client.post(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
prefect-agent_1 |     return await self.request(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
prefect-agent_1 |     return await self.send(request, auth=auth, follow_redirects=follow_redirects)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
prefect-agent_1 |     await super().send(*args, **kwargs)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
prefect-agent_1 |     response = await self._send_handling_auth(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
prefect-agent_1 |     response = await self._send_handling_redirects(
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
prefect-agent_1 |     response = await self._send_single_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
prefect-agent_1 |     response = await transport.handle_async_request(request)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
prefect-agent_1 |     with map_httpcore_exceptions():
prefect-agent_1 |   File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
prefect-agent_1 |     self.gen.throw(typ, value, traceback)
prefect-agent_1 |   File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
prefect-agent_1 |     raise mapped_exc(message) from exc
prefect-agent_1 | httpx.ReadError
I was using prefect 2.0.4 and everything was working fine.

Bianca Hoch

11/11/2022, 4:59 PM
Hi Faheem, just wanted to loop back and say that we are still investigating this on our end. Has bumping down to an older version of Prefect resolved this issue for you?

Faheem Khan

11/15/2022, 8:35 AM
Thanks @Bianca Hoch yeah it works with older versions.