Andreas Nigg
08/16/2022, 1:50 PM
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Prefect flow that calls trigger_sync once for every id in connection_ids,
# in iteration order. The flow is registered with timeout_seconds=7200
# (7200 s = 2 h).
#
# Parameters:
#   airbyte_host    -- forwarded as airbyte_server_host
#   airbyte_port    -- forwarded as airbyte_server_port
#   connection_ids  -- iterable of Airbyte connection ids; each one is passed
#                      as connection_id to a separate trigger_sync call
@flow(
    name="trigger_airbyte",
    description=(
        "Triggers the specifed airbyte connection ids. Uses the connection_ids to determine, which airbyte ids to trigger."
    ),
    timeout_seconds=7200,
)
def trigger_airbyte(airbyte_host, airbyte_port, connection_ids):
    # Keyword arguments that are identical for every connection.
    shared_sync_kwargs = {
        "airbyte_server_host": airbyte_host,
        "airbyte_server_port": airbyte_port,
        "airbyte_api_version": "v1",
        "poll_interval_s": 15,
        "status_updates": True,
    }
    for connection_id in connection_ids:
        trigger_sync(connection_id=connection_id, **shared_sync_kwargs)
# %%
if __name__ == "__main__":
    # Script entry point: run the flow once against localhost:8000
    # for a single connection id.
    target_connections = ["d0e381cf-115d-4782-9581-1c37bd8d4d05"]
    trigger_airbyte(
        airbyte_host="localhost",
        airbyte_port=8000,
        connection_ids=target_connections,
    )
14:59:41.663 | ERROR | Task run 'trigger_sync-6dd98a16-1' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 33, in read
return await self._stream.receive(max_bytes=max_bytes)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
await self._protocol.read_event.wait()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/locks.py", line 226, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
return b""
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
raise exc
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
) = await self._receive_response_headers(**kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
event = await self._receive_event(timeout=timeout)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
data = await self._network_stream.read(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 35, in read
return b""
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.ReadTimeout
Anna Geller
08/16/2022, 2:24 PM
Andrew Huang
08/16/2022, 4:39 PM
`submit` the task instead?
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Prefect flow that calls trigger_sync.submit once for every id in
# connection_ids. The value returned by each .submit() call is discarded.
# The flow is registered with timeout_seconds=7200 (7200 s = 2 h).
#
# Parameters:
#   airbyte_host    -- forwarded as airbyte_server_host
#   airbyte_port    -- forwarded as airbyte_server_port
#   connection_ids  -- iterable of Airbyte connection ids, one submit per id
@flow(
    name="trigger_airbyte",
    description=(
        "Triggers the specifed airbyte connection ids. Uses the connection_ids to determine, which airbyte ids to trigger."
    ),
    timeout_seconds=7200,
)
def trigger_airbyte(airbyte_host, airbyte_port, connection_ids):
    api_version = "v1"
    poll_interval_seconds = 15
    for connection_id in connection_ids:
        trigger_sync.submit(
            connection_id=connection_id,
            airbyte_server_host=airbyte_host,
            airbyte_server_port=airbyte_port,
            airbyte_api_version=api_version,
            poll_interval_s=poll_interval_seconds,
            status_updates=True,
        )
# %%
if __name__ == "__main__":
    # Script entry point: same local run as before (localhost:8000, one
    # connection id), now exercising the .submit-based flow.
    run_arguments = {
        "airbyte_host": "localhost",
        "airbyte_port": 8000,
        "connection_ids": ["d0e381cf-115d-4782-9581-1c37bd8d4d05"],
    }
    trigger_airbyte(**run_arguments)
Nate
08/16/2022, 7:24 PM
> However, if I run the flow twice in parallel
Wouldn't this just start a race condition between the two flows to trigger the connection? Why do you want to trigger the same connection twice in parallel?
Andreas Nigg
08/17/2022, 7:16 AM
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Prefect flow that submits one trigger_sync task per Airbyte connection id.
# Nothing is done with the values returned by .submit().
# The flow is registered with timeout_seconds=7200 (7200 s = 2 h).
#
# Parameters:
#   airbyte_host    -- forwarded as airbyte_server_host
#   airbyte_port    -- forwarded as airbyte_server_port
#   connection_ids  -- iterable of Airbyte connection ids to submit
@flow(
    name="trigger_airbyte",
    description=(
        "Triggers the specifed airbyte connection ids. Uses the connection_ids to determine, which airbyte ids to trigger."
    ),
    timeout_seconds=7200,
)
def trigger_airbyte(airbyte_host, airbyte_port, connection_ids):
    def submit_sync(connection_id):
        # One submission; only connection_id varies between calls.
        return trigger_sync.submit(
            airbyte_server_host=airbyte_host,
            airbyte_server_port=airbyte_port,
            airbyte_api_version="v1",
            connection_id=connection_id,
            poll_interval_s=15,
            status_updates=True,
        )

    for connection_id in connection_ids:
        submit_sync(connection_id)
# %%
if __name__ == "__main__":
    # Script entry point: run the flow against localhost:8000 for three
    # connection ids in a single flow run.
    target_connections = [
        "d0e381cf-115d-4782-9581-1c37bd8d4d05",
        "8829bb5e-6721-466d-91fe-c67fd95ebd12",
        "4b6d9fb4-fb80-4f7f-82fe-e407f2e03640",
    ]
    trigger_airbyte(
        airbyte_host="localhost",
        airbyte_port=8000,
        connection_ids=target_connections,
    )
Nate
08/17/2022, 2:16 PM
Boggdan Barrientos
11/03/2022, 6:01 PM
Nate
11/03/2022, 6:32 PM
In `prefect-airbyte==0.1.2`, there is an optional argument for `trigger_sync` called `timeout`, which allows you to set the POST request timeout for the `httpx.AsyncClient` used under the hood. The default is 5 seconds for POSTs per httpx (all Airbyte endpoints use the POST method), but you can pass a larger value if you're noticing timeouts.