# prefect-community
k
Hello, I have started using Prefect for an Airbyte orchestration task. My code is below:
import prefect
from prefect import task, Flow

@task
def claims_func():
    airbyte_server_host = "localhost",
    airbyte_server_port = 8000,
    airbyte_api_version = "v1",
    connection_id = conn_id,



with Flow("fhir-flow") as flow:
    claims_func()


 
flow.run()
As I understand it, running this flow should trigger the connection and automatically sync data from GCP to Snowflake. I don't get any error, but no data is synced.
o
I think the code is missing the trigger call. Your code only defines the connection properties; no actual call is made to Airbyte.
k
Are there any documentation links?
k
Hi Komal, I'm confused about how this would sync. You are just defining variables in the task, so it doesn't do anything. Maybe you want to use the AirbyteConnectionTask here?
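To illustrate the point about "just defining variables", here is a minimal sketch of the same function body without the @task decorator (plain Python only, no Prefect or Airbyte involved). It shows two things: the assignments are local names that never leave the function, and the trailing commas turn each value into a one-element tuple.

```python
def claims_func():
    # Trailing commas make each of these a one-element tuple.
    airbyte_server_host = "localhost",
    airbyte_server_port = 8000,
    airbyte_api_version = "v1",
    # The names above are local variables only. No request is sent
    # anywhere, and the function falls off the end, returning None.


# The same assignment at module level, to inspect the resulting value.
host = "localhost",
result = claims_func()

print(type(host).__name__, host)  # tuple ('localhost',)
print(result)                     # None
```

So even with a valid connection ID, this task would finish successfully without contacting Airbyte, which matches the "no error, but no data" behaviour described above.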
k
I am unable to import the Airbyte connection task with from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask. It gives the following error: ModuleNotFoundError: No module named 'prefect.tasks.airbyte'; 'prefect.tasks' is not a package
k
Did you install prefect 1 or 2? Your code looks like Prefect 1 but it looks like you may have installed 2? You need to install Airbyte separately
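One quick way to answer the "Prefect 1 or 2?" question is to ask the Python environment itself. The sketch below uses only the standard library; the helper name installed_major is made up for this example.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_major(package: str):
    """Return the installed major version of `package`, or None if absent."""
    try:
        return int(version(package).split(".")[0])
    except PackageNotFoundError:
        return None


major = installed_major("prefect")
if major is None:
    print("prefect is not installed in this environment")
elif major == 1:
    print("Prefect 1: `from prefect import Flow` and prefect.tasks.* apply")
else:
    print(f"Prefect {major}: use `from prefect import flow, task` "
          "and the prefect-airbyte collection")
```

If this reports version 2 or later, the "'prefect.tasks' is not a package" error above is expected, because the Prefect 1 import path is being used against a Prefect 2 install.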
k
I have installed prefect-airbyte, but AirbyteConnectionTask doesn't work. Can you tell me what the Prefect 2 equivalent of this tutorial is? https://docs.airbyte.com/operator-guides/using-prefect-task/#start-prefect
k
For Prefect 2, it would be the docs of the collection here
k
I am using the Prefect 2 documentation now and trying to run this code:
import uuid
from time import sleep

from prefect import task
from prefect.logging.loggers import get_logger

from prefect_airbyte import exceptions as err
from prefect_airbyte.client import AirbyteClient
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Connection statuses
CONNECTION_STATUS_ACTIVE = "active"
CONNECTION_STATUS_INACTIVE = "inactive"
CONNECTION_STATUS_DEPRECATED = "deprecated"

# Job statuses
JOB_STATUS_SUCCEEDED = "succeeded"
JOB_STATUS_FAILED = "failed"
JOB_STATUS_PENDING = "pending"

@task
def trigger_sync():
    airbyte_server_host= "localhost",
    airbyte_server_port = "8000",
    airbyte_api_version = "v1",
    connection_id= "",
    poll_interval_s = 15,
    status_updates = False

@flow
def sync_flow():
            # Run other tasks and subflows here
    trigger_sync()
sync_flow()
but I am getting these errors:
@task(name='my_unique_name', ...)
warnings.warn( Traceback (most recent call last): '''''' sqlite3.OperationalError: table block_schema_reference already exists
I am trying to move data from a GCS bucket to Snowflake.
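For comparison, here is a hedged sketch of how the snippet above might look if it called the collection's trigger_sync task instead of redefining a local function with the same name (the local definition shadows the import, so the real task never runs). It assumes a prefect-airbyte release whose trigger_sync accepts the keyword arguments used in the snippet above; the connection ID is a hypothetical placeholder, and the run_sync wrapper is made up for this example.

```python
# Hypothetical placeholder: replace with the ID shown in the Airbyte UI.
CONNECTION_ID = "<your-connection-id>"

# Keyword arguments copied from the snippet in this thread.
# No trailing commas here, so each value stays a plain string/int/bool.
SYNC_KWARGS = {
    "airbyte_server_host": "localhost",
    "airbyte_server_port": "8000",
    "airbyte_api_version": "v1",
    "connection_id": CONNECTION_ID,
    "poll_interval_s": 15,
    "status_updates": False,
}


def run_sync():
    # Imported inside the function so this file can be loaded even when
    # prefect / prefect-airbyte are not installed (an illustration choice).
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync

    @flow
    def sync_flow():
        # Call the task provided by the collection; do not define a
        # local function named trigger_sync, which would shadow it.
        return trigger_sync(**SYNC_KWARGS)

    return sync_flow()

# Call run_sync() once the Airbyte server is reachable and
# CONNECTION_ID holds a real connection ID.
```

This is a sketch under the stated assumptions, not a confirmed fix for the sqlite3 error shown above, which comes from Prefect's local database rather than from the Airbyte call.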