Ed Hartley
09/04/2025, 1:17 PM
with SqlAlchemyConnector.load("fqrs-sqlserver") as connector:
    connector.execute(file_as_string("src/tfs_flows/socials/SQL/create_table.sql"))
    connector.execute("TRUNCATE TABLE dw_temp.dbo.socials_testing")
    # Insert dataframe here
Marvin
09/04/2025, 1:19 PM
You can insert the DataFrame with pandas' to_sql. Here are two common patterns:
Using the engine (simple and works well with pandas):
from prefect_sqlalchemy import SqlAlchemyConnector

# assume you already have a DataFrame named df
with SqlAlchemyConnector.load("fqrs-sqlserver") as connector:
    connector.execute(file_as_string("src/tfs_flows/socials/SQL/create_table.sql"))
    connector.execute("TRUNCATE TABLE dw_temp.dbo.socials_testing")

    # For SQL Server, fast_executemany can speed up inserts
    engine = connector.get_engine(fast_executemany=True)
    df.to_sql(
        name="socials_testing",
        con=engine,
        schema="dw_temp",
        if_exists="append",
        index=False,
        chunksize=1000,  # tune for your data
        method="multi",  # batches values into multi-row INSERTs
    )
Using a transactional connection:
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("fqrs-sqlserver") as connector:
    connector.execute(file_as_string("src/tfs_flows/socials/SQL/create_table.sql"))
    connector.execute("TRUNCATE TABLE dw_temp.dbo.socials_testing")

    with connector.get_connection() as conn:
        df.to_sql(
            name="socials_testing",
            con=conn,
            schema="dw_temp",
            if_exists="append",
            index=False,
            chunksize=1000,
            method="multi",
        )
Notes:
- For SQL Server via pyodbc, fast_executemany=True can significantly improve performance.
- If you need explicit column types, pass dtype={...} with SQLAlchemy types (see the sketch after these notes).
- If the table already exists (as in your code), keep if_exists="append".
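For the dtype note, here is a minimal sketch; the column names and types are placeholders (I don't know your actual schema), so map them to whatever create_table.sql defines:
import sqlalchemy as sa

# Hypothetical columns -- replace with the ones defined in create_table.sql
dtype_map = {
    "post_id": sa.types.BigInteger(),
    "platform": sa.types.NVARCHAR(length=50),
    "posted_at": sa.types.DateTime(),
}

df.to_sql(
    name="socials_testing",
    con=engine,        # engine from connector.get_engine(...) as in the first pattern
    schema="dw_temp",
    if_exists="append",
    index=False,
    chunksize=1000,
    dtype=dtype_map,   # forces these SQLAlchemy types instead of pandas' inferred defaults
)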
Docs for reference:
- Prefect SQLAlchemy integration (SqlAlchemyConnector)
- pandas.DataFrame.to_sql
If you share your DataFrame shape/size and driver (e.g., mssql+pyodbc), I can suggest an optimal chunksize and dtype mappings.