komal azram
06/25/2022, 8:30 AMimport prefect
from prefect import task, Flow
@task
def claims_func():
airbyte_server_host = "localhost",
airbyte_server_port = 8000,
airbyte_api_version = "v1",
connection_id = conn_id,
with Flow("fhir-flow") as flow:
claims_func()
flow.run()
As per my understanding when when I run this flow it should trigger the connection and automatically sync data in from gcp->snowflake. I don't get any error but no data is synced.Omar Sultan
06/25/2022, 11:40 AMkomal azram
06/25/2022, 12:31 PMKevin Kho
06/25/2022, 2:27 PMkomal azram
06/27/2022, 11:19 AMKevin Kho
06/27/2022, 3:13 PMkomal azram
06/28/2022, 6:26 AMKevin Kho
06/28/2022, 7:42 AMkomal azram
06/28/2022, 10:22 AMimport uuid
from time import sleep
from prefect import task
from prefect.logging.loggers import get_logger
from prefect_airbyte import exceptions as err
from prefect_airbyte.client import AirbyteClient
from prefect import flow
from prefect_airbyte.connections import trigger_sync
# Connection statuses
CONNECTION_STATUS_ACTIVE = "active"
CONNECTION_STATUS_INACTIVE = "inactive"
CONNECTION_STATUS_DEPRECATED = "deprecated"
# Job statuses
JOB_STATUS_SUCCEEDED = "succeeded"
JOB_STATUS_FAILED = "failed"
JOB_STATUS_PENDING = "pending"
@task
def trigger_sync():
airbyte_server_host= "localhost",
airbyte_server_port = "8000",
airbyte_api_version = "v1",
connection_id= "",
poll_interval_s = 15,
status_updates = False
@flow
def sync_flow():
# Run other tasks and subflows here
trigger_sync()
sync_flow()
but getting these errors @task(name='my_unique_name', ...)
warnings.warn(
Traceback (most recent call last):
''''''
sqlite3.OperationalError: table block_schema_reference already exists