<@ULVA73B9P> how can i cleanup prefect db , i donm...
# ask-marvin
a
@Marvin how can i cleanup prefect db , i donmt wnat previosu data before last 3 days
m
thought for 216.2 seconds
Got it — you want to keep only the last 3 days of Prefect data. Quick clarification: - Are you self-hosting Prefect Server with Postgres? (If you’re on Prefect Cloud, you don’t manage the DB directly.) In Prefect 3.x there isn’t a built‑in retention policy or prune command yet. You’ll need to prune via your Postgres database (or schedule these queries via pg_cron). References: - Database maintenance (Prefect 3 docs) - “Auto clean-up feature” issue thread If you’re self‑hosting with Postgres, here’s a practical approach. Always back up first, test on a staging DB, and run in off‑peak hours. 1) Delete logs older than 3 days (fast win)
Copy code
DELETE FROM log
WHERE timestamp < now() - interval '3 days';
2) Delete artifacts older than 3 days (if you don’t need them)
Copy code
DELETE FROM artifact
WHERE created < now() - interval '3 days';
3) Delete old task runs in batches (repeat until 0 rows deleted)
Copy code
WITH old_task_runs AS (
  SELECT id
  FROM task_run
  WHERE COALESCE(end_time, created) < now() - interval '3 days'
  LIMIT 10000
)
DELETE FROM task_run t
USING old_task_runs o
WHERE t.id = o.id;
4) Delete old flow runs in batches (repeat until 0 rows deleted)
Copy code
WITH old_flow_runs AS (
  SELECT id
  FROM flow_run
  WHERE COALESCE(end_time, created) < now() - interval '3 days'
  LIMIT 10000
)
DELETE FROM flow_run f
USING old_flow_runs o
WHERE f.id = o.id;
Notes: - Batch deletes avoid long locks. Run each DELETE repeatedly until it reports 0 rows. - FK constraints in Prefect should handle dependent rows (states, etc.), but deleting in the order above reduces work and risk. - After large deletes, run routine maintenance to reclaim space, e.g.
VACUUM (ANALYZE)
or consider
pg_repack
to reduce bloat with minimal locks. - If you want this to run automatically, schedule these SQL statements with pg_cron. If you can confirm you’re on self-hosted Postgres (and whether you want to remove logs, artifacts, runs — all older than 3 days), I can tailor the exact queries to your schema and usage.
a
@Marvin
Copy code
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
from prefect.client.schemas.objects import StateType

@task
async def delete_old_flow_runs(
    days_to_keep: int = 30,
    batch_size: int = 100
):
    """Delete completed flow runs older than specified days."""
    logger = get_run_logger()
    
    async with get_client() as client:
        cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
        
        # Create filter for old completed flow runs
        # Note: Using start_time because created time filtering is not available
        flow_run_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(before_=cutoff),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(
                    any_=[StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]
                )
            )
        )
        
        # Get flow runs to delete
        flow_runs = await client.read_flow_runs(
            flow_run_filter=flow_run_filter,
            limit=batch_size
        )
        
        deleted_total = 0
        
        while flow_runs:
            batch_deleted = 0
            failed_deletes = []
            
            # Delete each flow run through the API
            for flow_run in flow_runs:
                try:
                    await client.delete_flow_run(flow_run.id)
                    deleted_total += 1
                    batch_deleted += 1
                except Exception as e:
                    logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")
                    failed_deletes.append(flow_run.id)
                    
                # Rate limiting - adjust based on your API capacity
                if batch_deleted % 10 == 0:
                    await asyncio.sleep(0.5)
                    
            <http://logger.info|logger.info>(f"Deleted {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
            if failed_deletes:
                logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")
            
            # Get next batch
            flow_runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=batch_size
            )
            
            # Delay between batches to avoid overwhelming the API
            await asyncio.sleep(1.0)
        
        <http://logger.info|logger.info>(f"Retention complete. Total deleted: {deleted_total}")

@flow(name="database-retention")
async def retention_flow():
    """Run database retention tasks."""
    await delete_old_flow_runs(
        days_to_keep=30,
        batch_size=100
    )
will this cause server load
m
thought for 71.0 seconds
Short answer: yes, any bulk deletion via the API will create load on your Prefect server and database. However, with conservative batching and sleeps (like you have), it’s usually safe—especially if you run it off‑peak. What your flow does: - Each batch makes 1 read request + up to N delete requests (N=batch_size). Each delete cascades work in the DB (removing run states, task runs, etc.), which can be heavy. - Repeating batches until none remain is fine and avoids offset pitfalls. - Your sleeps help throttle load and reduce lock contention. Tips to keep load manageable: - Run during off‑peak hours. - Keep batch_size small (25–100) and increase the sleeps if you see DB pressure. - Add a max_deletes guard so a single run doesn’t do “everything” at once. - Include CRASHED in terminal states so you don’t leave behind old crashed runs. - Consider pruning logs separately at the DB level; flow run deletion may not remove all logs depending on FK behavior. If you want to keep only the last 3 days, set days_to_keep=3. Here’s a slightly hardened version of your task with a max_deletes cap and CRASHED included:
Copy code
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
)
from prefect.client.schemas.objects import StateType

