ASHIK
09/01/2025, 5:23 AM

Marvin
09/01/2025, 5:23 AM

Marvin
09/01/2025, 5:27 AM

1) Delete logs older than 3 days
DELETE FROM log
WHERE timestamp < now() - interval '3 days';
2) Delete artifacts older than 3 days (if you don’t need them)
DELETE FROM artifact
WHERE created < now() - interval '3 days';
3) Delete old task runs in batches (repeat until 0 rows deleted)
WITH old_task_runs AS (
SELECT id
FROM task_run
WHERE COALESCE(end_time, created) < now() - interval '3 days'
LIMIT 10000
)
DELETE FROM task_run t
USING old_task_runs o
WHERE t.id = o.id;
4) Delete old flow runs in batches (repeat until 0 rows deleted)
WITH old_flow_runs AS (
SELECT id
FROM flow_run
WHERE COALESCE(end_time, created) < now() - interval '3 days'
LIMIT 10000
)
DELETE FROM flow_run f
USING old_flow_runs o
WHERE f.id = o.id;
Notes:
- Batch deletes avoid long locks. Run each DELETE repeatedly until it reports 0 rows (a looping sketch follows these notes).
- FK constraints in Prefect should handle dependent rows (states, etc.), but deleting in the order above reduces work and risk.
- After large deletes, run routine maintenance to reclaim space, e.g. VACUUM (ANALYZE), or consider pg_repack to reduce bloat with minimal locks.
- If you want this to run automatically, schedule these SQL statements with pg_cron.
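If you'd rather not re-run each statement by hand, here is a minimal sketch of that loop as an anonymous plpgsql block, using the log delete above as the example. Note that a DO block runs as a single transaction, so locks are held across batches; running the statement repeatedly from a client (or from a procedure that commits between batches) keeps transactions short.
DO $$
DECLARE
    v_rows int;
BEGIN
    LOOP
        -- same batched delete as above, repeated until nothing is removed
        WITH old AS (
            SELECT id
            FROM log
            WHERE timestamp < now() - interval '3 days'
            LIMIT 10000
        )
        DELETE FROM log l
        USING old o
        WHERE l.id = o.id;
        GET DIAGNOSTICS v_rows = ROW_COUNT;
        EXIT WHEN v_rows = 0;
        PERFORM pg_sleep(0.2);  -- brief pause between batches
    END LOOP;
END $$;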
If you can confirm you’re on self-hosted Postgres (and whether you want to remove logs, artifacts, runs — all older than 3 days), I can tailor the exact queries to your schema and usage.

ASHIK
09/01/2025, 5:33 AM

import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
)
from prefect.client.schemas.objects import StateType


@task
async def delete_old_flow_runs(
    days_to_keep: int = 30,
    batch_size: int = 100
):
    """Delete completed flow runs older than specified days."""
    logger = get_run_logger()
    async with get_client() as client:
        cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
        # Create filter for old completed flow runs
        # Note: Using start_time because created time filtering is not available
        flow_run_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(before_=cutoff),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(
                    any_=[StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]
                )
            )
        )
        # Get flow runs to delete
        flow_runs = await client.read_flow_runs(
            flow_run_filter=flow_run_filter,
            limit=batch_size
        )
        deleted_total = 0
        while flow_runs:
            batch_deleted = 0
            failed_deletes = []
            # Delete each flow run through the API
            for flow_run in flow_runs:
                try:
                    await client.delete_flow_run(flow_run.id)
                    deleted_total += 1
                    batch_deleted += 1
                except Exception as e:
                    logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")
                    failed_deletes.append(flow_run.id)
                # Rate limiting - adjust based on your API capacity
                if batch_deleted % 10 == 0:
                    await asyncio.sleep(0.5)
            logger.info(f"Deleted {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
            if failed_deletes:
                logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")
            # Get next batch
            flow_runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=batch_size
            )
            # Delay between batches to avoid overwhelming the API
            await asyncio.sleep(1.0)
        logger.info(f"Retention complete. Total deleted: {deleted_total}")


@flow(name="database-retention")
async def retention_flow():
    """Run database retention tasks."""
    await delete_old_flow_runs(
        days_to_keep=30,
        batch_size=100
    )
will this cause server load?

Marvin
09/01/2025, 5:33 AM

Marvin
09/01/2025, 5:34 AM

import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType, FlowRunFilterStartTime
)
from prefect.client.schemas.objects import StateType


@task
async def delete_old_flow_runs(
    days_to_keep: int = 3,
    batch_size: int = 50,
    max_deletes: int = 2000,  # safety cap per execution
):
    """Delete terminal flow runs older than specified days."""
    logger = get_run_logger()
    async with get_client() as client:
        cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
        flow_run_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(before_=cutoff),
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(
                    any_=[
                        StateType.COMPLETED,
                        StateType.FAILED,
                        StateType.CANCELLED,
                        StateType.CRASHED,
                    ]
                )
            ),
        )
        deleted_total = 0
        while True:
            if deleted_total >= max_deletes:
                logger.info(f"Hit max_deletes={max_deletes}; stopping this run")
                break
            flow_runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=min(batch_size, max_deletes - deleted_total),
            )
            if not flow_runs:
                break
            batch_deleted = 0
            failed_deletes = []
            for flow_run in flow_runs:
                try:
                    await client.delete_flow_run(flow_run.id)
                    deleted_total += 1
                    batch_deleted += 1
                except Exception as e:
                    logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")
                    failed_deletes.append(flow_run.id)
                # gentle throttle
                if batch_deleted % 10 == 0:
                    await asyncio.sleep(0.5)
            logger.info(f"Deleted {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
            if failed_deletes:
                logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")
            await asyncio.sleep(1.0)
        logger.info(f"Retention complete. Total deleted: {deleted_total}")


@flow(name="database-retention")
async def retention_flow():
    await delete_old_flow_runs(
        days_to_keep=3,
        batch_size=50,
        max_deletes=2000,
    )
For very large cleanups, DB-level pruning is faster and puts less API load:
- Logs older than 3 days:
DELETE FROM log
WHERE timestamp < now() - interval '3 days';
Marvin
09/01/2025, 5:34 AM

- Artifacts older than 3 days:
DELETE FROM artifact
WHERE created < now() - interval '3 days';
More on maintenance: Database maintenance (Prefect 3)
Background discussion: Auto clean-up feature issue
If you’re self-hosting Postgres and want it, I can share a full SQL-based retention routine that prunes logs, artifacts, task runs, and flow runs older than 3 days with batching to minimize locks.

ASHIK
09/01/2025, 5:35 AM

Marvin
09/01/2025, 5:35 AM

Marvin
09/01/2025, 5:37 AM

1) Create helpful indexes
-- These help the queries below; skip if they already exist
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_log_timestamp ON log (timestamp);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_artifact_created ON artifact (created);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_flow_run_created ON flow_run (created);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_flow_run_end_time ON flow_run (end_time);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_task_run_flow_run_id ON task_run (flow_run_id);
2) Create a stored procedure to prune in batches
CREATE OR REPLACE PROCEDURE prefect_retention(
    p_days_to_keep int DEFAULT 3,
    p_batch_size   int DEFAULT 10000
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_cutoff timestamptz := now() - make_interval(days => p_days_to_keep);
    v_rows   int;
BEGIN
    RAISE NOTICE 'Prefect retention start, cutoff=%', v_cutoff;

    -- 1) Logs older than cutoff
    LOOP
        WITH old AS (
            SELECT id
            FROM log
            WHERE timestamp < v_cutoff
            LIMIT p_batch_size
        )
        DELETE FROM log l
        USING old o
        WHERE l.id = o.id;
        GET DIAGNOSTICS v_rows = ROW_COUNT;
        EXIT WHEN v_rows = 0;
        PERFORM pg_sleep(0.2);
    END LOOP;

    -- 2) Artifacts older than cutoff
    LOOP
        WITH old AS (
            SELECT id
            FROM artifact
            WHERE created < v_cutoff
            LIMIT p_batch_size
        )
        DELETE FROM artifact a
        USING old o
        WHERE a.id = o.id;
        GET DIAGNOSTICS v_rows = ROW_COUNT;
        EXIT WHEN v_rows = 0;
        PERFORM pg_sleep(0.2);
    END LOOP;

    -- 3) Task runs for flow runs older than cutoff
    LOOP
        WITH old_tr AS (
            SELECT tr.id
            FROM task_run tr
            JOIN flow_run fr ON fr.id = tr.flow_run_id
            WHERE COALESCE(fr.end_time, fr.created) < v_cutoff
            LIMIT p_batch_size
        )
        DELETE FROM task_run t
        USING old_tr o
        WHERE t.id = o.id;
        GET DIAGNOSTICS v_rows = ROW_COUNT;
        EXIT WHEN v_rows = 0;
        PERFORM pg_sleep(0.2);
    END LOOP;

    -- 4) Flow runs older than cutoff
    LOOP
        WITH old_fr AS (
            SELECT id
            FROM flow_run
            WHERE COALESCE(end_time, created) < v_cutoff
            LIMIT p_batch_size
        )
        DELETE FROM flow_run f
        USING old_fr o
        WHERE f.id = o.id;
        GET DIAGNOSTICS v_rows = ROW_COUNT;
        EXIT WHEN v_rows = 0;
        PERFORM pg_sleep(0.2);
    END LOOP;

    RAISE NOTICE 'Prefect retention finished';
END;
$$;
3) Run it (keeps last 3 days)
CALL prefect_retention(3, 10000);
4) Optional: schedule daily with pg_cron
-- Install pg_cron extension first if needed
-- CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT cron.schedule(
'prefect_retention_daily',
'15 02 * * *',
$$CALL prefect_retention(3, 10000);$$
);
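To confirm the job registered and is actually firing, you can query pg_cron's metadata tables (column names per recent pg_cron versions):
-- List scheduled jobs
SELECT jobid, jobname, schedule, command, active
FROM cron.job;

-- Recent executions and their outcomes
SELECT jobid, status, return_message, start_time, end_time
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 10;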
Why this order?
- Deleting logs and artifacts first lightens later deletes.
- Then remove task runs tied to old flow runs.
- Finally delete the flow runs. If your FK constraints are ON DELETE CASCADE you could skip the explicit task_run step, but doing it explicitly reduces cascade work and long transactions (a query to check your delete rules follows this list).
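If you want to verify the cascade behavior before deciding, a quick catalog query:
-- confdeltype: 'c' = CASCADE, 'a' = NO ACTION, 'r' = RESTRICT, 'n' = SET NULL
SELECT conrelid::regclass  AS referencing_table,
       confrelid::regclass AS referenced_table,
       conname,
       confdeltype
FROM pg_constraint
WHERE contype = 'f'
  AND confrelid IN ('flow_run'::regclass, 'task_run'::regclass);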
If you prefer to stick with the API-based flow you posted: yes, it will load the server, but your batching/sleeps make it manageable. The DB-level approach above is faster and reduces API overhead for large backfills.
Want me to tailor this for a different cutoff, schema, or stricter throttling?

ASHIK
09/01/2025, 5:41 AM

Marvin
09/01/2025, 5:41 AM

Marvin
09/01/2025, 5:42 AM

1) Delete logs older than 3 days
WITH old AS (
SELECT id
FROM log
WHERE timestamp < now() - interval '3 days'
ORDER BY id
LIMIT 10000
)
DELETE FROM log l
USING old o
WHERE l.id = o.id;
2) Delete artifacts older than 3 days
WITH old AS (
SELECT id
FROM artifact
WHERE created < now() - interval '3 days'
ORDER BY id
LIMIT 10000
)
DELETE FROM artifact a
USING old o
WHERE a.id = o.id;
3) Delete task runs tied to old flow runs (reduces cascade work)
WITH old AS (
SELECT tr.id
FROM task_run tr
JOIN flow_run fr ON fr.id = tr.flow_run_id
WHERE COALESCE(fr.end_time, fr.created) < now() - interval '3 days'
ORDER BY tr.id
LIMIT 10000
)
DELETE FROM task_run t
USING old o
WHERE t.id = o.id;
4) Delete flow runs older than 3 days
WITH old AS (
SELECT id
FROM flow_run
WHERE COALESCE(end_time, created) < now() - interval '3 days'
ORDER BY id
LIMIT 10000
)
DELETE FROM flow_run f
USING old o
WHERE f.id = o.id;
Optional post-cleanup maintenance (space/stats):
VACUUM (ANALYZE) log;
VACUUM (ANALYZE) artifact;
VACUUM (ANALYZE) task_run;
VACUUM (ANALYZE) flow_run;
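To gauge whether a manual VACUUM is worth running, you can check dead-tuple counts first:
SELECT relname, n_live_tup, n_dead_tup, last_autovacuum
FROM pg_stat_user_tables
WHERE relname IN ('log', 'artifact', 'task_run', 'flow_run');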
Notes:
- Adjust the interval if needed (e.g., interval '7 days').
- Batch size (LIMIT 10000) is a balance; lower it if you see lock contention (a query to spot blocked sessions follows these notes).
- Prefect’s FKs typically cascade states; the explicit task_run deletion helps keep transactions small.
- If you prefer, you can schedule these statements via pg_cron to run nightly.
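To spot that contention while the deletes run, check for sessions waiting on locks (wait-event columns are available in PostgreSQL 9.6+):
SELECT pid, state, wait_event_type, wait_event, left(query, 80) AS query
FROM pg_stat_activity
WHERE wait_event_type = 'Lock';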