Slackbot
10/03/2024, 2:54 PMMarvin
10/03/2024, 2:54 PM"from datetime import datetime from prefect import task, get_run_logger import pandas as pd import jaydebeapi import awswrangler as wr # Function to get Python types based on SQL data types from a mapping file def get_python_type(bdd_type, config_bucket, type_column, corr_column): mapping_matrix = pd.read_csv(f"s3://{config_bucket}/mapping_types.csv", sep=";", encoding="utf-8", dtype=str) mapping_matrix = mapping_matrix[mapping_matrix["BDD_TYPE"] == bdd_type] mapping_dict = {} for _, row in mapping_matrix.iterrows(): mapping_dict[row[corr_column]] = row[type_column] return mapping_dict # Task to connect to SQL database @task(name="connect_to_database_sql") def connect_to_database(secrets): def connect(): conn = jaydebeapi.connect( 'com.microsoft.sqlserver.jdbc.SQLServerDriver', f"jdbc:sqlserver://{secrets['host'].replace(',', ':')};database={secrets['dbname']};encrypt=false", {'user': secrets["username"], 'password': secrets["password"]}, "/mssql-jdbc.jar" ) return conn return connect @task(name="get_athena_schema") def get_athena_schema(database: str, table: str): logger = get_run_logger() try: schema = wr.catalog.table