Eric Albanese
03/27/2024, 11:14 PMprefect-sqlalchemyexecuteKevin Grismore
03/27/2024, 11:14 PMEric Albanese
03/27/2024, 11:15 PM
@task
def fetch_data(block_name: str, table_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            new_rows = connector.fetch_all(f"SELECT * FROM {table_name}")
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows
@task
def fetch_data(block_name: str, table_name: str) -> tuple[list[str], list[tuple]]:
    with SqlAlchemyConnector.load(block_name) as connector:
        result = connector.execute(f"SELECT * FROM {table_name}")
        all_rows = result.fetchall()
        column_names = result.keys()
    return column_names, all_rows
Eric Albanese
03/27/2024, 11:15 PMKevin Grismore
03/27/2024, 11:15 PMfetchallKevin Grismore
03/27/2024, 11:16 PMEric Albanese
03/27/2024, 11:16 PMinformation_schema.columnsKevin Grismore
03/27/2024, 11:16 PMKevin Grismore
03/27/2024, 11:17 PMEric Albanese
03/27/2024, 11:19 PMKevin Grismore
03/27/2024, 11:24 PMKevin Grismore
03/27/2024, 11:24 PMexecuteKevin Grismore
03/27/2024, 11:26 PMconnector.get_engine()connection.execute()Kevin Grismore
03/27/2024, 11:27 PM
engine = connector.get_engine()
connection = engine.connect()
result = connection.execute("your query string")
Kevin Grismore
03/27/2024, 11:29 PMEric Albanese
03/27/2024, 11:29 PMEric Albanese
03/27/2024, 11:58 PM
@task
def fetch_data(block_name: str, table_name: str):
    """Fetch every row and the column names of ``table_name``.

    Loads the ``SqlAlchemyConnector`` block named ``block_name``, opens a
    connection from its engine, runs ``SELECT * FROM <table_name>`` and
    copies the rows and column names into plain lists before the connection
    is released.

    Args:
        block_name: Name of the saved ``SqlAlchemyConnector`` block to load.
        table_name: Table to read, optionally schema-qualified. It is
            interpolated directly into the SQL text (identifiers cannot be
            bound as query parameters), so it must come from a trusted
            source.

    Returns:
        A ``(all_rows, headers)`` tuple: the list of result rows and the
        list of column names, in result order.
    """
    query = f"SELECT * FROM {table_name}"
    with SqlAlchemyConnector.load(block_name) as connector:
        engine = connector.get_engine()
        # Manage the connection with ``with`` so it is closed even if the
        # query raises; a bare ``engine.connect()`` is never released.
        with engine.connect() as connection:
            result = connection.execute(text(query))
            # Copy both pieces out while the connection is still open so the
            # returned values do not depend on the live result object.
            headers = list(result.keys())
            all_rows = list(result)
    return all_rows, headers
@task
def write_data(all_rows, column_headers, table_name):
    """Write a header line and data rows to ``exports/<table>.csv``.

    The ``exports`` directory is created relative to the current working
    directory if it does not already exist. The file name is the last
    dot-separated component of ``table_name``, so ``"table"``,
    ``"schema.table"`` and ``"db.schema.table"`` all produce
    ``exports/table.csv``.

    Args:
        all_rows: Iterable of row sequences, written with
            ``csv.writer.writerows``.
        column_headers: Iterable of column names, written as the first line.
        table_name: Table name, optionally qualified with dots.
    """
    os.makedirs('exports', exist_ok=True)
    # Take the last component rather than index 1: with more than one
    # qualifier ("db.schema.table") index 1 is the schema, not the table,
    # which would make different tables overwrite the same file.
    file_name = table_name.rsplit(".", 1)[-1]
    # newline='' lets the csv module control line endings itself.
    with open(f'exports/{file_name}.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(column_headers)
        writer.writerows(all_rows)
03/28/2024, 12:25 AMKevin Grismore
03/28/2024, 12:28 AM