Antti Tupamäki
10/18/2022, 10:53 AM

Rob Freedy
10/19/2022, 3:29 PM

Antti Tupamäki
10/20/2022, 5:44 AM

# CustomSecrets, customer_etl and get_sqlalchemy_db_uri_for_postgres come from the project under test
import logging
from pathlib import Path

import psycopg2


def test_customer_etl_flow(ftp_containerport, dbcontainer_port):
    target_dir = str(Path(__file__).parent)
    db_user = CustomSecrets("DBT__PGUSER").get_secret()
    db_password = CustomSecrets("DBT__PGPASSWORD").get_secret()
    # run the flow against the container-backed SFTP and Postgres ports from the fixtures
    customer_etl.flow.run(
        parameters=dict(
            target_dir=target_dir,
            hostname="localhost",
            raw_schema="public",
            download_folder="/upload",
            sftp_port=ftp_containerport,
            db_port=dbcontainer_port,
            database="local",
        )
    )
    conn_string = get_sqlalchemy_db_uri_for_postgres(
        "localhost",
        "postgres",
        db_user,
        db_password,
        dbcontainer_port,
    )
    logging.info("Connecting to database")
    conn = psycopg2.connect(conn_string)
    cursor = conn.cursor()
    logging.info("Connected! Fetching data")
    cursor.execute(
        """SELECT table_name FROM information_schema.tables
           WHERE table_schema = 'public' AND table_type = 'BASE TABLE' """
    )
    # verify the flow created the expected six tables in the public schema
    tables = cursor.fetchall()
    assert len(tables) == 6
with Flow(
    name="customer ETL",
    schedule=None,
    executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 1}),
) as flow:

and from here on there is too much customer-specific stuff...
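(For context, the test above relies on two pytest fixtures, ftp_containerport and dbcontainer_port, that are not shown in the thread. Below is a minimal sketch of how dbcontainer_port could be provided with testcontainers-python; the image tag, credentials and fixture scope are assumptions, not something confirmed in the thread, and the container's credentials would have to match whatever CustomSecrets returns in the real project. ftp_containerport could be built the same way with a generic DockerContainer running an SFTP image.)

# Hypothetical conftest.py sketch (assumption, not from the thread):
# one possible way to supply the dbcontainer_port fixture with testcontainers-python.
import pytest
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="session")
def dbcontainer_port():
    # start a throwaway Postgres container and hand its mapped host port to the test
    with PostgresContainer("postgres:14") as pg:
        yield int(pg.get_exposed_port(5432))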
Rob Freedy
10/24/2022, 1:02 PM

Antti Tupamäki
10/25/2022, 11:13 AM

Rob Freedy
10/25/2022, 1:29 PM

Antti Tupamäki
10/26/2022, 5:37 AM