Eric Albanese
03/27/2024, 11:14 PMprefect-sqlalchemy
to retrieve column heads of a table? Typically the execute
command returns a properties meta that you can pull that from, but it kind of seems like the current implementation doesn't return anything when usingKevin Grismore
03/27/2024, 11:14 PMEric Albanese
03/27/2024, 11:15 PM@task
def fetch_data(block_name: str, table_name: str) -> list:
all_rows = []
with SqlAlchemyConnector.load(block_name) as connector:
while True:
new_rows = connector.fetch_all(f"SELECT * FROM {table_name}")
if len(new_rows) == 0:
break
all_rows.extend(new_rows)
return all_rows
However If I try to use an execute command,
@task
def fetch_data(block_name: str, table_name: str) -> tuple[list[str], list[tuple]]:
with SqlAlchemyConnector.load(block_name) as connector:
result = connector.execute(f"SELECT * FROM {table_name}")
all_rows = result.fetchall()
column_names = result.keys()
return column_names, all_rows
this crashes with a None returnedEric Albanese
03/27/2024, 11:15 PMKevin Grismore
03/27/2024, 11:15 PMfetchall
if I'm remembering correctlyKevin Grismore
03/27/2024, 11:16 PMEric Albanese
03/27/2024, 11:16 PMinformation_schema.columns
to generate a col list that wayKevin Grismore
03/27/2024, 11:16 PMKevin Grismore
03/27/2024, 11:17 PMEric Albanese
03/27/2024, 11:19 PMKevin Grismore
03/27/2024, 11:24 PMKevin Grismore
03/27/2024, 11:24 PMexecute
method on our connector block doesn't return anythingKevin Grismore
03/27/2024, 11:26 PMconnector.get_engine()
and build the connection yourself off the engine and then call connection.execute()
Kevin Grismore
03/27/2024, 11:27 PMengine = connector.get_engine()
connection = engine.connect()
result = connection.execute("your query string")
maybe?Kevin Grismore
03/27/2024, 11:29 PMEric Albanese
03/27/2024, 11:29 PMEric Albanese
03/27/2024, 11:58 PM@task
def fetch_data(block_name: str, table_name: str):
with SqlAlchemyConnector.load(block_name) as connector:
all_rows = []
engine = connector.get_engine()
connection = engine.connect()
query = f"SELECT * FROM {table_name}"
result = connection.execute(text(query))
for row in result:
all_rows.append(row)
headers = result.keys()
return all_rows, headers
@task
def write_data(all_rows, column_headers, table_name):
os.makedirs('exports', exist_ok=True)
file_name = table_name.split(".")[1] if "." in table_name else table_name
with open(f'exports/{file_name}.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(column_headers)
writer.writerows(all_rows)
Kevin Grismore
03/28/2024, 12:25 AMKevin Grismore
03/28/2024, 12:28 AM