Has anyone had luck using `prefect-sqlalchemy` to ...
# ask-community
e
Has anyone had luck using
prefect-sqlalchemy
to retrieve column heads of a table? Typically the
execute
command returns a properties meta that you can pull that from, but it kind of seems like the current implementation doesn't return anything when using
k
I know this exact pain
e
This works and returns data
Copy code
@task
def fetch_data(block_name: str, table_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            new_rows = connector.fetch_all(f"SELECT * FROM {table_name}")
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows
However If I try to use an execute command,
Copy code
@task
def fetch_data(block_name: str, table_name: str) -> tuple[list[str], list[tuple]]:
    with SqlAlchemyConnector.load(block_name) as connector:
        result = connector.execute(f"SELECT * FROM {table_name}")
        all_rows = result.fetchall()
        column_names = result.keys()
    return column_names, all_rows
this crashes with a None returned
ouch 😞
k
the types of operations that return metadata like keys aren't directly available through something like
fetchall
if I'm remembering correctly
it's still possible but you have to submit your queries a bit differently
e
Yeah I've been trying a bunch of things but no dice. About to craft an ugly sql query to that looks at
information_schema.columns
to generate a col list that way
k
prettttty sure I ended up doing this
👀 1
this as in the information schema queries
e
gotcha! Thanks for the confirm Kevin
k
actually, I might have a different way through this that doesn't involve multiple queries
the
execute
method on our connector block doesn't return anything
but you can call
connector.get_engine()
and build the connection yourself off the engine and then call
connection.execute()
like:
Copy code
engine = connector.get_engine()
connection = engine.connect() 
result = connection.execute("your query string")
maybe?
👀 1
at that point you're just using the block to store connection info and then native sqlalchemy for everything else. but hey I see nothing wrong with that
e
lemme try that
finally got it acting right! Thanks for the engine suggestion
Copy code
@task
def fetch_data(block_name: str, table_name: str):
    with SqlAlchemyConnector.load(block_name) as connector:
        all_rows = []
        engine = connector.get_engine()
        connection = engine.connect()
        query = f"SELECT * FROM {table_name}"
        result = connection.execute(text(query))
        for row in result:
            all_rows.append(row)
        headers = result.keys()
    return all_rows, headers

@task
def write_data(all_rows, column_headers, table_name):
    os.makedirs('exports', exist_ok=True)
    file_name = table_name.split(".")[1] if "." in table_name else table_name
    with open(f'exports/{file_name}.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(column_headers)
        writer.writerows(all_rows)
k
yessss
as a general bit of advice, all the prefect-* integration packages are wrappers for common library functionality and make some things more convenient but also make some things harder to do. however, you can pretty much always use your block credentials or configs to instantiate clients or engines or whatever it may be to work directly with the underlying library, making the blocks still super useful if the package doesn't do exactly what you need