Lucien Fregosi

01/27/2023, 2:36 PM
Hi, it seems that with the latest version of Prefect (2.7.10), prefect-airbyte is not working anymore. I get this error:
trigger_manual_sync_connection
    raise err.AirbyteServerNotHealthyException() from e
prefect_airbyte.exceptions.AirbyteServerNotHealthyException
This happens even though the sync is actually triggered on the Airbyte side.

Nate

01/27/2023, 2:57 PM
Hi @Lucien Fregosi - what version of prefect-airbyte are you using? Would you be able to share the code you're using to kick off your sync?

Lucien Fregosi

01/27/2023, 2:57 PM
Ah yes, you're right, I should update it. I'll do it and test again.
@Nate I have the latest prefect-airbyte version (0.2.0)
return trigger_sync.with_options(retries=self.retries, retry_delay_seconds=self.retry_delay_seconds)(
    connection_id=self.get_connection_id_from_state_file(),
    poll_interval_s=self.poll_interval,
    status_updates=self.status_updates,
    airbyte_server_host=urlparse(self.airbyte_api_base_url).hostname,
    airbyte_server_port=urlparse(self.airbyte_api_base_url).port,
    timeout=self.timeout,
)

Nate

01/27/2023, 3:03 PM
Can you share the full traceback?

Lucien Fregosi

01/27/2023, 3:04 PM
Sure
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/client.py", line 154, in trigger_manual_sync_connection
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '409 Conflict' for url 'http://airbyte-lfo-webapp.airbyte.svc.cluster.local/api/v1/connections/sync/'
For more information check: https://httpstatuses.com/409

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 149, in trigger_sync
    ) = await airbyte_client.trigger_manual_sync_connection(connection_id)
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/client.py", line 166, in trigger_manual_sync_connection
    raise err.AirbyteServerNotHealthyException() from e
prefect_airbyte.exceptions.AirbyteServerNotHealthyException
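The traceback shows that prefect_airbyte raises AirbyteServerNotHealthyException `from` the original httpx.HTTPStatusError, so the real HTTP status (409 here) is still attached as the exception's `__cause__`. A minimal sketch of a hypothetical helper (not part of prefect-airbyte) that walks the exception chain and returns the first status code it finds. It assumes the wrapped error exposes `.response.status_code`, as httpx and requests errors do, and that the original exception object reaches your `except` block:

```python
from typing import Optional


def find_http_status(exc: BaseException) -> Optional[int]:
    """Return the first HTTP status code found along an exception's cause chain.

    Hypothetical helper: relies on duck typing, i.e. an error in the chain
    having a `.response.status_code` attribute (true for httpx.HTTPStatusError).
    """
    seen = set()
    current: Optional[BaseException] = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        response = getattr(current, "response", None)
        status = getattr(response, "status_code", None)
        if isinstance(status, int):
            return status
        # Follow the explicit cause (`raise ... from e`) first, then implicit context
        current = current.__cause__ or current.__context__
    return None
```

For example, logging `find_http_status(e)` inside an `except Exception as e:` block would surface the underlying 409 instead of only the generic "server not healthy" message.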

Nate

01/27/2023, 3:32 PM
Thanks! Hmm, I'm not encountering issues using prefect==2.7.10 and prefect-airbyte==0.2.0.
A 409 would suggest to me that something has changed with your Airbyte instance, or perhaps that you attempted to run a connection that was already running? Does any attempt to trigger one of your connections with trigger_sync fail with the same 409 error?
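One way to rule out the "already running" case is to check the connection's recent jobs before triggering. A minimal sketch, assuming Airbyte's Config API exposes `POST {API_ROOT}/jobs/list` accepting `{"configTypes": ["sync"], "configId": <connection id>}` and returning `{"jobs": [{"job": {"status": ...}}, ...]}`; verify this against the API docs for your Airbyte version. `API_ROOT`, the status set, and both function names are hypothetical, and no authentication is sent (add it if your server requires user/password auth):

```python
import json
import urllib.request
from typing import Any, Dict

# Hypothetical API root; replace with your Airbyte server's base URL
API_ROOT = "http://localhost:8000/api/v1"

# Job statuses treated as "still in flight" (assumed from Airbyte's job model)
ACTIVE_STATUSES = {"pending", "running", "incomplete"}


def has_active_job(jobs_payload: Dict[str, Any]) -> bool:
    """Return True if any job in a jobs/list response is still in flight."""
    return any(
        entry.get("job", {}).get("status") in ACTIVE_STATUSES
        for entry in jobs_payload.get("jobs", [])
    )


def connection_is_busy(connection_id: str) -> bool:
    """Query the (assumed) jobs/list endpoint for this connection's sync jobs."""
    body = json.dumps({"configTypes": ["sync"], "configId": connection_id}).encode("utf-8")
    request = urllib.request.Request(
        f"{API_ROOT}/jobs/list",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses
    with urllib.request.urlopen(request, timeout=10) as response:
        return has_active_job(json.load(response))
```

If `connection_is_busy(...)` returns False right before a trigger that still fails with 409, the conflict is probably not caused by an in-flight sync.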

Lucien Fregosi

01/27/2023, 3:41 PM
No, I'm sure that the connection was not running, and yes, it happens with any connection.
And it worked perfectly with prefect==2.7.8 and prefect-airbyte==0.2.0.

Nate

01/27/2023, 3:49 PM
Hmm - OK. Can you share how the trigger_sync.with_options code above is called?

Lucien Fregosi

01/27/2023, 3:50 PM
poll_interval: int = 5
timeout: int = 100
retries: int = 2
retry_delay_seconds: int = 5
status_updates: bool = True
As for the connection / host / port, they come from my Airbyte server hosted on Kubernetes.
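Because the host and port passed to trigger_sync are derived with urllib.parse.urlparse, note that `.port` is None whenever the base URL has no explicit port (as with the in-cluster URL in the traceback above). A minimal sketch of a hypothetical helper that falls back to the scheme's default port; whether prefect-airbyte needs an explicit port in this setup is an assumption worth checking, not something the thread confirms:

```python
from typing import Tuple
from urllib.parse import urlparse

# Default ports by scheme, used when the URL does not specify one
DEFAULT_PORTS = {"http": 80, "https": 443}


def host_and_port(base_url: str) -> Tuple[str, int]:
    """Split an API base URL into (hostname, port), filling in a default port."""
    parsed = urlparse(base_url)
    if not parsed.hostname:
        raise ValueError(f"No hostname in URL: {base_url!r}")
    # parsed.port is None when the URL has no explicit ":<port>"
    port = parsed.port or DEFAULT_PORTS.get(parsed.scheme)
    if port is None:
        raise ValueError(f"No port given and unknown scheme: {parsed.scheme!r}")
    return parsed.hostname, port
```

Usage would be `host, port = host_and_port(self.airbyte_api_base_url)`, passing the results as `airbyte_server_host` and `airbyte_server_port`.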

Nate

01/27/2023, 3:51 PM
Sorry, I mean it seems like this
return trigger_sync.with_options(retries=self.retries, retry_delay_seconds=self.retry_delay_seconds)(
    connection_id=self.get_connection_id_from_state_file(),
    poll_interval_s=self.poll_interval,
    status_updates=self.status_updates,
    airbyte_server_host=urlparse(self.airbyte_api_base_url).hostname,
    airbyte_server_port=urlparse(self.airbyte_api_base_url).port,
    timeout=self.timeout,
)
is the return of some class's method that you've defined. Can you show how you're calling this method?

Lucien Fregosi

01/27/2023, 3:53 PM
airbyte = Airbyte(
    airbyte_state_bucket_name=self.airbyte_state_bucket_name,
    airbyte_api_base_url=self.airbyte_api_base_url,
    connection_name=self.connection_name,
)
connection_id: Optional[str] = airbyte.get_connection_id_from_state_file()
Logger().get_logger().info(f"Connection id {connection_id} for connection name {self.connection_name}")
airbyte.trigger_airbyte_task()
And
def trigger_airbyte_task(self) -> Union[dict, None]:
    try:
        return trigger_sync.with_options(retries=self.retries, retry_delay_seconds=self.retry_delay_seconds)(
            connection_id=self.get_connection_id_from_state_file(),
            poll_interval_s=self.poll_interval,
            status_updates=self.status_updates,
            airbyte_server_host=urlparse(self.airbyte_api_base_url).hostname,
            airbyte_server_port=urlparse(self.airbyte_api_base_url).port,
            timeout=self.timeout,
        )
    except Exception as e:
        Logger().get_logger().exception(f"Can't trigger Airbyte task. error={e}")
    return None

Nate

01/27/2023, 3:57 PM
Thanks - so is that first block you sent in your most recent message within a @flow-decorated function definition?
I'm not sure anything in Prefect core between 2.7.8 -> 2.7.10 could have changed your Airbyte instance's behavior when sending POST requests to your {base_url}/connections/sync endpoint. It seems from this
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect_airbyte/client.py", line 154, in trigger_manual_sync_connection
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '409 Conflict' for url 'http://airbyte-lfo-webapp.airbyte.svc.cluster.local/api/v1/connections/sync/' <--- HERE
that the correct endpoint is still being hit by trigger_sync.
Sorry that I don't have a more helpful answer, but it seems like the issue may be related to your Airbyte instance. What happens if you try just a bare API call to that endpoint? Like:
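import requests

# API root taken from the URL in the traceback; adjust for your deployment
API_ROOT = "http://airbyte-lfo-webapp.airbyte.svc.cluster.local/api/v1"

def trigger_connection_sync(connection_id: str) -> dict:
    # POST the connection id to Airbyte's manual sync endpoint
    response = requests.post(
        f"{API_ROOT}/connections/sync",
        json={"connectionId": connection_id},
    )
    response.raise_for_status()
    return response.json()
I'd suspect you get the same 409 error.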

Lucien Fregosi

01/28/2023, 10:00 AM
Hmm, I also upgraded the Airbyte version, so maybe it does come from the Airbyte server. I will try to revert the Airbyte upgrade.

Nate

02/06/2023, 4:34 PM
Hi @Lucien Fregosi - we do support user / password auth using the AirbyteServer block. Soon I can do some more testing with the trigger_sync task in particular to make sure we're not missing any brand-new changes on Airbyte's side. ICYMI, we now support an AirbyteConnection block that makes it easier to run Airbyte connection syncs however you want, for example:
from prefect import flow, task
from prefect_airbyte.connections import AirbyteConnection, AirbyteSyncResult
from prefect_airbyte.server import AirbyteServer

airbyte_server = AirbyteServer(server_host="localhost", server_port=8000, username="airbyte", password="password")

airbyte_connections = [
    AirbyteConnection(
        connection_id=connection_id,
        airbyte_server=airbyte_server
    )
    for connection_id in [
        "fa8d5164-ca22-47da-83e0-829cc86a70b8", # connection 1
        "8e3a3ef1-6c2d-4255-99d4-21e9e542d853", # connection 2
        "980c14d9-2992-46a7-a1dc-2b92e60e1475" # connection 3
    ]
]

# define your own task
@task
def run_airbyte_sync(connection: AirbyteConnection) -> AirbyteSyncResult:
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()

@flow
def some_elt_flow():
    # run airbyte syncs
    airbyte_results = run_airbyte_sync.map(airbyte_connections)

if __name__ == "__main__":
    some_elt_flow()

Lucien Fregosi

02/06/2023, 4:35 PM
OK OK, I will try this new syntax then.

Nate

02/06/2023, 4:39 PM
Okay! Let me know if you run into any trouble with the new blocks.

Lucien Fregosi

02/06/2023, 4:39 PM
Sure, I'll let you know.
@Nate it didn't change anything, but I dug into the Airbyte server logs and found this error:
java.lang.NullPointerException: Cannot invoke "io.airbyte.config.storage.CloudStorageConfigs.getType()" because the return value of "io.airbyte.config.helpers.LogConfigs.getStorageConfigs()" is null

Nate

02/06/2023, 5:21 PM
Have you tried making a plain API call to trigger a sync? That error doesn't seem to be related to the implementation on the Prefect side.

Lucien Fregosi

02/06/2023, 5:22 PM
No, after some tests I don't think so. I'm looking into it with the Airbyte team. Thanks a looooot for your help!

Nate

02/06/2023, 5:22 PM
Sure thing! Let me know if I can be of additional help.