# ask-marvin
m
@Marvin We are running Prefect OSS and Airbyte OSS. We've found that fairly often, when using Prefect to trigger Airbyte sync jobs, they will give a false negative (report failure in an Airbyte sync when the sync hasn't failed), while giving just a generic "asyncio.exceptions.CancelledError" error. Are there any recommended steps for preventing or recovering from these errors? To be clear, the Airbyte sync continues to run and eventually succeeds, but the Prefect workflow gets marked as Failed.
m
Based on the information I found, it seems that `asyncio.exceptions.CancelledError` is a common issue in Prefect when dealing with long-running tasks or services. Here are some potential solutions and workarounds:
1. Handling the CancelledError: You could potentially handle the `asyncio.exceptions.CancelledError` within your Prefect flow. This would involve catching the exception and deciding on the appropriate action, such as retrying the task or marking it as successful if the Airbyte sync job is still running.
2. Adjusting the timeout settings: If the error is due to the task taking too long, you could try adjusting the timeout settings for your tasks in Prefect. This would give your tasks more time to complete before they are cancelled.
3. Checking for updates and bug fixes: There are several related issues on the Prefect GitHub repository (issues 9148, 10367, and 8976). It might be worth checking these out and seeing if any updates or bug fixes have been released that could solve your problem.
4. Contacting Prefect support: If none of the above solutions work, it might be worth reaching out to Prefect support or the Prefect community for further assistance. They may be able to provide more specific advice based on your particular setup and use case.
Remember to always test these changes in a controlled environment before deploying them to production.
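Suggestions 1 and 2 can be combined into a small retry wrapper around the call that starts the sync. The following is a minimal, stdlib-only sketch, not part of Prefect or prefect-airbyte: `call_with_retries` and `flaky_trigger` are hypothetical names, and `flaky_trigger` stands in for the call that starts the sync (`connection.trigger()` in the trace in this thread). It retries only the exception types you pass in, because swallowing `asyncio.CancelledError` itself can stop legitimate cancellation from propagating; in this thread the exception that actually failed the task was `httpx.ReadTimeout`, which is what you would list in `retry_on`. Be aware that if the first request did reach Airbyte, a retry may try to start a second sync; the sketch does not guard against that.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError,),
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying with exponential backoff on the listed exception types.

    Hypothetical helper: with prefect-airbyte you would pass the client's timeout
    exception (e.g. httpx.ReadTimeout) in retry_on, not asyncio.CancelledError.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last exception unchanged
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ... between tries.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Usage: a hypothetical trigger that times out twice, then returns a job id.
calls = []


def flaky_trigger() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise TimeoutError("read timed out")
    return "job-123"


job_id = call_with_retries(flaky_trigger, sleep=lambda seconds: None)
print(job_id, len(calls))  # job-123 3
```

Prefect tasks also accept `retries` and `retry_delay_seconds` arguments; those retry the whole task run rather than a single call inside it.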
n
hey @Mike B, do you have a trace from a failure?
m
Hi @Nate, the errors are pretty generic. Here's what we see in the logs:
Starting Airbyte sync for a3d1ab40-580b-473f-b1db-7c7e248f86ba
08:05:38 AM
Run Airbyte Sync-0
prefect.task_runs
Triggering Airbyte Connection a3d1ab40-580b-473f-b1db-7c7e248f86ba, in workspace at '<http://localhost:8000/api/v1>'
08:05:39 AM
Run Airbyte Sync-0
prefect.task_runs
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 32, in read
    with anyio.fail_after(timeout):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmpbugcdhaqprefect/orchestration/flows/brim_archive/workflow.py", line 54, in run_airbyte_sync
    job_run = connection.trigger()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
    return self.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 367, in trigger
    (job_id, _,) = await airbyte_client.trigger_manual_sync_connection(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 151, in trigger_manual_sync_connection
    response = await self._client.post(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
08:05:44 AM
Run Airbyte Sync-0
prefect.task_runs
Finished in state Failed('Task run encountered an exception ReadTimeout: ')
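Reading the trace from the bottom up: the task failed with `httpx.ReadTimeout`, raised while the prefect-airbyte client was waiting for the response headers to the `trigger_manual_sync_connection` POST. The `asyncio.exceptions.CancelledError` at the top is only the first link in that chain: `anyio.fail_after` cancelled the pending socket read when its deadline passed and turned the cancellation into `TimeoutError`, httpcore mapped that to `httpcore.ReadTimeout`, and httpx re-raised it as `httpx.ReadTimeout`. The "Triggering" and exception timestamps also appear to be about five seconds apart, which would be consistent with a 5-second client timeout such as httpx's default. The sketch below rebuilds the same chain with hypothetical stand-in exception classes (not the real httpcore/httpx types) and walks `__cause__`/`__context__` to list each link, outermost first.

```python
import asyncio
from typing import List, Optional


# Hypothetical stand-ins for the library types; not the real httpcore/httpx classes.
class CoreReadTimeout(Exception):
    """Plays the role of httpcore.ReadTimeout."""


class ClientReadTimeout(Exception):
    """Plays the role of httpx.ReadTimeout."""


def simulate_trace() -> BaseException:
    """Re-create the exception chain from the log and return the outermost error."""
    try:
        try:
            try:
                try:
                    # The read deadline passes and the pending socket read is cancelled.
                    raise asyncio.CancelledError()
                except asyncio.CancelledError:
                    # anyio.fail_after converts the cancellation into TimeoutError.
                    raise TimeoutError()
            except TimeoutError:
                # httpcore maps it while handling it (implicit __context__).
                raise CoreReadTimeout()
        except CoreReadTimeout as exc:
            # httpx re-raises with an explicit cause (`raise ... from exc`).
            raise ClientReadTimeout() from exc
    except ClientReadTimeout as final:
        return final


def exception_chain(exc: Optional[BaseException]) -> List[str]:
    """List exception type names from the outermost error down to the root."""
    names: List[str] = []
    seen = set()
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))
        names.append(type(exc).__name__)
        # Follow `raise ... from` first; otherwise the implicit context,
        # unless it was suppressed with `raise ... from None`.
        if exc.__cause__ is not None:
            exc = exc.__cause__
        elif exc.__suppress_context__:
            exc = None
        else:
            exc = exc.__context__
    return names


print(exception_chain(simulate_trace()))
# ['ClientReadTimeout', 'CoreReadTimeout', 'TimeoutError', 'CancelledError']
```

In other words, the `CancelledError` is a symptom of the client giving up on the HTTP response, not evidence that the Airbyte sync itself failed.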
n
it looks like the airbyte client from `prefect-airbyte` got a read timeout from the airbyte server, which I would suspect is not related to the client library, although arguably we should try to "reconnect" in such a situation. to be clear, i was the original author of this but i know airbyte server has likely changed a bit since the implementation 🧐 would you be willing to open an issue on `prefect-airbyte` with your trace / description of failure?
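Since the sync in this thread keeps running and eventually succeeds after Prefect marks the task Failed, the trigger request apparently reached Airbyte and only the response was late. One way to "reconnect" is therefore to look the job up after a read timeout instead of failing or blindly re-posting. This is a minimal sketch under stated assumptions: `trigger_or_adopt`, `trigger` (starts a sync and returns a job id) and `find_running_job` (returns the connection's running job id, or `None`) are all hypothetical and are not prefect-airbyte APIs. Note that it would also adopt a sync that something else started on the same connection.

```python
import time
from typing import Callable, Optional, Tuple, Type


def trigger_or_adopt(
    trigger: Callable[[], int],
    find_running_job: Callable[[], Optional[int]],
    timeout_errors: Tuple[Type[BaseException], ...] = (TimeoutError,),
    lookups: int = 3,
    lookup_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Start a sync; if the request times out, adopt the job it may have started.

    Both callables are hypothetical: `trigger` stands in for the POST that starts
    a sync, `find_running_job` for a lookup of the connection's running job.
    """
    try:
        return trigger()
    except timeout_errors:
        # The server may have accepted the request even though the response
        # never arrived, so look for a running job before declaring failure.
        for attempt in range(lookups):
            if attempt:
                sleep(lookup_delay)  # give the server time to register the job
            job_id = find_running_job()
            if job_id is not None:
                return job_id
        raise  # no job showed up: re-raise the original timeout


def slow_trigger() -> int:
    # Simulates the ReadTimeout from the trace: the response never arrives.
    raise TimeoutError("no response headers")


print(trigger_or_adopt(slow_trigger, lambda: 42, sleep=lambda seconds: None))  # 42
```

Compared with a plain retry, this avoids sending a second trigger request when the first one was in fact accepted.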
m
Sure, I'll look into getting an issue created tomorrow. Thanks!
n
cool - thanks!
m
@Nate the issue is here: https://github.com/PrefectHQ/prefect-airbyte/issues/64. Thanks for your help!