Henry Bruce
07/24/2025, 11:59 PM

Marvin
07/25/2025, 12:01 AM

1. @memoize Decorator Pattern
The recommended approach is to use Prefect's @memoize decorator to create a per-process cached database connection. Here's an example:
python
from prefect import flow, task
from prefect.utilities.local_cache import memoize
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

@memoize
def get_db_engine():
    # Create your thread-safe engine here
    engine = create_engine(
        "your_connection_string",
        pool_pre_ping=True,  # Ensures connection is valid before use
        pool_size=5,         # Adjust based on your needs
        max_overflow=10,
    )
    return engine

@memoize
def get_db_session():
    engine = get_db_engine()
    Session = sessionmaker(bind=engine)
    return Session

@task
def task_1():
    session = get_db_session()()  # Note the double parentheses
    try:
        # Fetch rows before the session is closed
        result = session.execute(text("SELECT * FROM my_table"))
        return result.fetchall()
    finally:
        session.close()

@task
def task_2():
    session = get_db_session()()
    try:
        # Same session factory, but a different session per task
        result = session.execute(text("SELECT * FROM another_table"))
        return result.fetchall()
    finally:
        session.close()

@flow
def my_flow():
    result1 = task_1()
    result2 = task_2()
    return result1, result2
Key benefits of this approach:
1. Process-Safe: The connection is cached per-process during a flow run
2. Memory Efficient: Multiple tasks in the same process will share the same connection pool
3. Clean Resource Management: The cache is automatically cleared at the end of the flow run
4. Thread-Safe: When using proper connection pooling (like SQLAlchemy's), the connections are thread-safe
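If the @memoize import shown above isn't available in your Prefect version, the standard library's functools.lru_cache gives the same once-per-process caching. A minimal sketch (the connection string is a placeholder):
python
from functools import lru_cache

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@lru_cache(maxsize=None)  # evaluated once per process, like @memoize
def get_db_engine():
    # Placeholder DSN -- substitute your real connection string
    return create_engine("your_connection_string", pool_pre_ping=True)

@lru_cache(maxsize=None)
def get_db_session():
    # The factory itself is cached; each call to it creates a fresh session
    return sessionmaker(bind=get_db_engine())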
2. Alternative: Context Manager Pattern
For more explicit resource management, you can use a context manager:
python
from contextlib import contextmanager

from prefect import flow, task
from prefect.utilities.local_cache import memoize
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

@memoize
def get_db_engine():
    return create_engine("your_connection_string")

@contextmanager
def get_db_session():
    engine = get_db_engine()
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()

@task
def task_with_db():
    with get_db_session() as session:
        # Fetch before the session closes
        return session.execute(text("SELECT * FROM my_table")).fetchall()
3. Best Practices:
- Always use connection pooling
- Implement proper error handling and connection cleanup (see the retry sketch after this list)
- Use pool_pre_ping=True with SQLAlchemy to verify connections before use
- Close sessions explicitly in tasks
- Consider using async drivers if you're working with async flows
- Don't store database connections directly in flow or task state
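For the error-handling point, Prefect's built-in task retries pair well with pool_pre_ping for transient connection failures. A hedged sketch reusing the get_db_session factory from the @memoize example above (retry counts are illustrative):
python
from prefect import task
from sqlalchemy import text

@task(retries=3, retry_delay_seconds=10)  # re-run the task on transient DB errors
def resilient_query():
    session = get_db_session()()  # factory from the @memoize example above
    try:
        # A trivial probe query; replace with your real statement
        return session.execute(text("SELECT 1")).scalar()
    finally:
        session.close()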
4. Important Notes:
- The @memoize decorator ensures your connection factory is created once per process
- Each task should get its own session from the session factory
- Always close sessions after use to return them to the pool
- Be cautious with long-running connections in distributed environments
Would you like me to elaborate on any of these patterns or provide more specific examples for your use case?

Henry Bruce
07/25/2025, 12:05 AM

Marvin
07/25/2025, 12:06 AM

python
from contextlib import contextmanager
from typing import Generator

from prefect import flow, task
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

class DatabaseConnection:
    def __init__(self, connection_string: str):
        self.engine = create_engine(
            connection_string,
            pool_pre_ping=True,  # Ensures connection is valid before use
            pool_size=5,         # Adjust based on your needs
            max_overflow=10,
            pool_recycle=3600,   # Recycle connections after an hour
        )
        self.SessionFactory = sessionmaker(bind=self.engine)

    @contextmanager
    def get_session(self) -> Generator:
        session = self.SessionFactory()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

# Create a single instance at the module level
db = DatabaseConnection("your_connection_string")

@task
def task_1():
    with db.get_session() as session:
        result = session.execute(text("SELECT * FROM my_table"))
        return result.fetchall()

@task
def task_2():
    with db.get_session() as session:
        result = session.execute(text("SELECT * FROM another_table"))
        return result.fetchall()

@flow
def my_flow():
    result1 = task_1()
    result2 = task_2()
    return result1, result2
Key points about this pattern in Prefect 3.x:
1. Single Instance: Create one database connection manager at the module level. The connection pool will be shared across tasks in the same process.
2. Connection Pooling: Use SQLAlchemy's built-in connection pooling to handle thread-safety and connection management.
3. Context Manager: Use a context manager pattern to ensure proper cleanup of sessions.
4. Error Handling: Include proper error handling and session cleanup in your connection management.
5. Configuration Options (see the sketch after this list):
- `pool_pre_ping=True`: Validates connections before use
- `pool_size`: Set based on your concurrency needs
- `max_overflow`: Additional connections allowed when pool is full
- `pool_recycle`: Recycle connections after a specified time
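To sanity-check these settings at runtime, SQLAlchemy's pool can report its current state. A small sketch, assuming the module-level db instance from the example above (the printed format varies by SQLAlchemy version):
python
# QueuePool.status() summarizes pool size, overflow, and checked-out connections
print(db.engine.pool.status())

# The configured limit is also introspectable
print(db.engine.pool.size())  # configured pool_size, e.g. 5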
For async workflows, you can adapt this pattern using async SQLAlchemy:
python
from contextlib import asynccontextmanager

from prefect import task
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

class AsyncDatabaseConnection:
    def __init__(self, connection_string: str):
        self.engine = create_async_engine(
            connection_string,
            pool_pre_ping=True,
            pool_size=5,
            max_overflow=10,
        )
        self.SessionFactory = sessionmaker(
            bind=self.engine,
            class_=AsyncSession,
        )

    @asynccontextmanager  # async generators require asynccontextmanager
    async def get_session(self):
        session = self.SessionFactory()
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

# Use an async instance here rather than the sync DatabaseConnection above
db = AsyncDatabaseConnection("your_async_connection_string")

@task
async def async_task():
    async with db.get_session() as session:
        result = await session.execute(text("SELECT * FROM my_table"))
        return result.fetchall()  # fetchall() is synchronous on the buffered result
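A minimal way to exercise the async task end to end, assuming an async flow wrapper and the standard asyncio runner (flow and script names are illustrative):
python
import asyncio

from prefect import flow

@flow
async def my_async_flow():
    # Async tasks are awaited directly inside an async flow
    return await async_task()

if __name__ == "__main__":
    asyncio.run(my_async_flow())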
Would you like me to elaborate on any aspect of these patterns or provide examples for specific database systems?

Nate
07/25/2025, 1:38 AM

Marvin
07/25/2025, 1:39 AM