@task
async def delete_old_flow_runs(
    days_to_keep: int = 3,
    batch_size: int = 50,
    max_deletes: int = 2000,  # safety cap per execution
):
    """Delete terminal flow runs older than specified days."""
    logger = get_run_logger()

    async with get_client() as client:
        cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)

        flow_run_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(before_=cutoff),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(
                    any_=[
                        StateType.COMPLETED,
                        StateType.FAILED,
                        StateType.CANCELLED,
                        StateType.CRASHED,
                    ]
                )
            ),
        )

        deleted_total = 0

        while True:
            if deleted_total >= max_deletes:
                <http://logger.info|logger.info>(f"Hit max_deletes={max_deletes}; stopping this run")
                break

            flow_runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=min(batch_size, max_deletes - deleted_total),
            )
            if not flow_runs:
                break

            batch_deleted = 0
            failed_deletes = []

            for flow_run in flow_runs:
                try:
                    await client.delete_flow_run(flow_run.id)
                    deleted_total += 1
                    batch_deleted += 1
                except Exception as e:
                    logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")
                    failed_deletes.append(flow_run.id)

                # gentle throttle
                if batch_deleted % 10 == 0:
                    await asyncio.sleep(0.5)

            <http://logger.info|logger.info>(f"Deleted {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
            if failed_deletes:
                logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")

            await asyncio.sleep(1.0)

        <http://logger.info|logger.info>(f"Retention complete. Total deleted: {deleted_total}")

@flow(name="database-retention")
async def retention_flow():
    await delete_old_flow_runs(
        days_to_keep=3,
        batch_size=50,
        max_deletes=2000,
    )
For very large cleanups, DB-level pruning is faster and puts less API load: - Logs older than 3 days:
Copy code
DELETE FROM log
WHERE timestamp < now() - interval '3 days';
- Artifacts older than 3 days:
Copy code
DELETE FROM artifact
WHERE created < now() - interval '3 days';
More on maintenance: Database maintenance (Prefect 3) Background discussion: Auto clean-up feature issue If you’re self-hosting Postgres and want, I can share a full SQL-based retention routine that prunes logs, artifacts, task runs, and flow runs older than 3 days with batching to minimize locks.
a
@Marvin i am self hosting postgresql
m
thought for 69.4 seconds
Perfect — since you’re self‑hosting Postgres, here’s a safe, DB‑level retention routine to keep only the last 3 days. It deletes in small batches to minimize locks and server load. Important: - Back up your DB first. - Test on staging if possible. - Run during off‑peak hours. - After large deletes, let autovacuum run or consider pg_repack for bloat. - Reference: Database maintenance (Prefect 3) 1) Optional: add helpful indexes (run outside a transaction; CONCURRENTLY avoids long locks)
Copy code
-- These help the queries below; skip if they already exist
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_log_timestamp ON log (timestamp);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_artifact_created ON artifact (created);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_flow_run_created ON flow_run (created);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_flow_run_end_time ON flow_run (end_time);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_run_flow_run_id ON task_run (flow_run_id);
2) Create a stored procedure to prune in batches
Copy code
CREATE OR REPLACE PROCEDURE prefect_retention(p_days_to_keep int DEFAULT 3,
                                              p_batch_size   int DEFAULT 10000)
LANGUAGE plpgsql
AS $$
DECLARE
  v_cutoff timestamptz := now() - make_interval(days => p_days_to_keep);
  v_rows   int;
