komal azram

06/25/2022, 8:30 AM
Hello, I have started using Prefect for an Airbyte orchestration task. My code is below:
import prefect
from prefect import task, Flow

@task
def claims_func():
    airbyte_server_host = "localhost",
    airbyte_server_port = 8000,
    airbyte_api_version = "v1",
    connection_id = conn_id,



with Flow("fhir-flow") as flow:
    claims_func()


 
flow.run()
As I understand it, running this flow should trigger the connection and automatically sync data from GCP to Snowflake. I don't get any error, but no data is synced.

Omar Sultan

06/25/2022, 11:40 AM
I think the code is missing the trigger function. Your code just defines the connection properties, but no actual call is made to Airbyte.
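To make this concrete, here is a minimal plain-Python sketch (no Prefect involved; the function name `claims_settings_only` is hypothetical) of what a body like the one in the posted task does. Each assignment only binds a local name, the trailing commas turn the values into one-element tuples, and the function returns None without contacting any server.

```python
def claims_settings_only():
    # Each line binds a local variable and nothing else. The trailing comma
    # makes the value a one-element tuple, not a plain string or integer.
    airbyte_server_host = "localhost",
    airbyte_server_port = 8000,
    airbyte_api_version = "v1",
    # No request is sent and nothing is returned.


result = claims_settings_only()
host = "localhost",  # same trailing-comma pattern, outside the function

print(result)                     # None
print(type(host).__name__, host)  # tuple ('localhost',)
```

So even with a valid connection ID, a task written this way would finish successfully and sync nothing.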

komal azram

06/25/2022, 12:31 PM
Are there any documentation links?

Kevin Kho

06/25/2022, 2:27 PM
Hi Komal, I’m confused about how this would sync. You are just defining variables in the task, so it doesn’t do anything. Maybe you want to use the AirbyteConnectionTask here?
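A rough sketch of what that could look like, assuming Prefect 1.x is installed (where `prefect.tasks.airbyte` exists). The helper `build_flow` and the `AIRBYTE_SETTINGS` dict are hypothetical names used only for illustration, and the connection ID has to be the real UUID of your Airbyte connection.

```python
# Settings taken from the original post; adjust them for your Airbyte server.
AIRBYTE_SETTINGS = {
    "airbyte_server_host": "localhost",
    "airbyte_server_port": 8000,
    "airbyte_api_version": "v1",
}


def build_flow(connection_id):
    """Build a Prefect 1 flow that triggers one Airbyte connection sync."""
    # Imported here so that the module loads even where Prefect 1 is absent.
    from prefect import Flow
    from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

    sync_claims = AirbyteConnectionTask(
        connection_id=connection_id, **AIRBYTE_SETTINGS
    )
    with Flow("fhir-flow") as flow:
        # Calling the task inside the Flow context registers it with the flow.
        sync_claims()
    return flow


# Usage (needs a running Airbyte server and a real connection UUID):
# build_flow("<your-connection-id>").run()
```

The difference from the posted code is that the task here is an AirbyteConnectionTask instance, which is what actually talks to the Airbyte server when the flow runs.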

komal azram

06/27/2022, 11:19 AM
I am unable to import AirbyteConnectionTask using from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask. It gives the following error: ModuleNotFoundError: No module named 'prefect.tasks.airbyte'; 'prefect.tasks' is not a package

Kevin Kho

06/27/2022, 3:13 PM
Did you install Prefect 1 or 2? Your code looks like Prefect 1, but it looks like you may have installed 2. In Prefect 2 you need to install the Airbyte collection separately.
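One way to check which major version is installed is to ask the standard library's `importlib.metadata`. This is only a sketch; the helper names `installed_version` and `major_version` are made up for this example.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def major_version(package):
    """Return the leading version number as an int, or None if not installed."""
    version = installed_version(package)
    return int(version.split(".")[0]) if version else None


print("prefect:", installed_version("prefect"))
print("prefect-airbyte:", installed_version("prefect-airbyte"))
```

If `major_version("prefect")` comes back as 2, imports from `prefect.tasks.airbyte` are expected to fail, because that module path belongs to Prefect 1.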

komal azram

06/28/2022, 6:26 AM
I have installed prefect-airbyte, but AirbyteConnectionTask doesn't work. Can you tell me what the Prefect 2 equivalent of this tutorial is? https://docs.airbyte.com/operator-guides/using-prefect-task/#start-prefect

Kevin Kho

06/28/2022, 7:42 AM
For Prefect 2, it would be the docs of the collection here

komal azram

06/28/2022, 10:22 AM
I am using the Prefect 2 documentation now and trying to run this code:
import uuid
from time import sleep

from prefect import task
from prefect.logging.loggers import get_logger

from prefect_airbyte import exceptions as err
from prefect_airbyte.client import AirbyteClient
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Connection statuses
CONNECTION_STATUS_ACTIVE = "active"
CONNECTION_STATUS_INACTIVE = "inactive"
CONNECTION_STATUS_DEPRECATED = "deprecated"

# Job statuses
JOB_STATUS_SUCCEEDED = "succeeded"
JOB_STATUS_FAILED = "failed"
JOB_STATUS_PENDING = "pending"

@task
def trigger_sync():
    airbyte_server_host= "localhost",
    airbyte_server_port = "8000",
    airbyte_api_version = "v1",
    connection_id= "",
    poll_interval_s = 15,
    status_updates = False

@flow
def sync_flow():
            # Run other tasks and subflows here
    trigger_sync()
sync_flow()
but I am getting these errors:
@task(name='my_unique_name', ...)
warnings.warn(
Traceback (most recent call last):
''''''
sqlite3.OperationalError: table block_schema_reference already exists
I am trying to move data from a GCS bucket to Snowflake.
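One thing that stands out in the code posted here: it imports `trigger_sync` from `prefect_airbyte.connections` and then defines its own task with the same name, so the imported task is replaced by a function that only assigns local variables. A hedged sketch of calling the collection's task directly is below. It assumes a prefect-airbyte release in which `trigger_sync` accepts the keyword arguments used in the posted code (later releases may differ), and `build_sync_flow` is a hypothetical helper name. It does not address the sqlite3 error.

```python
def build_sync_flow(connection_id):
    """Return a Prefect 2 flow that triggers one Airbyte connection sync."""
    # Imported here so that the module loads even without Prefect 2 installed.
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync

    @flow
    def sync_flow():
        # Call the imported task; do not redefine trigger_sync locally.
        return trigger_sync(
            airbyte_server_host="localhost",
            airbyte_server_port="8000",
            airbyte_api_version="v1",
            connection_id=connection_id,  # must be a real connection UUID
            poll_interval_s=15,
            status_updates=False,
        )

    return sync_flow


# Usage (needs a running Airbyte server):
# build_sync_flow("<your-connection-id>")()
```

Note that the arguments are passed as keywords in the call, without the trailing commas after assignments that turn values into tuples in the posted version.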