<@ULVA73B9P> how to cleanup prefect database every...
# ask-marvin
a
@Marvin how to cleanup prefect database every 1 hour?
m
thought for 609.9 seconds
Quick clarifier to make sure I point you the right way: - Are you using Prefect Cloud or a self-hosted Prefect Server? - By “cleanup,” do you want to delete old flow/task runs (and their logs/artifacts), or completely wipe the database? Assuming self-hosted Prefect 3.x and you want to purge old runs/logs regularly: - There isn’t a built-in hourly DB cleanup/retention job. - Do NOT use
prefect server database reset
for routine cleanup — it drops and recreates all tables (destructive). Recommended: run a small “janitor” flow every hour that deletes old, terminal flow runs. Deleting flow runs will cascade-delete related task runs and logs. Example cleanup flow (keeps the last 24h; adjust as needed):
Copy code
from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FilterSet,
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterStateType,
)
from prefect.states import StateType
import pendulum

@flow
async def cleanup_prefect(retention_hours: int = 24, batch_size: int = 200, max_batches: int = 10):
    logger = get_run_logger()
    cutoff = pendulum.now("UTC").subtract(hours=retention_hours)

    filters = FilterSet(
        flow_runs=FlowRunFilter(
            start_time=FlowRunFilterStartTime(before_=cutoff),
            state_type=FlowRunFilterStateType(
                any_=[StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED, StateType.CRASHED]
            ),
        )
    )

    deleted = 0
    async with get_client() as client:
        for _ in range(max_batches):
            runs = await client.read_flow_runs(limit=batch_size, filters=filters)
            if not runs:
                break
            for r in runs:
                await client.delete_flow_run(r.id)
                deleted += 1

    <http://logger.info|logger.info>(f"Deleted {deleted} flow runs older than {retention_hours}h")
Deploy it and schedule hourly: 1) Save as
cleanup.py
2) Create a deployment (replace WORK_POOL with your pool):
Copy code
prefect deploy ./cleanup.py:cleanup_prefect --name db-janitor --work-pool WORK_POOL
3) Add an hourly schedule (interval is seconds). Deployment name is
flow-name/deployment-name
, so:
Copy code
prefect deployment schedule create "cleanup_prefect/db-janitor" --interval 3600
Notes: - Start with a conservative retention window and batch sizes; increase once you see it working smoothly. - If your goal is PostgreSQL maintenance (VACUUM/ANALYZE), run those via your DB’s autovacuum or your own DB cron job; Prefect does not manage that for you. If you’re on Prefect Cloud or want a different retention window/criteria (e.g., keep last N runs per deployment), tell me your environment and preferences and I’ll tailor the script.