Hi I am trying to run a sync connection task in a...
# prefect-community
k
Hi I am trying to run a sync connection task in airbyte using prefect. but getting this error sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table block_schema_reference already exists. when I sync from airbyte it works.
1
a
can you share more details? do you have some code you can share?
is it even for Prefect 1.0 or 2.0?
k
I am using prefect 2
here's the code
Copy code
import uuid
from time import sleep

from prefect import task
from prefect.logging.loggers import get_logger

from prefect_airbyte import exceptions as err
from prefect_airbyte.client import AirbyteClient
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Connection statuses
CONNECTION_STATUS_ACTIVE = "active"
CONNECTION_STATUS_INACTIVE = "inactive"
CONNECTION_STATUS_DEPRECATED = "deprecated"

# Job statuses
JOB_STATUS_SUCCEEDED = "succeeded"
JOB_STATUS_FAILED = "failed"
JOB_STATUS_PENDING = "pending"
@task
def trigger_sync():

    airbyte_server_host: str = "localhost",
    airbyte_server_port: int = "8000",
    airbyte_api_version: str = "v1",
    connection_id: str = None,
    poll_interval_s: int = 15,
    status_updates: bool = False



@flow

def example_trigger_sync_flow():
        # Run other tasks and subflows here
        trigger_sync(
            connection_id="6788...")

example_trigger_sync_flow()
a
I don't understand what is the actual problem you are facing here. Your trigger_sync task doesn't do anything atm. The way those tasks are supposed to work is by triggering them directly in your flow, not in a task Can you try in a fresh new environment and follow the example exactly as it's shown in the docs i.e. by running the sync task in the flow directly (not in another task)?
Copy code
from prefect import flow
from prefect_airbyte.connections import trigger_sync


@flow
def example_trigger_sync_flow():

      # Run other tasks and subflows here

      trigger_sync(
            connection_id="your-connection-id-to-sync",
            poll_interval_s=3,
            status_updates=True
      )

example_trigger_sync_flow()
k
I have tried it one more time in a new environment just like to mentioned but still getting same errors.sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table block_schema_reference already exists [SQL: CREATE TABLE block_schema_reference ( id CHAR(36) DEFAULT ( ( lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2))) || '-4' || substr(lower(hex(randomblob(2))),2) || '-' || substr('89ab',abs(random()) % 4 + 1, 1) || substr(lower(hex(randomblob(2))),2) || '-' || lower(hex(randomblob(6))) ) ) NOT NULL, created DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%f000', 'now')) NOT NULL, updated DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%f000', 'now')) NOT NULL, name VARCHAR NOT NULL, parent_block_schema_id CHAR(36) NOT NULL, reference_block_schema_id CHAR(36) NOT NULL, CONSTRAINT pk_block_schema_reference PRIMARY KEY (id), CONSTRAINT fk_block_schema_reference__parent_block_schema_id__block_schema FOREIGN KEY(parent_block_schema_id) REFERENCES block_schema (id) ON DELETE cascade, CONSTRAINT fk_block_schema_reference__reference_block_schema_id__block_schema FOREIGN KEY(reference_block_schema_id) REFERENCES block_schema (id) ON DELETE cascade ) ] (Background on this error at: https://sqlalche.me/e/14/e3q8)
a
so far you still didn't even post your code if this is Airbyte error, I recommend posting your issue here instead https://discuss.airbyte.io/ but if you believe this is a bug on the Prefect side, could you open an issue in this repo and describe your problem in more detail incl. your setup, minimal reproducible example and which steps did you take to fix it so far? https://github.com/PrefectHQ/prefect-airbyte
k
sure thankyou
🙌 1