Lucien Fregosi
01/27/2023, 2:36 PM
prefect-airbyte is not working anymore.
I got this error:
trigger_manual_sync_connection
raise err.AirbyteServerNotHealthyException() from e
prefect_airbyte.exceptions.AirbyteServerNotHealthyException
This happens even though the Airbyte connection is properly triggered in Airbyte.
Nate
01/27/2023, 2:57 PM
Lucien Fregosi
01/27/2023, 2:57 PM
return trigger_sync.with_options(retries=self.retries, retry_delay_seconds=self.retry_delay_seconds)(
    connection_id=self.get_connection_id_from_state_file(),
    poll_interval_s=self.poll_interval,
    status_updates=self.status_updates,
    airbyte_server_host=urlparse(self.airbyte_api_base_url).hostname,
    airbyte_server_port=urlparse(self.airbyte_api_base_url).port,
    timeout=self.timeout,
)
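The call above derives both airbyte_server_host and airbyte_server_port from a single base URL via urlparse. One detail worth checking: urlparse(...).port is None whenever the URL has no explicit ":port", as with a URL like the in-cluster one in the traceback below. A minimal sketch of a hypothetical helper, split_airbyte_url (not part of prefect-airbyte), that falls back to the scheme's default port; it assumes your Kubernetes service really listens on 80/443 in that case:

```python
from typing import Optional, Tuple
from urllib.parse import urlparse

# Assumption: fall back to the scheme's standard port when none is given.
DEFAULT_PORTS = {"http": 80, "https": 443}


def split_airbyte_url(base_url: str) -> Tuple[Optional[str], Optional[int]]:
    """Hypothetical helper: split an Airbyte API base URL into (host, port)."""
    parsed = urlparse(base_url)
    # parsed.port is None when the URL carries no explicit ":<port>"
    port = parsed.port if parsed.port is not None else DEFAULT_PORTS.get(parsed.scheme)
    return parsed.hostname, port
```

With this, split_airbyte_url("http://localhost:8000/api/v1") returns ("localhost", 8000), while a port-less http URL yields port 80 instead of None.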
Nate
01/27/2023, 3:03 PM
Lucien Fregosi
01/27/2023, 3:04 PM
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/client.py", line 154, in trigger_manual_sync_connection
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '409 Conflict' for url 'http://airbyte-lfo-webapp.airbyte.svc.cluster.local/api/v1/connections/sync/'
For more information check: https://httpstatuses.com/409

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 149, in trigger_sync
    ) = await airbyte_client.trigger_manual_sync_connection(connection_id)
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/client.py", line 166, in trigger_manual_sync_connection
    raise err.AirbyteServerNotHealthyException() from e
prefect_airbyte.exceptions.AirbyteServerNotHealthyException
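The traceback shows prefect-airbyte re-raising with "raise err.AirbyteServerNotHealthyException() from e", so the original httpx.HTTPStatusError, including its 409 status, is still attached as __cause__. A minimal sketch of a hypothetical helper that walks that chain using duck typing; it only assumes the HTTP error exposes response.status_code, which httpx.HTTPStatusError does:

```python
from typing import Optional


def underlying_status_code(exc: BaseException) -> Optional[int]:
    """Hypothetical helper: return the first HTTP status code in an exception chain.

    Any exception with a .response.status_code attribute (for example
    httpx.HTTPStatusError) is treated as the underlying HTTP error.
    """
    seen = set()
    current: Optional[BaseException] = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        response = getattr(current, "response", None)
        status = getattr(response, "status_code", None)
        if isinstance(status, int):
            return status
        # "raise X from e" stores e in __cause__; implicit chaining uses __context__
        current = current.__cause__ or current.__context__
    return None
```

In the except branch of trigger_airbyte_task, checking underlying_status_code(e) == 409 would let you log a conflict separately from a server that is genuinely unreachable, instead of treating both as the same failure.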
Nate
01/27/2023, 3:32 PM
prefect==2.7.10 and prefect-airbyte==0.2.0
A 409 would suggest to me that something has changed with your Airbyte instance, or perhaps that you attempted to run a connection that was already running?
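To test the "already running" theory directly, one could list the connection's sync jobs before triggering a new one. A minimal standard-library sketch; the /jobs/list endpoint, its request body, and the response shape ({"jobs": [{"job": {"status": ...}}]}) are assumptions based on Airbyte's Config API and should be checked against your Airbyte version, and the helper names are hypothetical:

```python
import json
import urllib.request
from typing import Any, Dict

# Assumption: non-terminal job states as reported by Airbyte's Config API.
ACTIVE_STATUSES = {"pending", "running", "incomplete"}


def has_active_sync(jobs_response: Dict[str, Any]) -> bool:
    """Return True if any job in a jobs/list response is still in flight."""
    for entry in jobs_response.get("jobs", []):
        status = (entry.get("job") or {}).get("status")
        if status in ACTIVE_STATUSES:
            return True
    return False


def fetch_sync_jobs(api_root: str, connection_id: str, timeout: float = 10.0) -> Dict[str, Any]:
    """POST to {api_root}/jobs/list (assumed endpoint) and return the parsed JSON."""
    body = json.dumps({"configTypes": ["sync"], "configId": connection_id}).encode()
    request = urllib.request.Request(
        f"{api_root}/jobs/list",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

If has_active_sync(fetch_sync_jobs(api_root, connection_id)) is True right before a trigger, a 409 Conflict from /connections/sync would be consistent with that.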
Does any attempt to trigger one of your connections with trigger_sync fail with the same 409 error?
Lucien Fregosi
01/27/2023, 3:41 PM
prefect==2.7.8 and prefect-airbyte==0.2.0
Nate
01/27/2023, 3:49 PM
trigger_sync.with_options above is called?
Lucien Fregosi
01/27/2023, 3:50 PM
poll_interval: int = 5
timeout: int = 100
retries: int = 2
retry_delay_seconds: int = 5
status_updates: bool = True
The connection ID, host, and port come from my Airbyte server hosted on Kubernetes.
Nate
01/27/2023, 3:51 PM
return trigger_sync.with_options(retries=self.retries, retry_delay_seconds=self.retry_delay_seconds)(
    connection_id=self.get_connection_id_from_state_file(),
    poll_interval_s=self.poll_interval,
    status_updates=self.status_updates,
    airbyte_server_host=urlparse(self.airbyte_api_base_url).hostname,
    airbyte_server_port=urlparse(self.airbyte_api_base_url).port,
    timeout=self.timeout,
)
is the return of some class's method that you've defined. Can you show how you're calling this method?
Lucien Fregosi
01/27/2023, 3:53 PM
airbyte = Airbyte(
    airbyte_state_bucket_name=self.airbyte_state_bucket_name,
    airbyte_api_base_url=self.airbyte_api_base_url,
    connection_name=self.connection_name,
)
connection_id: Optional[str] = airbyte.get_connection_id_from_state_file()
Logger().get_logger().info(f"Connection id {connection_id} for connection name {self.connection_name}")
airbyte.trigger_airbyte_task()
And:
def trigger_airbyte_task(self) -> Union[dict, None]:
    try:
        return trigger_sync.with_options(retries=self.retries, retry_delay_seconds=self.retry_delay_seconds)(
            connection_id=self.get_connection_id_from_state_file(),
            poll_interval_s=self.poll_interval,
            status_updates=self.status_updates,
            airbyte_server_host=urlparse(self.airbyte_api_base_url).hostname,
            airbyte_server_port=urlparse(self.airbyte_api_base_url).port,
            timeout=self.timeout,
        )
    except Exception as e:
        Logger().get_logger().exception(f"Can't trigger Airbyte task. error={e}")
        return None
Nate
01/27/2023, 3:57 PM
@flow-decorated function definition?
2.7.8 -> 2.7.10 could have changed your Airbyte instance's behavior when sending POST requests to your {base_url}/connections/sync; it seems from this
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/client.py", line 154, in trigger_manual_sync_connection
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '409 Conflict' for url 'http://airbyte-lfo-webapp.airbyte.svc.cluster.local/api/v1/connections/sync/' <--- HERE
that the correct endpoint is still being hit by trigger_sync
Sorry I don't have a more helpful answer, but it seems like the issue may be related to your Airbyte instance.
What happens if you try a bare API call to that endpoint? Something like:
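import requests

API_ROOT = "http://airbyte-lfo-webapp.airbyte.svc.cluster.local/api/v1"  # adjust to your Airbyte API root

def trigger_connection_sync(connection_id: str) -> dict:
    # POST straight to Airbyte's sync endpoint, bypassing prefect-airbyte
    response = requests.post(
        f'{API_ROOT}/connections/sync',
        json={'connectionId': connection_id},
    )
    response.raise_for_status()
    return response.json()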
I'd suspect you get the same 409 error.
Lucien Fregosi
01/28/2023, 10:00 AM
Nate
02/06/2023, 4:34 PM
AirbyteServer block.
Soon I'll be able to do some more testing with the trigger_sync task in particular, to make sure we're not missing any brand-new changes on Airbyte's side.
ICYMI, we now support an AirbyteConnection block that makes it easier to run Airbyte connection syncs however you want, for example:
from prefect import flow, task
from prefect_airbyte.connections import AirbyteConnection, AirbyteSyncResult
from prefect_airbyte.server import AirbyteServer

airbyte_server = AirbyteServer(server_host="localhost", server_port=8000, username="airbyte", password="password")

airbyte_connections = [
    AirbyteConnection(
        connection_id=connection_id,
        airbyte_server=airbyte_server,
    )
    for connection_id in [
        "fa8d5164-ca22-47da-83e0-829cc86a70b8",  # connection 1
        "8e3a3ef1-6c2d-4255-99d4-21e9e542d853",  # connection 2
        "980c14d9-2992-46a7-a1dc-2b92e60e1475",  # connection 3
    ]
]

# define your own task
@task
def run_airbyte_sync(connection: AirbyteConnection) -> AirbyteSyncResult:
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()

@flow
def some_elt_flow():
    # run airbyte syncs
    airbyte_results = run_airbyte_sync.map(airbyte_connections)
Lucien Fregosi
02/06/2023, 4:35 PM
Nate
02/06/2023, 4:39 PM
Lucien Fregosi
02/06/2023, 4:39 PM
java.lang.NullPointerException: Cannot invoke "io.airbyte.config.storage.CloudStorageConfigs.getType()" because the return value of "io.airbyte.config.helpers.LogConfigs.getStorageConfigs()" is null
Nate
02/06/2023, 5:21 PM
Lucien Fregosi
02/06/2023, 5:22 PM
Nate
02/06/2023, 5:22 PM