# ask-community
Aloha Prefect Team and @Marvin ✌🏼 We are running a self-hosted Prefect instance backed by a Postgres database. Over time, we've noticed that our database is gradually filling up since, by default, there is no automatic cleanup of logs or historical run data. In our case, we only need to retain data for a maximum of 14 days and would like to delete anything older, without affecting critical metadata such as deployments and flow definitions. From what we understand, Prefect Cloud handles this automatically. Is there a recommended way to achieve similar cleanup in a self-hosted or open-source setup? Has anyone here already implemented such a strategy, or do you have suggestions on best practices? Additionally, which tables in the Prefect database schema are most relevant for safely cleaning up old run-related data? Thanks 🙂
Let me suggest a practical solution to your database cleanup needs. The most straightforward approach is a Prefect flow that performs the cleanup itself. The sketch below connects directly to the backing Postgres database with SQLAlchemy (Prefect's internal server database modules aren't a stable public API) and implements your 14-day retention requirement:
```python
from datetime import datetime, timedelta, timezone

from prefect import flow, get_run_logger
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Assumption: this URL points at the same Postgres database your Prefect server uses
# (the value of PREFECT_API_DATABASE_CONNECTION_URL); the asyncpg driver is required.
DATABASE_URL = "postgresql+asyncpg://prefect:prefect@localhost:5432/prefect"


@flow(name="database-cleanup")
async def cleanup_old_data(retention_days: int = 14):
    """Clean up completed flow runs and orphaned logs older than the retention window."""
    logger = get_run_logger()
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
    engine = create_async_engine(DATABASE_URL)

    try:
        async with engine.begin() as conn:
            # Delete old, completed flow runs; dependent rows such as flow run states
            # should go with them via ON DELETE CASCADE (verify on your Prefect version)
            run_result = await conn.execute(
                text(
                    """
                    DELETE FROM flow_run
                    WHERE created < :cutoff_date
                      AND state_type = 'COMPLETED'
                    """
                ),
                {"cutoff_date": cutoff_date},
            )
            logger.info("Deleted %s flow runs", run_result.rowcount)

            # Logs do not cascade with flow runs, so clean up old log rows
            # whose flow run no longer exists
            log_result = await conn.execute(
                text(
                    """
                    DELETE FROM log
                    WHERE timestamp < :cutoff_date
                      AND flow_run_id NOT IN (SELECT id FROM flow_run)
                    """
                ),
                {"cutoff_date": cutoff_date},
            )
            logger.info("Deleted %s log rows", log_result.rowcount)
    finally:
        await engine.dispose()

    return {"flow_runs_deleted": run_result.rowcount, "logs_deleted": log_result.rowcount}
```
Key points about this solution:

1. Safety first: the cleanup only targets
   - flow runs older than 14 days
   - completed runs only (failed runs that might need investigation are preserved)
   - flow run states, which should be removed alongside their runs by the schema's foreign-key cascades (worth verifying on your version)
   - old logs whose flow run no longer exists, handled by the second `DELETE`
   Deployments, flow definitions, and other critical metadata are not touched.
2. Relevant tables:
   - `flow_run`: the main flow run records
   - `flow_run_state`: flow run state history (usually cascades with `flow_run` deletions)
   - `task_run` / `task_run_state`: task-level history tied to flow runs
   - `log`: all logging information
   - `deployment` and `flow`: untouched by this cleanup
3. Implementation tips:
   - Schedule the flow to run daily during off-peak hours.
   - Add monitoring and alerts for cleanup success/failure.
   - Log the number of records deleted on each run.
   - Consider a dry-run mode for testing (e.g. a `SELECT count(*)` with the same filters).
4. Deployment example:
```python
if __name__ == "__main__":
    # Serve the flow on a cron schedule. `flow.serve` replaces the older
    # Deployment.build_from_flow API, which is deprecated in Prefect 2.x
    # and removed in 3.x.
    cleanup_old_data.serve(
        name="database-cleanup-daily",
        cron="0 0 * * *",  # run at midnight daily
    )
```
5. Additional considerations:
   - Always back up your database before running large cleanup operations.
   - Implement the cleanup in batches if you are dealing with large volumes of data (a batched sketch follows after the query below).
   - Monitor database performance while the cleanup runs.
   - Appropriate indexes (e.g. on `flow_run.created` and `log.timestamp`) will speed up the deletes.

For monitoring database size and cleanup effectiveness, you might want to add this query to your cleanup flow:
```sql
SELECT schemaname,
       tablename,
       pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) AS total_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname || '.' || tablename) DESC;
```
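Since your instance has been accumulating data for a while, the first cleanup may touch a lot of rows, and a single large `DELETE` can hold locks and bloat the transaction log. A batched variant keeps each transaction small. Here is a rough sketch under the same assumptions as the flow above (direct SQLAlchemy connection to the Prefect Postgres database); the batch size of 10,000 and the helper name are placeholders to tune for your setup:

```python
from datetime import datetime, timedelta, timezone

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Same assumption as above: point this at the Prefect server's Postgres database
DATABASE_URL = "postgresql+asyncpg://prefect:prefect@localhost:5432/prefect"


async def delete_flow_runs_in_batches(batch_size: int = 10_000, retention_days: int = 14) -> int:
    """Delete completed flow runs older than the cutoff, one small transaction per batch."""
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
    engine = create_async_engine(DATABASE_URL)
    total_deleted = 0
    try:
        while True:
            async with engine.begin() as conn:
                result = await conn.execute(
                    text(
                        """
                        DELETE FROM flow_run
                        WHERE id IN (
                            SELECT id FROM flow_run
                            WHERE created < :cutoff_date
                              AND state_type = 'COMPLETED'
                            LIMIT :batch_size
                        )
                        """
                    ),
                    {"cutoff_date": cutoff_date, "batch_size": batch_size},
                )
            total_deleted += result.rowcount
            if result.rowcount < batch_size:
                break  # last (partial or empty) batch reached
    finally:
        await engine.dispose()
    return total_deleted
```

You could call a helper like this from the cleanup flow in place of the single `DELETE`, and apply the same pattern to the `log` table.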
While Prefect Cloud handles retention automatically, a self-hosted setup gives you full control over retention policies and cleanup strategy, and the approach above lets you tailor both to your needs. Would you like more detail on any particular aspect, such as monitoring, alerting, or tuning the batch size of the cleanup?