Slackbot
10/03/2024, 2:56 PM
Marvin
10/03/2024, 2:56 PM"@task(name="get_data_sql")def get_data(conn_str, table_name, db_name, conditions, raw_bucket, config_bucket): logger = get_run_logger() conn = conn_str() query = f"SELECT * FROM {table_name}" if conditions == "" else f"SELECT * FROM {table_name} WHERE {conditions}" python_dict = get_python_type("SQL", config_bucket, "PYTHON_TYPE", "TYPE") types = pd.read_csv(f"s3://{raw_bucket}/SQL/{db_name}/{table_name}/METADATA/meta.csv", sep=";", dtype=str) python_types = {row["COLUMN_NAME"]: python_dict.get(row["DATA_TYPE"], "str") for _, row in types.iterrows()} # Fetch the Athena schema to validate column types logger.info(f"Getting data from database {db_name} for table {table_name}") athena_schema = get_athena_schema.fn("raw", table_name) logger.info(f"Athena schema from task get_data: {athena_schema}") athena_schema_dict = athena_schema.set_index("Column Name")["Type"].to_dict() logger.info(f"Athena schema dictionary from task get_data: {athena_schema_dict}") df_chunks = pd.read_sql_query(query, conn, dtype=python_types,