Michael
07/09/2025, 12:42 PM

Marvin
07/11/2025, 10:13 AM

Marvin
07/11/2025, 10:14 AM
```python
from datetime import datetime, timedelta, timezone

from sqlalchemy import text

from prefect import flow, get_run_logger
from prefect.server.database.dependencies import provide_database_interface


@flow(name="database-cleanup")
async def cleanup_old_data():
    """Clean up flow runs and related data older than 14 days."""
    logger = get_run_logger()
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=14)

    db = provide_database_interface()
    async with db.session_context(begin_transaction=True) as session:
        # Delete old, completed flow runs; related states cascade
        result = await session.execute(
            text(
                """
                DELETE FROM flow_run
                WHERE created < :cutoff_date
                  AND state_type = 'COMPLETED'
                """
            ),
            {"cutoff_date": cutoff_date},
        )
        logger.info("Deleted %s flow runs", result.rowcount)

        # Clean up logs whose flow runs no longer exist
        log_result = await session.execute(
            text(
                """
                DELETE FROM log
                WHERE timestamp < :cutoff_date
                  AND flow_run_id NOT IN (SELECT id FROM flow_run)
                """
            ),
            {"cutoff_date": cutoff_date},
        )
        logger.info("Deleted %s orphaned log rows", log_result.rowcount)

    return {"message": "Cleanup completed successfully"}
```
Key points about this solution:
1. Safety First: The cleanup only targets:
- Flow runs older than 14 days
- Completed flow runs (preserving failed runs that might need investigation)
- Related logs and states (through database cascading)
It does not touch deployments, flow definitions, or other critical metadata.
2. Relevant Tables:
- `flow_run`: Contains the main flow run records
- `flow_run_state`: Flow run state history (usually cascades with flow_run deletions)
- `log`: Contains all logging information
- `deployment`: (untouched by this cleanup)
- `flow`: (untouched by this cleanup)
3. Implementation Tips:
- Schedule this flow to run daily during off-peak hours
- Consider adding monitoring and alerts for cleanup job success/failure
- Add logging to track the number of records deleted
- Consider implementing a dry-run mode for testing (see the sketch below)
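For the dry-run mode, one option is a helper that counts the rows the DELETE would touch instead of deleting them. A minimal sketch, reusing the session pattern from the flow above (the helper name and `dry_run` idea are illustrative, not a built-in Prefect feature):
```python
from sqlalchemy import text


async def count_cleanup_candidates(session, cutoff_date):
    """Report how many flow runs the cleanup WOULD delete, without deleting."""
    result = await session.execute(
        text(
            """
            SELECT count(*) FROM flow_run
            WHERE created < :cutoff_date
              AND state_type = 'COMPLETED'
            """
        ),
        {"cutoff_date": cutoff_date},
    )
    return result.scalar_one()
```
Call this first when the flow is invoked with a `dry_run=True` parameter and skip the DELETE statements entirely.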
4. Deployment Example:
```python
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    cleanup_old_data,
    name="database-cleanup-daily",
    schedule=CronSchedule(cron="0 0 * * *"),  # run at midnight daily
)
deployment.apply()
```
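Note that `Deployment.build_from_flow` is a Prefect 2.x API and has been removed in Prefect 3.x. If you're on 3.x, the closest equivalent is to serve the flow with a cron schedule:
```python
if __name__ == "__main__":
    cleanup_old_data.serve(
        name="database-cleanup-daily",
        cron="0 0 * * *",  # run at midnight daily
    )
```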
5. Additional Considerations:
- Before running large cleanup operations, always backup your database
- Consider implementing the cleanup in batches if dealing with large volumes of data (a batched sketch follows this list)
- Monitor database performance during cleanup operations
- Add appropriate indexes to improve cleanup performance
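For the batching point, here is one way to keep each transaction short: delete a bounded chunk per transaction and loop until nothing is left. A sketch assuming a PostgreSQL backend and the same `session_context` pattern as the main flow (the batch size is arbitrary):
```python
from sqlalchemy import text

BATCH_SIZE = 10_000


async def delete_flow_runs_in_batches(db, cutoff_date):
    """Delete old completed flow runs in small batches, one transaction each."""
    while True:
        async with db.session_context(begin_transaction=True) as session:
            result = await session.execute(
                text(
                    """
                    DELETE FROM flow_run
                    WHERE id IN (
                        SELECT id FROM flow_run
                        WHERE created < :cutoff_date
                          AND state_type = 'COMPLETED'
                        LIMIT :batch_size
                    )
                    """
                ),
                {"cutoff_date": cutoff_date, "batch_size": BATCH_SIZE},
            )
        if result.rowcount == 0:
            break
```
The subquery-with-LIMIT form is used because PostgreSQL does not allow LIMIT directly on DELETE.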
For monitoring the database size and cleanup effectiveness, you might want to add this query to your cleanup flow (it assumes a PostgreSQL backend):
```sql
SELECT schemaname, tablename,
       pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) AS total_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname || '.' || tablename) DESC;
```
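If you want the flow itself to record these sizes, you could run the query through the same session and log each row; a sketch, assuming `size_query` wraps the SQL above in `text(...)` and that `session` and `logger` come from the cleanup flow:
```python
# size_query wraps the monitoring SQL shown above in sqlalchemy.text(...)
rows = (await session.execute(size_query)).all()
for schemaname, tablename, total_size in rows:
    logger.info("Table %s.%s is %s", schemaname, tablename, total_size)
```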
While Prefect Cloud handles this automatically, your self-hosted approach gives you more control over retention policies and cleanup strategies. The solution above provides a similar level of maintenance while allowing you to customize it to your specific needs.
Would you like me to provide more details about any particular aspect of this solution, such as monitoring strategies or batch processing implementations?