Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    hello, I am seeing a strange behaviour when invoking an airbyte task from prefect, is this the right place to ask? using prefect 1.2.1
    Kevin Kho

    Kevin Kho

    3 months ago
    Yes this is the right place to ask. What are you seeing?
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    Basically when the flow runs (it’s scheduled but it happens even if I do manually) some parameters in the Airbyte connection I am invoking get changed (namely the destination namespace and the normalization setting). I tried invoking using CURL and this does not happen.
    Kevin Kho

    Kevin Kho

    3 months ago
    Can I see how you use it?
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    for sure, what do you want to see?
    Kevin Kho

    Kevin Kho

    3 months ago
    The task usage in the Flow
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    give me a sec
    from prefect import Flow
    from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
    
    airbyte_conn = AirbyteConnectionTask(
            airbyte_server_host="XX.XX.XX.XX",
            airbyte_server_port=8000,
            airbyte_api_version="v1",
            connection_id="<uuid_of_the_connection>"
    )
    
    with Flow("material-scraper-data") as flow:
        flow.add_task(airbyte_conn) 
    
    # Register the flow under the "airbyte" project
    flow.register(project_name="laserhub")
    scheduling is done through UI
    Kevin Kho

    Kevin Kho

    3 months ago
    What ends up getting changed here and what does it get changed to?
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    destination namespace gets changed to “mirror source structure” (usually is “destination default”) and normalization gets changed from normalized to raw data
    Kevin Kho

    Kevin Kho

    3 months ago
    Ok looking into it
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    thanks a lot
    Kevin Kho

    Kevin Kho

    3 months ago
    Ok I am not sure on this but I think it might have to do with API limitations. I would need to ask our integrations lead but he’s currently out this week so feel free to ping me on Tuesday (Monday is a holiday too)
    First time seeing this
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    for sure
    thanks
    hey ho, any news on this?
    Kevin Kho

    Kevin Kho

    3 months ago
    Hey, thanks for following up. Yesterday was a holiday and the people with Airbyte knowledge just got back from vacation today. Will raise this
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    I found something interesting meanwhile
    Kevin Kho

    Kevin Kho

    3 months ago
    Sure just add it to the thread
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    so unsurprisingly it’s the way prefect interact with airbyte API which causes the issues
    first of all, prefect sets the schedule to whatever it is to manual but if the schedule is already set to manual the flow run throws an error. I suspect this is a bug.
    this happens because the API response for the connection do not return any schedule if there isn’t any (!) but prefect expects it.
    Kevin Kho

    Kevin Kho

    3 months ago
    So that I opened an issue for here. Not exactly but it’s worth adding that comment
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    furthermore, if a schedule is found then the connections gets updated, setting the schedule to manual (correct) but while doing this the connection loses some properties (destination namespace and normalization) which is the behaviour I was seeing
    I added a comment to the issue
    Kevin Kho

    Kevin Kho

    3 months ago
    thanks! I forwarded this too to Airbyte users on the team
    Matteo Fiorillo

    Matteo Fiorillo

    3 months ago
    thanks a lot
    to be fair I don’t know if this is an issue with airbyte API (if I update a connection I would expect all the other properties to remain the same unless I explicitly change them) or with prefect. I’ll check also with the airbyte guys to be on the safe side
    sorry to bother, if the fix is in master does it mean I have to wait for the next release? also, why is the conda package not updated yet?
    Kevin Kho

    Kevin Kho

    3 months ago
    yes to next release. or you might be able to install from master. is the conda version 1.1?
    Ah I guess conda is 1.2.1 while the latest is 1.2.2. We probably didn’t just merge the conda recipe yet
    Boggdan Barrientos

    Boggdan Barrientos

    2 months ago
    Hi @Matteo Fiorillo did you solve this?
    Kevin Kho

    Kevin Kho

    2 months ago
    The schedule fix will be released tomorrow by latest
    Matteo Fiorillo

    Matteo Fiorillo

    2 months ago
    @Boggdan Barrientos as a temporary fix you can change the script yourself (like this)
    I am still seeing errors:
    Task 'AirbyteConnectionTask': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 340, in run
        connection_status = self._get_connection_status(
      File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 176, in _get_connection_status
        self.logger.log(level=self.stream_output, msg=response.json())
    AttributeError: 'AirbyteConnectionTask' object has no attribute 'stream_output'
    Kevin Kho

    Kevin Kho

    2 months ago
    How did you use it? That seems like it should be defined