https://prefect.io logo
#prefect-community
Title
# prefect-community
m

Matteo Fiorillo

06/15/2022, 2:33 PM
hello, I am seeing a strange behaviour when invoking an airbyte task from prefect, is this the right place to ask? using prefect 1.2.1
đź‘€ 1
k

Kevin Kho

06/15/2022, 2:34 PM
Yes this is the right place to ask. What are you seeing?
m

Matteo Fiorillo

06/15/2022, 2:38 PM
Basically when the flow runs (it’s scheduled but it happens even if I do manually) some parameters in the Airbyte connection I am invoking get changed (namely the destination namespace and the normalization setting). I tried invoking using CURL and this does not happen.
k

Kevin Kho

06/15/2022, 2:39 PM
Can I see how you use it?
m

Matteo Fiorillo

06/15/2022, 2:40 PM
for sure, what do you want to see?
k

Kevin Kho

06/15/2022, 2:41 PM
The task usage in the Flow
m

Matteo Fiorillo

06/15/2022, 2:42 PM
give me a sec
Copy code
from prefect import Flow
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

airbyte_conn = AirbyteConnectionTask(
        airbyte_server_host="XX.XX.XX.XX",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="<uuid_of_the_connection>"
)

with Flow("material-scraper-data") as flow:
    flow.add_task(airbyte_conn) 

# Register the flow under the "airbyte" project
flow.register(project_name="laserhub")
scheduling is done through UI
k

Kevin Kho

06/15/2022, 2:44 PM
What ends up getting changed here and what does it get changed to?
m

Matteo Fiorillo

06/15/2022, 2:46 PM
destination namespace gets changed to “mirror source structure” (usually is “destination default”) and normalization gets changed from normalized to raw data
k

Kevin Kho

06/15/2022, 2:46 PM
Ok looking into it
m

Matteo Fiorillo

06/15/2022, 2:47 PM
thanks a lot
k

Kevin Kho

06/15/2022, 2:57 PM
Ok I am not sure on this but I think it might have to do with API limitations. I would need to ask our integrations lead but he’s currently out this week so feel free to ping me on Tuesday (Monday is a holiday too)
First time seeing this
m

Matteo Fiorillo

06/15/2022, 2:58 PM
for sure
thanks
hey ho, any news on this?
k

Kevin Kho

06/21/2022, 1:35 PM
Hey, thanks for following up. Yesterday was a holiday and the people with Airbyte knowledge just got back from vacation today. Will raise this
m

Matteo Fiorillo

06/21/2022, 1:36 PM
I found something interesting meanwhile
k

Kevin Kho

06/21/2022, 1:37 PM
Sure just add it to the thread
m

Matteo Fiorillo

06/21/2022, 1:37 PM
so unsurprisingly it’s the way prefect interact with airbyte API which causes the issues
first of all, prefect sets the schedule to whatever it is to manual but if the schedule is already set to manual the flow run throws an error. I suspect this is a bug.
this happens because the API response for the connection do not return any schedule if there isn’t any (!) but prefect expects it.
k

Kevin Kho

06/21/2022, 1:51 PM
So that I opened an issue for here. Not exactly but it’s worth adding that comment
m

Matteo Fiorillo

06/21/2022, 1:52 PM
furthermore, if a schedule is found then the connections gets updated, setting the schedule to manual (correct) but while doing this the connection loses some properties (destination namespace and normalization) which is the behaviour I was seeing
I added a comment to the issue
k

Kevin Kho

06/21/2022, 1:56 PM
thanks! I forwarded this too to Airbyte users on the team
m

Matteo Fiorillo

06/21/2022, 1:57 PM
thanks a lot
to be fair I don’t know if this is an issue with airbyte API (if I update a connection I would expect all the other properties to remain the same unless I explicitly change them) or with prefect. I’ll check also with the airbyte guys to be on the safe side
sorry to bother, if the fix is in master does it mean I have to wait for the next release? also, why is the conda package not updated yet?
k

Kevin Kho

06/22/2022, 1:52 PM
yes to next release. or you might be able to install from master. is the conda version 1.1?
Ah I guess conda is 1.2.1 while the latest is 1.2.2. We probably didn’t just merge the conda recipe yet
b

Boggdan Barrientos

07/05/2022, 9:06 PM
Hi @Matteo Fiorillo did you solve this?
k

Kevin Kho

07/05/2022, 9:30 PM
The schedule fix will be released tomorrow by latest
m

Matteo Fiorillo

07/06/2022, 7:46 AM
@Boggdan Barrientos as a temporary fix you can change the script yourself (like this)
I am still seeing errors:
Copy code
Task 'AirbyteConnectionTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 340, in run
    connection_status = self._get_connection_status(
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 176, in _get_connection_status
    self.logger.log(level=self.stream_output, msg=response.json())
AttributeError: 'AirbyteConnectionTask' object has no attribute 'stream_output'
k

Kevin Kho

07/07/2022, 4:34 PM
How did you use it? That seems like it should be defined