Ritesh
07/10/2025, 11:07 AM
Marvin
07/10/2025, 11:07 AM
Marvin
07/10/2025, 11:08 AM
   - Uses the `SqlAlchemyConnector` block
   - Install with: `pip install prefect-sqlalchemy`
2. Snowflake (docs)
   - Direct integration with the Snowflake data warehouse
   - Uses the `SnowflakeConnector` block
   - Install with: `pip install prefect-snowflake`
Here's a basic example of using SQLAlchemy with PostgreSQL:
```python
from prefect import flow
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector
from sqlalchemy import text

# Create and save a database connection block
postgres_block = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+psycopg2",
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=5432,
    )
)
postgres_block.save("postgres-demo")

# Use the connection in a flow
@flow
def query_database():
    db = SqlAlchemyConnector.load("postgres-demo")
    with db.get_connection() as conn:
        # SQLAlchemy 2.x requires raw SQL strings to be wrapped in text()
        result = conn.execute(text("SELECT * FROM your_table"))
        return result.fetchall()
```
For Snowflake:
```python
from prefect import flow
from prefect_snowflake import SnowflakeConnector, SnowflakeCredentials

# Create and save a Snowflake connection block
snowflake_block = SnowflakeConnector(
    credentials=SnowflakeCredentials(
        account="your_account",
        user="your_user",
        password="your_password",
    ),
    warehouse="your_warehouse",
    database="your_database",
    schema="your_schema",
)
snowflake_block.save("snowflake-demo")

# Use the connection in a flow
@flow
def query_snowflake():
    db = SnowflakeConnector.load("snowflake-demo")
    # get_connection() returns a Snowflake connection; queries run through a cursor
    with db.get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM your_table")
            return cursor.fetchall()
```
These connections can be configured either through code as shown above, or through the Prefect UI by creating blocks.
For any other specific databases, you can typically create a custom connection using SQLAlchemy as long as the appropriate database driver is installed. The SQLAlchemy integration is particularly versatile because it supports a wide range of SQL databases through different drivers.
Ritesh
07/10/2025, 11:10 AM
Marvin
07/10/2025, 11:10 AM
Marvin
07/10/2025, 11:11 AM
1. PostgreSQL Drivers:
   - `postgresql+psycopg2`: Synchronous PostgreSQL driver
   - `postgresql+asyncpg`: Async PostgreSQL driver
   - `postgresql+pg8000`: Pure Python PostgreSQL driver
2. MySQL Drivers:
   - `mysql+pymysql`: Pure Python MySQL driver
   - `mysql+mysqlconnector`: MySQL Connector/Python
   - `mysql+aiomysql`: Async MySQL driver
3. SQLite Drivers:
   - `sqlite+pysqlite`: SQLite using Python's built-in `sqlite3` module
   - `sqlite+aiosqlite`: Async SQLite driver
4. Oracle Drivers:
   - `oracle+cx_oracle`: Oracle with the cx_Oracle driver
   - `oracle+oracledb`: Oracle with the python-oracledb driver
5. Microsoft SQL Server Drivers:
   - `mssql+pyodbc`: SQL Server via ODBC
   - `mssql+pymssql`: SQL Server with pymssql
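Each `driver` string above becomes the `dialect+driver` prefix of a SQLAlchemy database URL. The sketch below assembles such a URL with the standard library to show its shape; `build_sqlalchemy_url` is a hypothetical helper for illustration, not part of prefect-sqlalchemy. It percent-encodes the username and password so characters such as `@` or `/` cannot break the URL.

```python
from typing import Optional
from urllib.parse import quote


def build_sqlalchemy_url(
    driver: str,
    database: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Assemble dialect+driver://user:password@host:port/database (hypothetical helper)."""
    auth = ""
    if username:
        auth = quote(username, safe="")
        if password:
            # Percent-encode so '@', ':' or '/' in the password don't split the URL
            auth += ":" + quote(password, safe="")
        auth += "@"
    location = host or ""
    if port is not None:
        location += f":{port}"
    return f"{driver}://{auth}{location}/{database}"


print(build_sqlalchemy_url("postgresql+psycopg2", "db", "user", "p@ss", "localhost", 5432))
# postgresql+psycopg2://user:p%40ss@localhost:5432/db
print(build_sqlalchemy_url("sqlite+pysqlite", ":memory:"))
# sqlite+pysqlite:///:memory:
```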
Here's an example of how to use a different driver with the SQLAlchemy connector:
```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

# PostgreSQL with psycopg2 (synchronous, instead of asyncpg)
postgres_sync = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+psycopg2",
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=5432,
    )
)

# MySQL with pymysql
mysql_connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="mysql+pymysql",
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=3306,
    )
)

# SQLite: database is a file path, or ":memory:" for an in-memory database
sqlite_connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="sqlite+pysqlite",
        database="/path/to/your/database.db",
    )
)
```
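The `sqlite+pysqlite` driver wraps Python's built-in `sqlite3` module, so the `":memory:"` option can be tried without installing anything. The stdlib sketch below (table and column names are placeholders) shows a detail worth knowing before pooling an in-memory database: every new connection to `":memory:"` gets its own private, empty database.

```python
import sqlite3

first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")
try:
    first.execute("CREATE TABLE your_table (id INTEGER PRIMARY KEY, name TEXT)")
    first.execute("INSERT INTO your_table (name) VALUES ('a')")
    rows = first.execute("SELECT id, name FROM your_table").fetchall()
    # The second connection sees a separate database with no tables at all
    tables = second.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
finally:
    first.close()
    second.close()

print(rows)    # [(1, 'a')]
print(tables)  # []
```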
Remember to install the appropriate driver package for your database:
- PostgreSQL with psycopg2: `pip install psycopg2-binary`
- MySQL with pymysql: `pip install pymysql`
- Oracle: `pip install cx_Oracle` (or `pip install oracledb` for the `oracle+oracledb` driver)
- SQL Server with pyodbc: `pip install pyodbc`
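A missing driver package usually only surfaces when the first connection is attempted. As a sketch, the check below uses `importlib.util.find_spec` to test whether the module behind a driver string is importable without actually importing it; the `DRIVER_MODULES` mapping and `driver_is_installed` are hypothetical helpers covering the drivers listed above (`mysql+mysqlconnector` is left out because `find_spec` on a dotted name raises when the parent package is absent).

```python
from importlib.util import find_spec

# Hypothetical mapping from SQLAlchemy driver strings to the module each one imports
DRIVER_MODULES = {
    "postgresql+psycopg2": "psycopg2",  # pip install psycopg2-binary
    "postgresql+asyncpg": "asyncpg",
    "postgresql+pg8000": "pg8000",
    "mysql+pymysql": "pymysql",
    "mysql+aiomysql": "aiomysql",
    "sqlite+pysqlite": "sqlite3",  # part of the standard library
    "sqlite+aiosqlite": "aiosqlite",
    "oracle+cx_oracle": "cx_Oracle",
    "oracle+oracledb": "oracledb",
    "mssql+pyodbc": "pyodbc",
    "mssql+pymssql": "pymssql",
}


def driver_is_installed(driver: str) -> bool:
    """Return True if the DBAPI module behind `driver` can be imported."""
    module = DRIVER_MODULES.get(driver)
    if module is None:
        raise ValueError(f"Unknown driver string: {driver!r}")
    return find_spec(module) is not None


print(driver_is_installed("sqlite+pysqlite"))  # True
```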
The choice of driver depends on your specific needs:
- For synchronous operations, use drivers like `psycopg2`, `pymysql`, or `cx_Oracle`
- For async operations, use drivers like `asyncpg`, `aiomysql`, or `aiosqlite`
- To avoid compiled extensions and system libraries such as libpq, consider pure Python drivers like `pg8000` or `pymysql`
Ritesh
07/10/2025, 11:12 AM
Marvin
07/10/2025, 11:12 AM
Marvin
07/10/2025, 11:13 AM
```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

# Async connection (using asyncpg)
async_postgres = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+asyncpg",
        database="your_db",
        username="your_user",
        password="your_password",
        host="your_host",
        port=5432,
    )
)

# Sync connection (using psycopg2)
sync_postgres = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+psycopg2",
        database="your_db",
        username="your_user",
        password="your_password",
        host="your_host",
        port=5432,
    )
)
```
2. Available Drivers
- `postgresql+asyncpg`: Async driver (a good fit when your flows and tasks are `async def`)
- `postgresql+psycopg2`: Standard synchronous driver
- `postgresql+pg8000`: Pure Python PostgreSQL driver
- `postgresql+psycopg2cffi`: Alternative to psycopg2 for PyPy
3. Connection Configuration Options
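```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

# Full configuration example
postgres_connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+asyncpg",  # or any other supported driver
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=5432,
    ),
    # Extra keyword arguments passed to the driver's connect() call
    connect_args={"ssl": "require"},  # SSL connection (asyncpg sslmode value)
)

# Pool and execution settings are SQLAlchemy create_engine() options, not
# connection fields; get_engine() forwards them and caches the engine
postgres_connector.get_engine(
    # Connection pool settings
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,
    # Execution options (stream_results is omitted: on an async engine it
    # forces conn.stream() instead of conn.execute())
    execution_options={"isolation_level": "REPEATABLE READ"},
)
```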
4. Best Practices
A. Connection Management
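```python
from prefect import flow, task
from sqlalchemy import text

@task
async def database_operation():
    # postgres_connector is the asyncpg connector configured above
    async with postgres_connector.get_connection() as conn:
        # Connection is closed (and the transaction committed) after the context exits
        result = await conn.execute(text("SELECT * FROM my_table"))
        # The result is already buffered, so fetchall() is not awaited
        return result.fetchall()

@flow
async def my_flow():
    results = await database_operation()
    return results
```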
B. Connection Pooling
```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

# Configure connection pooling for high-concurrency environments
postgres_connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+asyncpg",
        database="your_db",
        username="your_user",
        password="your_password",
        host="your_host",
        port=5432,
    )
)

# Pool settings go to create_engine() through get_engine(), which caches the engine
postgres_connector.get_engine(
    pool_size=10,  # Connections kept open in the pool
    max_overflow=20,  # Additional connections when the pool is full
    pool_timeout=30,  # Seconds to wait for an available connection
    pool_recycle=1800,  # Recycle connections after 30 minutes
)
```
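A connection pool lives inside one engine in one process, so the settings above are a per-process budget. Assuming each flow run executes in its own process (as with a process-based worker), the worst case multiplies across concurrent runs. The arithmetic below is a sketch with a hypothetical helper; the comparison uses PostgreSQL's default `max_connections` of 100.

```python
def max_total_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Worst case if every process fills both its pool and its overflow."""
    # Each process builds its own engine, so each one gets its own pool
    return processes * (pool_size + max_overflow)


POSTGRES_DEFAULT_MAX_CONNECTIONS = 100  # PostgreSQL's default max_connections

# pool_size=10 and max_overflow=20 from the example above, with 5 concurrent flow runs
needed = max_total_connections(processes=5, pool_size=10, max_overflow=20)
print(needed, needed <= POSTGRES_DEFAULT_MAX_CONNECTIONS)  # 150 False
```

If the total exceeds the server limit, lower `max_overflow`, cap concurrent flow runs, or raise `max_connections` on the server.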
5. Installation Requirements
```bash
# Basic installation
pip install prefect-sqlalchemy
# Driver-specific installations
pip install asyncpg  # For async PostgreSQL
pip install psycopg2-binary  # For sync PostgreSQL
pip install pg8000  # For pure Python PostgreSQL
```
6. Error Handling and Retries
```python
from prefect import task
from sqlalchemy import text
from sqlalchemy.exc import OperationalError

@task(retries=3, retry_delay_seconds=60)
async def resilient_db_operation():
    try:
        async with postgres_connector.get_connection() as conn:
            result = await conn.execute(text("SELECT * FROM my_table"))
            return result.fetchall()
    except OperationalError as e:
        # Re-raise so the attempt fails and Prefect schedules a retry
        raise RuntimeError(f"Database connection failed: {e}") from e
```
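`retries=3` means up to three retries after the first attempt (four attempts in total), with `retry_delay_seconds` between them. The plain-Python sketch below only illustrates that behavior; `call_with_retries` is a hypothetical helper, not how Prefect implements retries internally.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], retries: int, delay_seconds: float) -> T:
    """Call fn; on an exception, retry up to `retries` more times with a fixed delay."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            time.sleep(delay_seconds)
    raise AssertionError("unreachable")


# A stand-in operation that fails twice with a transient error, then succeeds
calls = {"n": 0}


def flaky_query() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "rows"


print(call_with_retries(flaky_query, retries=3, delay_seconds=0))  # rows
```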
7. Security Best Practices
```python
# Use environment variables for sensitive information
import os
from prefect.blocks.system import Secret

postgres_connector = SqlAlchemyConnector(
    connection_info=SqlAlchemyConnector.ConnectionInfo(
```
Marvin
07/10/2025, 11:13 AM
Ritesh
07/10/2025, 11:15 AM
Marvin
07/10/2025, 11:16 AM
Marvin
07/10/2025, 11:17 AM
1. Option 1: Use Synchronous Flow with psycopg2
```python
from prefect import flow, task
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector
from sqlalchemy import text

# Note: no 'async' keywords anywhere in this version
@task
def db_task():
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver="postgresql+psycopg2",  # Synchronous driver
            database="your_database",
            username="your_username",
            password="your_password",
            host="your_host",
            port=5432,
        )
    )
    # Use the synchronous context manager
    with connector.get_connection() as conn:
        result = conn.execute(text("SELECT * FROM your_table"))
        return result.fetchall()

@flow
def my_flow():
    results = db_task()
    return results

# Run the flow
if __name__ == "__main__":
    my_flow()
```
2. Option 2: Use Asynchronous Flow with asyncpg (Recommended)
```python
import asyncio

from prefect import flow, task
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector
from sqlalchemy import text

@task
async def db_task():
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver="postgresql+asyncpg",  # Async driver
            database="your_database",
            username="your_username",
            password="your_password",
            host="your_host",
            port=5432,
        )
    )
    # Use the async context manager
    async with connector.get_connection() as conn:
        result = await conn.execute(text("SELECT * FROM your_table"))
        # The result is already buffered, so fetchall() is not awaited
        return result.fetchall()

@flow
async def my_flow():
    results = await db_task()
    return results

# Run the async flow
if __name__ == "__main__":
    asyncio.run(my_flow())
```
Key Points to Remember:
1. If using `async def` in your tasks/flows:
   - Use `postgresql+asyncpg` as the driver
   - Install asyncpg: `pip install asyncpg`
   - Use `async with` and `await` keywords
2. If using regular synchronous functions:
   - Use `postgresql+psycopg2` as the driver
   - Install psycopg2: `pip install psycopg2-binary`
   - Use regular `with` statements
   - Remove all `async`/`await` keywords
3. Required installations:
```bash
# For async approach (Option 2)
pip install prefect-sqlalchemy asyncpg
# For sync approach (Option 1)
pip install prefect-sqlalchemy psycopg2-binary
```
The error you're seeing specifically indicates that you're mixing async and sync code. Make sure:
1. Your flow and tasks are either all synchronous or all asynchronous
2. The database driver matches your choice (asyncpg for async, psycopg2 for sync)
3. The context managers and execution patterns match your choice (async/await vs regular)
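The matching rule in point 2 can be expressed directly: an `async def` function pairs with the async driver, a plain `def` with the sync one. A minimal sketch using `inspect.iscoroutinefunction`; `postgres_driver_for` is a hypothetical helper and is meant for the plain functions before Prefect's decorators are applied.

```python
import inspect
from typing import Callable


def postgres_driver_for(fn: Callable[..., object]) -> str:
    """Return the PostgreSQL driver string that matches how `fn` is defined."""
    if inspect.iscoroutinefunction(fn):
        return "postgresql+asyncpg"  # async def -> async driver, async with / await
    return "postgresql+psycopg2"  # def -> sync driver, plain with


async def async_query() -> None: ...


def sync_query() -> None: ...


print(postgres_driver_for(async_query))  # postgresql+asyncpg
print(postgres_driver_for(sync_query))  # postgresql+psycopg2
```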
The async approach (Option 2) is a good fit when your flows already run other I/O concurrently, but both approaches work correctly in Prefect 3.x if implemented consistently.
Ritesh
07/10/2025, 11:20 AM
Marvin
07/10/2025, 11:20 AM
Marvin
07/10/2025, 11:21 AM
```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+psycopg2",  # libpq-based driver
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=5432,
        # query entries are passed to the driver when connecting: libpq (psycopg2)
        # accepts "options", but asyncpg rejects both "options" and "search_path"
        query={"options": "-csearch_path=prefect"},
    )
)
```
2. Using Server Settings
```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+asyncpg",
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=5432,
    ),
    # connect_args belongs to the connector, not to the connection components
    connect_args={
        "server_settings": {"search_path": "prefect"}
    },
)
```
Here's a complete working example:
```python
import asyncio

from prefect import flow, task
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector
from sqlalchemy import text

@task
async def db_task():
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver="postgresql+asyncpg",
            database="your_database",
            username="your_username",
            password="your_password",
            host="your_host",
            port=5432,
        ),
        # asyncpg sets search_path as a server setting at connection startup
        connect_args={"server_settings": {"search_path": "prefect"}},
    )
    async with connector.get_connection() as conn:
        result = await conn.execute(text("SELECT current_schema()"))
        return result.fetchone()

@flow
async def my_flow():
    result = await db_task()
    return result

if __name__ == "__main__":
    asyncio.run(my_flow())
```
If you need multiple schemas in the search path, you can do:
```python
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

# Multiple schemas in the search path, searched in order
connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver="postgresql+asyncpg",
        database="your_database",
        username="your_username",
        password="your_password",
        host="your_host",
        port=5432,
    ),
    # Comma-separated for multiple schemas
    connect_args={"server_settings": {"search_path": "prefect,public"}},
)
```
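The error you're seeing occurs because SQLAlchemy forwards URL query parameters such as `?options=-csearch_path=prefect` to the driver's connect call, and asyncpg does not accept libpq-style `options` (or a `search_path` keyword). With asyncpg, set the search path through `connect_args` with `server_settings` instead; the `query`/`options` form only works with libpq-based drivers such as psycopg2.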
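To keep the `server_settings` shape in one place, the dictionary can be built from a list of schema names. A minimal sketch: `asyncpg_search_path_args` is a hypothetical helper, it assumes plain lowercase schema names that need no quoting, and its result is meant to be passed as `connect_args=` on the connector.

```python
from typing import Dict, List


def asyncpg_search_path_args(schemas: List[str]) -> Dict[str, Dict[str, str]]:
    """Build connect_args that make asyncpg send search_path as a server setting."""
    if not schemas:
        raise ValueError("at least one schema is required")
    # asyncpg sends every server_settings entry to PostgreSQL at connection startup
    return {"server_settings": {"search_path": ",".join(schemas)}}


print(asyncpg_search_path_args(["prefect", "public"]))
# {'server_settings': {'search_path': 'prefect,public'}}
```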
Remember to:
1. Install required packages:
```bash
pip install prefect-sqlalchemy asyncpg
```
2. Verify your connection by checking the search path:
```python
from prefect import task
from sqlalchemy import text

@task
async def verify_search_path():
    async with connector.get_connection() as conn:
        result = await conn.execute(text("SHOW search_path"))
        return result.fetchone()
```
Passing `server_settings` through `connect_args` configures the search path in a way that asyncpg and SQLAlchemy accept in Prefect 3.x.
Ritesh
07/10/2025, 11:23 AM
Marvin
07/10/2025, 11:23 AM
Marvin
07/10/2025, 11:24 AM
```python
from prefect import flow, task
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector
from sqlalchemy import text

# Save the connection block once
async def save_connector():
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver="postgresql+asyncpg",
            database="your_database",
            username="your_username",
            password="your_password",
            host="your_host",
            port=5432,
        ),
        connect_args={"server_settings": {"search_path": "prefect"}},
    )
    # Pool settings (pool_size, pool_recycle, ...) are not block fields; pass them
    # to connector.get_engine(...) at runtime if you need non-default values
    await connector.save("my-postgres-connection")

# Reuse in multiple flows
@task(retries=3, retry_delay_seconds=5)
async def db_operation(query: str):
    connector = await SqlAlchemyConnector.load("my-postgres-connection")
    async with connector.get_connection() as conn:
        result = await conn.execute(text(query))
        return result.fetchall()

@flow
async def flow1():
    return await db_operation("SELECT * FROM table1")

@flow
async def flow2():
    return await db_operation("SELECT * FROM table2")
```
2. Using a Shared Module (Alternative Pattern)
```python
# db_config.py
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector

def get_connector():
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver="postgresql+asyncpg",
            database="your_database",
            username="your_username",
            password="your_password",
            host="your_host",
            port=5432,
        ),
        connect_args={"server_settings": {"search_path": "prefect"}},
    )
    # Pool options go to create_engine() via get_engine(), which caches the engine
    connector.get_engine(pool_size=5, max_overflow=10, pool_recycle=300)
    return connector

# flows.py
from prefect import flow, task
from sqlalchemy import text
from db_config import get_connector

@task(retries=3)
async def db_task(query: str):
    connector = get_connector()
    async with connector.get_connection() as conn:
        result = await conn.execute(text(query))
        return result.fetchall()

@flow
async def flow1():
    return await db_task("SELECT * FROM table1")

@flow
async def flow2():
    return await db_task("SELECT * FROM table2")
```
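As written, `get_connector()` builds a new connector, and with it a new engine and pool, on every call. A minimal sketch of reusing one per process with `functools.lru_cache`; `ConnectorStandIn` is a hypothetical placeholder for `SqlAlchemyConnector` so the example runs without a database. One caveat: asyncpg connections belong to the event loop that created them, so a cached async engine only helps when callers share the same loop.

```python
from functools import lru_cache


class ConnectorStandIn:
    """Hypothetical stand-in for SqlAlchemyConnector; counts how often it is built."""

    instances = 0

    def __init__(self, driver: str) -> None:
        self.driver = driver
        ConnectorStandIn.instances += 1


@lru_cache(maxsize=None)
def get_connector(driver: str = "postgresql+asyncpg") -> ConnectorStandIn:
    # Built once per process (per argument value); later calls return the cached object
    return ConnectorStandIn(driver)
```

Every task that calls `get_connector()` in the same process then shares one connector and its pool.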
3. Using Connection Pooling with Multiple Flows
```python
from contextlib import asynccontextmanager
from typing import Optional

from prefect import flow, task
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector
from sqlalchemy import text

class DatabaseConnection:
    _connector: Optional[SqlAlchemyConnector] = None

    @classmethod
    def get_connector(cls) -> SqlAlchemyConnector:
        if cls._connector is None:
            cls._connector = SqlAlchemyConnector(
                connection_info=ConnectionComponents(
                    driver="postgresql+asyncpg",
                    database="your_database",
                    username="your_username",
                    password="your_password",
                    host="your_host",
                    port=5432,
                ),
                connect_args={"server_settings": {"search_path": "prefect"}},
            )
            # Important pool settings for multiple flows; get_engine() passes them
            # to create_engine() and caches the engine on this connector
            cls._connector.get_engine(
                pool_size=10,
                max_overflow=20,
                pool_timeout=30,
                pool_recycle=300,
                pool_pre_ping=True,  # Verify each connection before using it
            )
        return cls._connector

@asynccontextmanager
async def get_db_connection():
    connector = DatabaseConnection.get_connector()
    async with connector.get_connection() as conn:
        yield conn

@task(retries=3, retry_delay_seconds=5)
async def db_operation(query: str):
    async with get_db_connection() as conn:
        result = await conn.execute(text(query))
```
Marvin
07/10/2025, 11:24 AM
Key Recommendations:
1. *Use Connection Pooling*:
- Set appropriate `pool_size` and `max_overflow`
- Use `pool_recycle` to prevent stale connections
- Enable `pool_pre_ping` for connection verification
2. *Implement Retries*:
- Use Prefect's retry mechanism for database tasks
- Set appropriate retry delays and counts
3. *Resource Management*:
- Always use context managers (`async with`)
- Properly close connections after use
- Use connection pooling to manage resources efficiently
4. *Error Handling*:
```python
from prefect import get_run_logger, task
from sqlalchemy import text

@task(retries=3, retry_delay_seconds=5)
async def safe_db_operation(query: str):
    logger = get_run_logger()
    try:
        async with get_db_connection() as conn:
            result = await conn.execute(text(query))
            return result.fetchall()
    except Exception as e:
        # Log the error, then re-raise so Prefect can retry the task
        logger.error(f"Database operation failed: {e}")
        raise
```
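5. *Configuration Management*:
```python
from prefect import task
from prefect.blocks.system import Secret

@task
async def get_db_config():
    # Await Secret.load() in async code, then call .get() on the loaded block.
    # Block names use lowercase letters, numbers and dashes ("db-host" is a placeholder)
    db_host = await Secret.load("db-host")
    db_password = await Secret.load("db-password")
    return {
        "host": db_host.get(),
        "password": db_password.get(),
        # ... other config
    }
```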
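The Security section also mentions environment variables as a source for sensitive settings. A stdlib sketch of that approach, failing fast when a required value is missing; `load_db_config` and the variable names `DB_HOST`, `DB_PASSWORD` and `DB_PORT` are hypothetical choices for illustration.

```python
import os
from typing import Dict, Mapping, Optional, Union


def load_db_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, Union[str, int]]:
    """Read connection settings from environment variables (hypothetical names)."""
    source = os.environ if env is None else env
    missing = [name for name in ("DB_HOST", "DB_PASSWORD") if name not in source]
    if missing:
        # Fail fast with a clear message instead of connecting with partial settings
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "host": source["DB_HOST"],
        "password": source["DB_PASSWORD"],
        "port": int(source.get("DB_PORT", "5432")),  # default PostgreSQL port
    }


print(load_db_config({"DB_HOST": "db.internal", "DB_PASSWORD": "s3cret"}))
# {'host': 'db.internal', 'password': 's3cret', 'port': 5432}
```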
These patterns ensure:
- Efficient connection reuse across flows
- Proper resource management
- Error resilience
- Connection pool optimization
- Secure credential handling
Choose the pattern that best fits your specific use case, but the Blocks pattern (Option 1) is generally recommended as it provides the best balance of flexibility, security, and resource management.