ESmith
06/30/2025, 8:14 AM

Marvin
06/30/2025, 8:14 AM

Marvin
06/30/2025, 8:15 AM
pg_trgm extension for enhanced text search capabilities
If you're planning to use RDS, you'll need to:
1. Set up the database with the pg_trgm extension
2. Configure Prefect using the PREFECT_API_DATABASE_CONNECTION_URL environment variable with your RDS connection string
3. Ensure your database user has the necessary permissions
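As a rough sketch of step 2, the connection string uses the postgresql+asyncpg scheme; the endpoint, user, and password below are placeholders rather than values from this thread, and the pg_trgm extension from step 1 still has to be created in the database itself:
```python
# Rough sketch: point the Prefect server at an RDS PostgreSQL instance.
# The endpoint, user, and password are placeholders, not real values.
# Step 1 still needs to be done in the database directly, e.g.
# "CREATE EXTENSION IF NOT EXISTS pg_trgm;" via psql.
import os

os.environ["PREFECT_API_DATABASE_CONNECTION_URL"] = (
    "postgresql+asyncpg://prefect:<password>@<rds-endpoint>:5432/prefect"
)
```
Exporting the same variable in the shell (or the service's environment) before starting the Prefect server has the same effect.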
For more details on setting up and maintaining a production PostgreSQL database with Prefect, you can check out:
- Database Partitioning Guide
- API Reference for Database Models
- Database Utilities Documentation

ESmith
06/30/2025, 8:23 AM

Marvin
06/30/2025, 8:23 AM

Marvin
06/30/2025, 8:24 AM
```python
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime

async def cleanup_old_runs():
    async with get_client() as client:
        # Get flow runs that started more than 30 days ago
        old_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(
                    before_=datetime.now(timezone.utc) - timedelta(days=30)
                )
            )
        )
        # Delete them
        for run in old_runs:
            await client.delete_flow_run(run.id)
```
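To trigger that one-off cleanup by hand, it can be run with asyncio:
```python
import asyncio

# Run the ad hoc cleanup defined above
asyncio.run(cleanup_old_runs())
```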
2. Automated Cleanup Flow
Create a Prefect flow that runs periodically to clean up old runs:
```python
from datetime import datetime, timedelta, timezone

from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterStartTime, FlowRunFilterState, FlowRunFilterStateType
)
from prefect.client.schemas.objects import StateType

@flow(retries=3)
async def cleanup_flow():
    async with get_client() as client:
        # Find runs older than 30 days that are in a terminal state
        old_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(
                    before_=datetime.now(timezone.utc) - timedelta(days=30)
                ),
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=[StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]
                    )
                ),
            )
        )
        # Delete each old run
        for run in old_runs:
            await client.delete_flow_run(run.id)

# Deploy this flow to run daily
```
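To act on the "deploy this flow to run daily" comment, one option is to serve the flow on a schedule; the deployment name and cron expression below are arbitrary examples, not anything from this thread:
```python
# Sketch: serve cleanup_flow on a daily schedule.
# The deployment name and cron expression are arbitrary examples.
if __name__ == "__main__":
    cleanup_flow.serve(name="daily-db-cleanup", cron="0 3 * * *")
```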
3. Database Maintenance
- Consider implementing database partitioning for better performance
- Run regular VACUUM operations if using PostgreSQL
- Monitor and analyze query performance
4. Prevention Strategies
- Set appropriate retention policies based on your needs
- Only store necessary data in flow run logs and results
- Use external storage for large artifacts instead of storing them directly in the database
If you're seeing unusual database growth, you might want to:
1. Analyze which tables are growing the fastest
2. Check if you're storing large amounts of data in flow run results
3. Verify if you have any flows that are running more frequently than intended
4. Look for any flows that might be generating excessive logs or metadata
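For the first point, a quick way to see which tables dominate is to query PostgreSQL's catalog views; here's a minimal sketch using psycopg2, with placeholder connection details:
```python
import psycopg2

# Sketch: list the ten largest tables in the Prefect database.
# Connection details are placeholders; psycopg2 is assumed to be installed.
conn = psycopg2.connect("host=<db-host> dbname=prefect user=prefect password=<password>")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT relname, pg_size_pretty(pg_total_relation_size(relid)) AS total_size
        FROM pg_catalog.pg_statio_user_tables
        ORDER BY pg_total_relation_size(relid) DESC
        LIMIT 10;
        """
    )
    for table_name, total_size in cur.fetchall():
        print(table_name, total_size)
conn.close()
```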
Would you like me to provide more specific guidance for any of these approaches? Also, it would be helpful to know how quickly your database is growing and what size it has reached.

ESmith
06/30/2025, 8:43 AM

Marvin
06/30/2025, 8:44 AM

Marvin
06/30/2025, 8:45 AM
- Use get_run_logger() in flows/tasks, which provides contextual logging
- Logs can be redirected to external logging systems (like CloudWatch, Datadog, etc.)
- You can customize logging behavior using Python's standard logging configuration
Here's an example of proper logging in a flow:
```python
from prefect import flow, task
from prefect.logging import get_run_logger

@task
def my_task():
    logger = get_run_logger()
    logger.info("This goes to stdout by default")  # Not stored in DB

@flow
def my_flow():
    logger = get_run_logger()
    logger.info("Starting flow")  # Not stored in DB
    my_task()
```
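As a sketch of the "standard logging configuration" point above, an extra handler can be attached to Prefect's logger from your own code; the file path here is just an example, and a CloudWatch or Datadog handler could be attached the same way:
```python
import logging

# Sketch: forward Prefect's log records to an additional destination using
# the standard logging module. The file path is an arbitrary example.
handler = logging.FileHandler("prefect-flows.log")
handler.setLevel(logging.INFO)
logging.getLogger("prefect").addHandler(handler)
```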
If your database is growing unexpectedly, it's more likely due to:
1. Accumulated flow run history
2. Large result payloads being stored
3. Metadata from numerous task/flow runs
To reduce database size:
1. Implement regular cleanup of old flow runs
2. Use external result storage for large outputs
3. Be mindful of how much data you're returning from your tasks/flows
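For point 2, here's a minimal sketch of persisting results to a filesystem block instead of returning large payloads inline; the basepath is an arbitrary example, an S3/GCS block from the relevant collection works the same way, and depending on your setup the block may need to be saved before use:
```python
from prefect import flow
from prefect.filesystems import LocalFileSystem

# Sketch: write flow results to a storage block instead of keeping them inline.
# The basepath is an arbitrary example; an S3/GCS block could be used instead.
storage = LocalFileSystem(basepath="/data/prefect-results")

@flow(persist_result=True, result_storage=storage)
def build_report():
    # The return value is written to the configured storage location
    return {"summary": "large payload goes here"}
```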
Would you like me to provide more specific guidance about setting up external logging or result storage to reduce database load?

ESmith
06/30/2025, 8:59 AM

Marvin
06/30/2025, 9:00 AM

Marvin
06/30/2025, 9:01 AM
Logs are stored in the public.log table in the database. This is a common source of database growth, and you can manage it in several ways:
1. Direct Database Cleanup
You can safely delete old logs with a SQL query like:
```sql
DELETE FROM public.log
WHERE timestamp < NOW() - INTERVAL '30 days';
```
2. Configure Log Retention
In Prefect 3.x, you can set the log retention period using the PREFECT_LOGGING_DB_RETENTION_DAYS environment variable. For example:
```bash
export PREFECT_LOGGING_DB_RETENTION_DAYS=30
```
3. Reduce Logging Volume
- Only log what's necessary in your flows
- Use appropriate log levels (ERROR, WARNING, INFO) to control verbosity
- Consider redirecting DEBUG level logs elsewhere
4. External Logging
You can redirect logs to external systems like CloudWatch or ELK stack to reduce database load. This requires custom configuration of the logging handlers.
If you're dealing with a large log table right now, you might want to:
1. First backup your database
2. Delete old logs using the SQL above
3. Run VACUUM on the database to reclaim space
4. Set up a retention policy going forward
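For step 3, VACUUM can also be scripted from Python alongside the delete; the connection details below are placeholders and psycopg2 is assumed to be installed:
```python
import psycopg2

# Sketch: reclaim space in the log table after deleting old rows.
# VACUUM cannot run inside a transaction block, so autocommit is enabled.
conn = psycopg2.connect("host=<db-host> dbname=prefect user=prefect password=<password>")
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("VACUUM ANALYZE public.log;")
conn.close()
```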
Would you like specific guidance on implementing any of these solutions? Also, it would be helpful to know how large your log table has grown to help determine the best cleanup strategy.

ESmith
06/30/2025, 9:05 AM

Marvin
06/30/2025, 9:05 AM

Marvin
06/30/2025, 9:05 AM
There is no PREFECT_LOGGING_DB_RETENTION_DAYS setting, and I was incorrect to suggest it exists. This appears to be misinformation on my part.
For log management in Prefect, you'll need to handle cleanup manually through database operations or implement your own cleanup solution. The most reliable approach would be to:
1. Create a cleanup script using SQL to remove old logs
2. Schedule it to run periodically
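A minimal sketch of that approach, combining the SQL shown earlier with a small Prefect flow that can be served on a schedule (the connection details, deployment name, and cron expression are placeholders):
```python
import psycopg2
from prefect import flow

# Sketch: a Prefect flow that prunes old rows from the log table.
# Connection details are placeholders.
@flow
def prune_prefect_logs():
    conn = psycopg2.connect("host=<db-host> dbname=prefect user=prefect password=<password>")
    with conn, conn.cursor() as cur:
        cur.execute("DELETE FROM public.log WHERE timestamp < NOW() - INTERVAL '30 days';")
    conn.close()

if __name__ == "__main__":
    # Serve it on a daily schedule; the name and cron expression are arbitrary.
    prune_prefect_logs.serve(name="prune-prefect-logs", cron="0 2 * * *")
```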
Would you like me to provide guidance on implementing a proper cleanup solution for the logs table? Again, I apologize for the confusion about the environment variable - I should have verified its existence before suggesting it.

Mattia Sappa
06/30/2025, 9:36 AM

ESmith
06/30/2025, 10:33 AM