# prefect-community
m
hello, I am seeing a strange behaviour when invoking an airbyte task from prefect, is this the right place to ask? using prefect 1.2.1
👀 1
k
Yes this is the right place to ask. What are you seeing?
m
Basically, when the flow runs (it’s scheduled, but it also happens if I run it manually), some settings of the Airbyte connection I am invoking get changed (namely the destination namespace and the normalization setting). I tried triggering the sync with curl and this does not happen.
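For comparison, a direct trigger like the curl call mentioned above can be sketched in Python. This is only a sketch: the endpoint path `/api/v1/connections/sync` and the `connectionId` body field are assumptions based on Airbyte's open-source configuration API, and `build_sync_request` is a hypothetical helper, not part of Prefect or Airbyte. The request is built but not sent.

```python
import json
import urllib.request


def build_sync_request(host: str, port: int, connection_id: str,
                       api_version: str = "v1") -> urllib.request.Request:
    """Build (but do not send) a POST request that asks Airbyte to sync a connection.

    Assumption: the server exposes /api/<version>/connections/sync and
    expects a JSON body of the form {"connectionId": "<uuid>"}.
    """
    url = f"http://{host}:{port}/api/{api_version}/connections/sync"
    body = json.dumps({"connectionId": connection_id}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder host and connection id, as in the flow posted in this thread.
req = build_sync_request("localhost", 8000, "<uuid_of_the_connection>")
print(req.get_method(), req.full_url)
```

Sending it would be `urllib.request.urlopen(req)`. The point is that a plain sync call only names the connection and does not touch any of its settings.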
k
Can I see how you use it?
m
for sure, what do you want to see?
k
The task usage in the Flow
m
give me a sec
from prefect import Flow
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

airbyte_conn = AirbyteConnectionTask(
        airbyte_server_host="XX.XX.XX.XX",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="<uuid_of_the_connection>"
)

with Flow("material-scraper-data") as flow:
    flow.add_task(airbyte_conn) 

# Register the flow under the "laserhub" project
flow.register(project_name="laserhub")
scheduling is done through UI
k
What ends up getting changed here and what does it get changed to?
m
destination namespace gets changed to “mirror source structure” (it is usually “destination default”) and normalization gets changed from normalized to raw data
k
Ok looking into it
m
thanks a lot
k
Ok I am not sure on this but I think it might have to do with API limitations. I would need to ask our integrations lead but he’s currently out this week so feel free to ping me on Tuesday (Monday is a holiday too)
First time seeing this
m
for sure
thanks
hey ho, any news on this?
k
Hey, thanks for following up. Yesterday was a holiday and the people with Airbyte knowledge just got back from vacation today. Will raise this
m
I found something interesting meanwhile
k
Sure just add it to the thread
m
so unsurprisingly it’s the way prefect interacts with the airbyte API that causes the issues
first of all, prefect sets the schedule to manual whatever it currently is, but if the schedule is already manual the flow run throws an error. I suspect this is a bug.
this happens because the API response for the connection does not return any schedule if there isn’t one (!) but prefect expects it.
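The failure mode described above (a manual connection comes back without a schedule entry) can be avoided by reading the field defensively. A minimal sketch, assuming the connection is a plain dict parsed from the API response and that the key is named `schedule`; the helper names and sample payloads are hypothetical and this is not Prefect's actual code.

```python
from typing import Any, Dict, Optional


def get_schedule(connection: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the schedule if the API sent one, else None.

    Using .get() instead of connection["schedule"] avoids a KeyError
    when the connection is already set to manual.
    """
    return connection.get("schedule")


def needs_manual_switch(connection: Dict[str, Any]) -> bool:
    """Only connections that still carry a schedule need to be updated."""
    return get_schedule(connection) is not None


# Hypothetical responses: field names other than "schedule" are illustrative.
scheduled = {"connectionId": "abc", "schedule": {"units": 24, "timeUnit": "hours"}}
manual = {"connectionId": "abc"}

print(needs_manual_switch(scheduled))
print(needs_manual_switch(manual))
```

With this check the update step can simply be skipped for connections that are already manual.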
k
So I opened an issue for that here. It’s not exactly the same problem, but it’s worth adding that comment to it
m
furthermore, if a schedule is found then the connection gets updated, setting the schedule to manual (correct), but while doing this the connection loses some properties (destination namespace and normalization), which is the behaviour I was seeing
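One way to keep the other properties intact is to build the update body from the full connection as returned by the API and change only the schedule. A minimal sketch under stated assumptions: the update call resets any field left out of the body, a manual connection is expressed by omitting `schedule`, and the field names in the sample dict (`namespaceDefinition`, `operationIds`) are illustrative. `build_manual_update` and `read_only_keys` are hypothetical names, and this is not the fix that was merged into Prefect.

```python
import copy
from typing import Any, Dict, Iterable


def build_manual_update(connection: Dict[str, Any],
                        read_only_keys: Iterable[str] = ()) -> Dict[str, Any]:
    """Return an update body that keeps every existing property and drops the schedule.

    read_only_keys lets the caller strip fields the update endpoint may
    reject; which fields those are is left to the caller (assumption).
    """
    skipped = set(read_only_keys)
    # Deep-copy so the original response dict is never mutated.
    payload = {k: copy.deepcopy(v) for k, v in connection.items() if k not in skipped}
    # Assumption: no "schedule" key means the connection is manual.
    payload.pop("schedule", None)
    return payload


# Hypothetical connection as it might come back from the API.
current = {
    "connectionId": "abc",
    "namespaceDefinition": "destination",
    "operationIds": ["normalization-op"],
    "schedule": {"units": 24, "timeUnit": "hours"},
}

update_body = build_manual_update(current)
print(sorted(update_body))
```

Because the body starts from the current state, the namespace and normalization settings are sent back unchanged instead of falling back to defaults.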
I added a comment to the issue
k
thanks! I forwarded this too to Airbyte users on the team
m
thanks a lot
to be fair I don’t know if this is an issue with the airbyte API (if I update a connection I would expect all the other properties to remain the same unless I explicitly change them) or with prefect. I’ll also check with the airbyte guys to be on the safe side
sorry to bother, if the fix is in master does it mean I have to wait for the next release? also, why is the conda package not updated yet?
k
yes to next release. or you might be able to install from master. is the conda version 1.1?
Ah I guess conda is 1.2.1 while the latest is 1.2.2. We probably just didn’t merge the conda recipe yet
b
Hi @Matteo Fiorillo did you solve this?
k
The schedule fix will be released tomorrow at the latest
m
@Boggdan Barrientos as a temporary fix you can change the script yourself (like this)
I am still seeing errors:
Task 'AirbyteConnectionTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 340, in run
    connection_status = self._get_connection_status(
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 176, in _get_connection_status
    self.logger.log(level=self.stream_output, msg=response.json())
AttributeError: 'AirbyteConnectionTask' object has no attribute 'stream_output'
k
How did you use it? That seems like it should be defined