Juan Victoriano Orza
07/23/2025, 7:10 PM

Marvin
07/23/2025, 7:10 PM

```python
from typing import Optional
from contextlib import asynccontextmanager

import asyncpg
from prefect import flow, task
from prefect.blocks.core import Block


class PostgresConnectionPool(Block):
    """Block for managing an asyncpg connection pool."""

    _block_type_name = "postgres-connection-pool"
    _block_schema_capabilities = ["connection-pool"]

    host: str
    port: int = 5432
    database: str
    user: str
    password: str
    min_size: int = 10
    max_size: int = 20

    _pool: Optional[asyncpg.Pool] = None

    async def get_pool(self) -> asyncpg.Pool:
        """Get or create the connection pool."""
        if self._pool is None:
            self._pool = await asyncpg.create_pool(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password,
                min_size=self.min_size,
                max_size=self.max_size,
            )
        return self._pool

    @asynccontextmanager
    async def connection(self):
        """Context manager for getting a connection from the pool."""
        pool = await self.get_pool()
        async with pool.acquire() as connection:
            yield connection

    async def close_pool(self):
        """Close the connection pool."""
        if self._pool is not None:
            await self._pool.close()
            self._pool = None


# Example task using the connection pool
@task
async def insert_data(data: dict):
    # Load the saved connection pool block
    pool_block = await PostgresConnectionPool.load("default")
    async with pool_block.connection() as conn:
        # Example insert query
        await conn.execute(
            """
            INSERT INTO your_table (column1, column2)
            VALUES ($1, $2)
            """,
            data["column1"],
            data["column2"],
        )


@flow
async def insert_data_flow(data: dict):
    await insert_data(data)
```
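The example above never calls `close_pool`. If you want a flow to release its connections when it finishes, here's a minimal sketch reusing the classes above (the flow name is illustrative, not part of the original setup):

```python
@flow
async def insert_data_flow_with_cleanup(data: dict):
    pool_block = await PostgresConnectionPool.load("default")
    try:
        async with pool_block.connection() as conn:
            await conn.execute(
                "INSERT INTO your_table (column1, column2) VALUES ($1, $2)",
                data["column1"],
                data["column2"],
            )
    finally:
        # Tear the pool down even if the insert fails
        await pool_block.close_pool()
```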
To use this setup:
1. First, save the connection block configuration:
```python
async def save_postgres_block():
    pool_block = PostgresConnectionPool(
        host="your_host",
        database="your_db",
        user="your_user",
        password="your_password",
        min_size=10,  # Adjust based on your needs
        max_size=20,  # Adjust based on your needs
    )
    await pool_block.save(name="default")
```
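`save_postgres_block` is a coroutine, so run it once (e.g. from a one-off setup script) before any flow tries to load the block:

```python
import asyncio

asyncio.run(save_postgres_block())
```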
2. Then you can use it in your flows:
```python
data = {"column1": "value1", "column2": "value2"}
await insert_data_flow(data)
```
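Awaiting the flow directly assumes you're already inside an event loop (e.g. a Jupyter cell); from a plain script, wrap the call the same way as the save step:

```python
import asyncio

asyncio.run(insert_data_flow({"column1": "value1", "column2": "value2"}))
```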
Some important notes about this implementation:
1. Connection Pool Sizing: Given your requirement of handling 1000 requests per minute, I've set default pool sizes of min_size=10 and max_size=20. That rate is roughly 17 requests per second, so even if each insert takes ~50 ms, a handful of connections can keep up and the 10–20 range leaves headroom for bursts. Adjust based on:
- Your specific workload patterns
- The capabilities of your EC2 t2.medium instance
- The nature of your database operations
2. Resource Management: The implementation includes proper connection pool management with:
- Lazy pool creation (only created when needed)
- Connection context manager for safe connection handling
- Pool cleanup method
3. Error Handling: You might want to add more robust error handling and retries depending on your specific needs; see the sketch after this list.
4. Performance Considerations: For your high-throughput requirement (1000 requests/minute):
- Implement batch operations where possible (also sketched below)
- Monitor the pool size and adjust based on performance metrics
- Implement appropriate timeouts and error handling
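To make notes 3 and 4 concrete: Prefect tasks accept `retries` and `retry_delay_seconds` for transient failures, and asyncpg's `executemany` batches many inserts over a single connection. A sketch combining both (the `insert_batch` task and the two-column table are illustrative, not part of the original setup):

```python
@task(retries=3, retry_delay_seconds=5)  # Prefect re-runs the task on failure
async def insert_batch(rows: list[tuple]):
    pool_block = await PostgresConnectionPool.load("default")
    async with pool_block.connection() as conn:
        # One batched statement instead of one round trip per row
        await conn.executemany(
            "INSERT INTO your_table (column1, column2) VALUES ($1, $2)",
            rows,
        )
```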
You can enhance this basic implementation by adding:
- Connection timeouts
- Retry logic
- Connection pool metrics
- Health checks
- Transaction management (sketched below)
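For transaction management, asyncpg exposes a `transaction()` context manager on the connection; a minimal sketch that commits on success and rolls back automatically if any statement fails:

```python
@task
async def insert_data_transactional(data: dict):
    pool_block = await PostgresConnectionPool.load("default")
    async with pool_block.connection() as conn:
        async with conn.transaction():  # commit on success, rollback on error
            await conn.execute(
                "INSERT INTO your_table (column1, column2) VALUES ($1, $2)",
                data["column1"],
                data["column2"],
            )
```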