Slackbot
10/03/2024, 2:55 PM
Marvin
10/03/2024, 2:55 PM"@task(name="process_chunk")def process_chunk(chunk, table_name, athena_schema_dict, raw_bucket, db_name, today_date): logger = get_run_logger() chunk['id_jour'] = today_date logger.info(f"Processing chunk for {table_name}") for col in chunk.columns: logger.info(f"Processing column '{col}' of type {chunk[col].dtype}") if col == 'id_jour': logger.info(f"Skipping conversion for column '{col}'") continue athena_type = athena_schema_dict.get(col.lower()) logger.info(f"Athena type for column '{col}': {athena_type}") # Convert based on Athena type if necessary if athena_type == 'timestamp': try: chunk[col] = pd.to_datetime(chunk[col], errors='coerce') logger.info(f"Column {col} successfully converted to datetime.") except Exception as e: logger.error(f"Error converting column {col} to datetime: {e}") elif athena_type == 'bigint': try: chunk[col] = pd.to_numeric(chunk[col], errors='coerce', downcast='