<@ULVA73B9P> is there some way to set the data ret...
# ask-marvin
j
@Marvin is there some way to set the data retention policy of flow run records in the database? I found the `PREFECT_EVENTS_RETENTION_PERIOD` setting, but it seems it only takes effect if `PREFECT_EXPERIMENTAL_EVENTS` is enabled, and even in that case I don't know what it is actually supposed to do. We are finding that with many flows, our database is getting quite full, especially if the flow run logs are big. So, is there some setting which would make a self-hosted Prefect server delete old flow runs from the database?
m
You want to delete old flow records automatically and not just logs? Let me check Prefect's documentation to see if there's a setting for data retention policies for flow run records specifically. Before I proceed, could you let me know which version of Prefect you are using?
j
@Marvin Yes, I want to delete from the database records of all flow runs older than some configured age. we are on prefect 2.20.0
m
The flow run retention policy feature you're asking about applies only to Prefect Cloud and cannot be configured on a self-hosted server. There's no built-in setting in Prefect 2.x that automatically deletes old flow run records from the database for self-hosted setups. However, you can achieve this by deleting old flow runs yourself with the Prefect client. Below is a basic outline of how you can implement this:
1. List the flow runs that reached a terminal state before a chosen cutoff date.
2. Delete those runs.
Here's an example code snippet to help you get started:
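```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterEndTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

# Terminal states whose runs are eligible for deletion.
TERMINAL_STATES = [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]

async def delete_old_flow_runs(days_old: int):
    # Runs that ended before this timezone-aware timestamp are deleted.
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)
    async with get_client() as client:
        # The API returns a limited page of results per call, so rerun
        # (or loop) until nothing is returned to clear a large backlog.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=TERMINAL_STATES)
                ),
                end_time=FlowRunFilterEndTime(before_=cutoff_date),
            )
        )
        for flow_run in flow_runs:
            print(f"Deleting flow run {flow_run.id}")
            await client.delete_flow_run(flow_run.id)

if __name__ == "__main__":
    asyncio.run(delete_old_flow_runs(30))  # Replace 30 with the number of days
```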
Links to confirmed info:
- Flow Run Retention Policy in Prefect Cloud
- GitHub Issue on Removing Flow Run Records
Hope this helps!
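Since a single read call typically returns only a bounded page of results, a large backlog may need the list-then-delete steps repeated until nothing eligible remains. The following is a minimal sketch of that loop, not a definitive implementation. It deliberately avoids Prefect imports so it can run anywhere: `RunRecord`, `purge_in_batches`, and `make_fake_store` are hypothetical names invented for illustration, and the in-memory store only stands in for the server. In a real script the two callables would presumably wrap `client.read_flow_runs(..., limit=batch_size)` and `client.delete_flow_run(...)`.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# State names treated as terminal in this sketch (an assumption).
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}


@dataclass
class RunRecord:
    """Hypothetical stand-in for a flow run record."""
    id: str
    state_type: str
    end_time: datetime


async def purge_in_batches(read_page, delete_run, batch_size=200):
    """Repeatedly fetch one page of eligible runs and delete each of them.

    Always re-reads from the start: deleted runs drop out of the results,
    so the next page naturally contains the next batch.
    Returns the number of runs deleted.
    """
    deleted = 0
    while True:
        page = await read_page(batch_size)
        if not page:
            return deleted
        for run in page:
            await delete_run(run.id)
            deleted += 1


def make_fake_store(runs, cutoff):
    """Build an in-memory store plus the two callables the loop expects."""
    store = {run.id: run for run in runs}

    async def read_page(limit):
        # Eligible: terminal state and ended before the cutoff.
        eligible = [
            run for run in store.values()
            if run.state_type in TERMINAL_STATES and run.end_time < cutoff
        ]
        return eligible[:limit]

    async def delete_run(run_id):
        del store[run_id]

    return store, read_page, delete_run


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    runs = [RunRecord(f"old-{i}", "COMPLETED", now - timedelta(days=60)) for i in range(5)]
    runs.append(RunRecord("recent", "COMPLETED", now - timedelta(days=1)))
    store, read_page, delete_run = make_fake_store(runs, now - timedelta(days=30))
    count = asyncio.run(purge_in_batches(read_page, delete_run, batch_size=2))
    print(f"Deleted {count} runs, {len(store)} remaining")
```

Note that the loop only terminates if each delete actually removes the run from later reads; if a delete could fail silently, a cap on the number of iterations would be a sensible safeguard.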