Mike B
11/02/2023, 9:02 PMMarvin
11/02/2023, 9:05 PMasyncio.exceptions.CancelledError
is a common issue in Prefect when dealing with long-running tasks or services. Here are some potential solutions and workarounds:
1. Handling the CancelledError: You could potentially handle the asyncio.exceptions.CancelledError
within your Prefect flow. This would involve catching the exception and deciding on the appropriate action, such as retrying the task or marking it as successful if the Airbyte sync job is still running.
2. Adjusting the timeout settings: If the error is due to the task taking too long, you could try adjusting the timeout settings for your tasks in Prefect. This would give your tasks more time to complete before they are cancelled.
3. Checking for updates and bug fixes: There are several related issues on the Prefect GitHub repository (issue 9148, issue 10367, issue 8976). It might be worth checking these out and seeing if any updates or bug fixes have been released that could solve your problem.
4. Contacting Prefect Support: If none of the above solutions work, it might be worth reaching out to Prefect support or the Prefect community for further assistance. They may be able to provide more specific advice based on your particular setup and use case.
Remember to always test these changes in a controlled environment before deploying them to production.Nate
11/02/2023, 10:46 PMMike B
11/02/2023, 10:49 PMStarting Airbyte sync for a3d1ab40-580b-473f-b1db-7c7e248f86ba
08:05:38 AM
Run Airbyte Sync-0
prefect.task_runs
Triggering Airbyte Connection a3d1ab40-580b-473f-b1db-7c7e248f86ba, in workspace at '<http://localhost:8000/api/v1>'
08:05:39 AM
Run Airbyte Sync-0
prefect.task_runs
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 34, in read
return await self._stream.receive(max_bytes=max_bytes)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
await self._protocol.read_event.wait()
File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
yield
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 32, in read
with anyio.fail_after(timeout):
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
raise exc
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
) = await self._receive_response_headers(**kwargs)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
event = await self._receive_event(timeout=timeout)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
data = await self._network_stream.read(
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
with map_exceptions(exc_map):
File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc)
httpcore.ReadTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
result = await call.aresult()
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/tmp/tmpbugcdhaqprefect/orchestration/flows/brim_archive/workflow.py", line 54, in run_airbyte_sync
job_run = connection.trigger()
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
return self.result()
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 367, in trigger
(job_id, _,) = await airbyte_client.trigger_manual_sync_connection(
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 151, in trigger_manual_sync_connection
response = await self._client.post(
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
response = await self._send_handling_auth(
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
response = await self._send_handling_redirects(
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
response = await self._send_single_request(request)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
response = await transport.handle_async_request(request)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
with map_httpcore_exceptions():
File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ReadTimeout
08:05:44 AM
Run Airbyte Sync-0
prefect.task_runs
Finished in state Failed('Task run encountered an exception ReadTimeout: ')
Nate
11/02/2023, 10:55 PMprefect-airbyte
got a read timeout from the airbyte server, which I would suspect is not related to the client library, although arguably we should try to "reconnect" in such a situation. to be clear, i was the original author of this but i know airbyte server has likely changed a bit since the implementation 🧐
would you be willing to open an issue on prefect-airbyte
with your trace / description of failure?Mike B
11/02/2023, 10:56 PMNate
11/02/2023, 10:56 PMMike B
11/06/2023, 5:41 PM