# prefect-community
a
Hey. I have an issue using the prefect-airbyte collection (v0.1.0) with Prefect 2.0.4 (it is not specific to 2.0.4; it also occurs with any Prefect 2 version). If this is the wrong channel to ask about collections, let me know. I have a very simple flow which triggers one or many Airbyte connections. The flow is almost 1 to 1 from the doc samples. If I run the flow for a single Airbyte connection id, wait until the flow is done, and then wait about 10 more seconds before starting another flow run, everything is fine. However, if I run the flow twice in parallel (no matter whether it's with the same Airbyte connection id or a different one), I get a TaskCanceled exception (see thread), but the Airbyte connection still gets triggered and finishes successfully. So the triggering of Airbyte works, but the flow gets canceled seconds after the Airbyte connection is triggered. As both Airbyte connections get triggered (and both finish successfully), I'm asking here first, but I can imagine that this might also be a problem on the Airbyte side. Any hints on how to circumvent this? Edit: The result is the same whether I run the flow locally or use a dedicated agent to run it.
Flow:
from prefect import flow
from prefect_airbyte.connections import trigger_sync

@flow(
    name="trigger_airbyte",
    description=
    "Triggers the specified airbyte connection ids. Uses the connection_ids to determine which airbyte ids to trigger.",
    timeout_seconds=7200)
def trigger_airbyte(airbyte_host, airbyte_port, connection_ids):
    for id in connection_ids:
        trigger_sync(airbyte_server_host=airbyte_host,
                    airbyte_server_port=airbyte_port,
                    airbyte_api_version="v1",
                    connection_id=id,
                    poll_interval_s=15,
                    status_updates=True)

# %%
if __name__ == "__main__":
    trigger_airbyte(
        airbyte_host="localhost",
        airbyte_port=8000,
        connection_ids=["d0e381cf-115d-4782-9581-1c37bd8d4d05"])
Exception:
14:59:41.663 | ERROR   | Task run 'trigger_sync-6dd98a16-1' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
    return b""
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
    raise exc
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
    data = await self._network_stream.read(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
    return b""
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout
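The traceback above shows a three-step chain: the pending socket read is cancelled (`CancelledError`), the surrounding timeout scope turns that into `TimeoutError`, and the HTTP layer re-raises it as `ReadTimeout`. Below is a minimal standard-library sketch of the same pattern. It assumes `asyncio.wait_for` as a stand-in for the anyio timeout scope, and `slow_read`, `read_with_timeout` and the local `ReadTimeout` class are hypothetical names for illustration, not httpcore's own code.

```python
import asyncio


class ReadTimeout(Exception):
    """Illustrative stand-in for httpcore.ReadTimeout (not the real class)."""


async def slow_read(delay_s, log):
    """Pretend to wait for response bytes; record if the wait is cancelled."""
    try:
        await asyncio.sleep(delay_s)
        return b"ok"
    except asyncio.CancelledError:
        # Step 1: the timeout cancels the pending read.
        log.append("cancelled")
        raise


async def read_with_timeout(delay_s, timeout_s, log):
    """Run the read under a timeout and map the timeout to ReadTimeout."""
    try:
        return await asyncio.wait_for(slow_read(delay_s, log), timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        # Steps 2 and 3: the timeout error is re-raised as a read timeout.
        raise ReadTimeout(f"no response within {timeout_s}s") from exc
```

With a response that takes longer than the timeout, `read_with_timeout` raises `ReadTimeout` even though the simulated server would have answered eventually. That would be consistent with the report that the Airbyte sync still starts and finishes while the task fails.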
a
cc @Andrew Huang
also cc @Nate as Airbyte pro
a
Curious what happens if you `submit` the task instead?
from prefect import flow
from prefect_airbyte.connections import trigger_sync

@flow(
    name="trigger_airbyte",
    description=
    "Triggers the specified airbyte connection ids. Uses the connection_ids to determine which airbyte ids to trigger.",
    timeout_seconds=7200)
def trigger_airbyte(airbyte_host, airbyte_port, connection_ids):
    for id in connection_ids:
        trigger_sync.submit(airbyte_server_host=airbyte_host,
                    airbyte_server_port=airbyte_port,
                    airbyte_api_version="v1",
                    connection_id=id,
                    poll_interval_s=15,
                    status_updates=True)

# %%
if __name__ == "__main__":
    trigger_airbyte(
        airbyte_host="localhost",
        airbyte_port=8000,
        connection_ids=["d0e381cf-115d-4782-9581-1c37bd8d4d05"])
n
@Andreas Nigg hmm, seems like there's an uncaught exception here for sure. However, do you mind explaining the outcome you're going for when you say "However, if I run the flow twice in parallel"? Wouldn't this just start a race condition between the two flows to trigger the connection? Why do you want to trigger the same connection twice in parallel?
a
@Andrew Huang It's the same outcome when I run it with submit. @Nate I stated it wrong: I always run two different connections in parallel. My intention is to run this flow from two or more deployments, with each deployment specifying the connection id, and I'd like to run several deployments at approximately the same time. In general: I found that the exact same exception also happens if I trigger only one connection, but there it's less frequent; it works about 9 out of 10 times (not statistically proven 😉). However, if I trigger three (different) connections in parallel, at least one of the tasks fails almost every time. It's also not always the same connection; sometimes one fails, sometimes two.
If I change my example flow to this, I always get the exception for at least one of the created tasks.
from prefect import flow
from prefect_airbyte.connections import trigger_sync

@flow(
    name="trigger_airbyte",
    description=
    "Triggers the specified airbyte connection ids. Uses the connection_ids to determine which airbyte ids to trigger.",
    timeout_seconds=7200)
def trigger_airbyte(airbyte_host, airbyte_port, connection_ids):
    for id in connection_ids:
        trigger_sync.submit(airbyte_server_host=airbyte_host,
                    airbyte_server_port=airbyte_port,
                    airbyte_api_version="v1",
                    connection_id=id,
                    poll_interval_s=15,
                    status_updates=True)

# %%
if __name__ == "__main__":
    trigger_airbyte(
        airbyte_host="localhost",
        airbyte_port=8000,
        connection_ids=["d0e381cf-115d-4782-9581-1c37bd8d4d05", "8829bb5e-6721-466d-91fe-c67fd95ebd12", "4b6d9fb4-fb80-4f7f-82fe-e407f2e03640"])
n
@Andreas Nigg thank you for bringing this up and digging into this, I'll try and reproduce this error and keep you up to date in this thread
b
@Nate I'm facing the same ReadTimeout error. Do you have any updates?
n
Hi @Boggdan Barrientos I did forget to update this thread, but yes! As of `prefect-airbyte==0.1.2`, there is an optional argument for `trigger_sync` called `timeout` which allows you to set the POST request `timeout` for the `httpx.AsyncClient` used under the hood. The default is 5 seconds for POSTs per httpx (all Airbyte endpoints use the POST method), but you can pass a larger value if you're noticing timeouts.
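A minimal sketch of how the flow from earlier in the thread might pass a larger `timeout`, assuming `prefect-airbyte>=0.1.2`. The `timeout` argument name comes from the message above. The value of 30 seconds, the assumption that it is given in seconds, and the helpers `build_trigger_kwargs` and `make_flow` are illustrative choices, not part of the library.

```python
def build_trigger_kwargs(airbyte_host, airbyte_port, connection_id, timeout_s=30):
    """Collect the trigger_sync arguments used in this thread, plus `timeout`.

    timeout_s=30 is an arbitrary example value (assumed to be seconds).
    """
    return {
        "airbyte_server_host": airbyte_host,
        "airbyte_server_port": airbyte_port,
        "airbyte_api_version": "v1",
        "connection_id": connection_id,
        "poll_interval_s": 15,
        "status_updates": True,
        "timeout": timeout_s,  # POST request timeout, per the message above
    }


def make_flow():
    """Build the flow; imports are deferred so Prefect is only needed here."""
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync

    @flow(name="trigger_airbyte", timeout_seconds=7200)
    def trigger_airbyte(airbyte_host, airbyte_port, connection_ids, timeout_s=30):
        for connection_id in connection_ids:
            trigger_sync.submit(
                **build_trigger_kwargs(
                    airbyte_host, airbyte_port, connection_id, timeout_s
                )
            )

    return trigger_airbyte


# Example call (needs Prefect, prefect-airbyte and a reachable Airbyte server):
# make_flow()(airbyte_host="localhost", airbyte_port=8000,
#             connection_ids=["d0e381cf-115d-4782-9581-1c37bd8d4d05"])
```

Keeping the arguments in one helper makes the timeout easy to adjust per deployment. How large it needs to be depends on how slowly the Airbyte server responds under parallel load.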