BEGIN
  RAISE NOTICE 'Prefect retention start, cutoff=%', v_cutoff;

  -- 1) Logs older than cutoff
  LOOP
    WITH old AS (
      SELECT id
      FROM log
      WHERE timestamp < v_cutoff
      LIMIT p_batch_size
    )
    DELETE FROM log l
    USING old o
    WHERE l.id = o.id;
    GET DIAGNOSTICS v_rows = ROW_COUNT;
    EXIT WHEN v_rows = 0;
    PERFORM pg_sleep(0.2);
  END LOOP;

  -- 2) Artifacts older than cutoff
  LOOP
    WITH old AS (
      SELECT id
      FROM artifact
      WHERE created < v_cutoff
      LIMIT p_batch_size
    )
    DELETE FROM artifact a
    USING old o
    WHERE a.id = o.id;
    GET DIAGNOSTICS v_rows = ROW_COUNT;
    EXIT WHEN v_rows = 0;
    PERFORM pg_sleep(0.2);
  END LOOP;

  -- 3) Task runs for flow runs older than cutoff
  LOOP
    WITH old_tr AS (
      SELECT tr.id
      FROM task_run tr
      JOIN flow_run fr ON fr.id = tr.flow_run_id
      WHERE COALESCE(fr.end_time, fr.created) < v_cutoff
      LIMIT p_batch_size
    )
    DELETE FROM task_run t
    USING old_tr o
    WHERE t.id = o.id;
    GET DIAGNOSTICS v_rows = ROW_COUNT;
    EXIT WHEN v_rows = 0;
    PERFORM pg_sleep(0.2);
  END LOOP;

  -- 4) Flow runs older than cutoff
  LOOP
    WITH old_fr AS (
      SELECT id
      FROM flow_run
      WHERE COALESCE(end_time, created) < v_cutoff
      LIMIT p_batch_size
    )
    DELETE FROM flow_run f
    USING old_fr o
    WHERE f.id = o.id;
    GET DIAGNOSTICS v_rows = ROW_COUNT;
    EXIT WHEN v_rows = 0;
    PERFORM pg_sleep(0.2);
  END LOOP;

  RAISE NOTICE 'Prefect retention finished';
END;
$$;
3) Run it (keeps last 3 days)
Copy code
CALL prefect_retention(3, 10000);
4) Optional: schedule daily with pg_cron
Copy code
-- Install pg_cron extension first if needed
-- CREATE EXTENSION IF NOT EXISTS pg_cron;

SELECT cron.schedule(
  'prefect_retention_daily',
  '15 02 * * *',
  $$CALL prefect_retention(3, 10000);$$
);
Why this order? - Deleting logs and artifacts first lightens later deletes. - Then remove task runs tied to old flow runs. - Finally delete the flow runs. If your FK constraints are ON DELETE CASCADE you could skip the explicit task_run step, but doing it explicitly reduces cascade work and long transactions. If you prefer to stick with the API-based flow you posted: yes, it will load the server, but your batching/sleeps make it manageable. The DB‑level approach above is faster and reduces API overhead for large backfills. Want me to tailor this for a different cutoff, schema, or stricter throttling?
a
@Marvin can you give a query which is not procedure , i want to try it
m
thought for 44.9 seconds
Absolutely — here are standalone, non-procedure SQL statements you can run. Run each one repeatedly until it reports “0 rows affected” to prune everything older than 3 days without long transactions. Tip: Run during off‑peak hours and back up first. Reference: Database maintenance (Prefect 3) 1) Delete logs older than 3 days (batch of 10k)
Copy code
WITH old AS (
  SELECT id
  FROM log
  WHERE timestamp < now() - interval '3 days'
  ORDER BY id
  LIMIT 10000
)
DELETE FROM log l
USING old o
WHERE l.id = o.id;
2) Delete artifacts older than 3 days
Copy code
WITH old AS (
  SELECT id
  FROM artifact
  WHERE created < now() - interval '3 days'
  ORDER BY id
  LIMIT 10000
)
DELETE FROM artifact a
USING old o
WHERE a.id = o.id;
3) Delete task runs tied to old flow runs (reduces cascade work)
Copy code
WITH old AS (
  SELECT tr.id
  FROM task_run tr
  JOIN flow_run fr ON fr.id = tr.flow_run_id
  WHERE COALESCE(fr.end_time, fr.created) < now() - interval '3 days'
  ORDER BY tr.id
  LIMIT 10000
)
DELETE FROM task_run t
USING old o
WHERE t.id = o.id;
4) Delete flow runs older than 3 days
Copy code
WITH old AS (
  SELECT id
  FROM flow_run
  WHERE COALESCE(end_time, created) < now() - interval '3 days'
  ORDER BY id
  LIMIT 10000
)
DELETE FROM flow_run f
USING old o
WHERE f.id = o.id;
Optional post-cleanup maintenance (space/stats):
Copy code
VACUUM (ANALYZE) log;
VACUUM (ANALYZE) artifact;
VACUUM (ANALYZE) task_run;
VACUUM (ANALYZE) flow_run;
Notes: - Adjust the interval if needed (e.g.,
interval '7 days'
). - Batch size (LIMIT 10000) is a balance; lower it if you see lock contention. - Prefect’s FKs typically cascade states; the explicit task_run deletion helps keep transactions small. - If you prefer, you can schedule these statements via pg_cron to run nightly.