ASHIK
08/25/2025, 8:50 AM
Marvin
08/25/2025, 8:50 AM
Marvin
08/25/2025, 8:54 AM
PrefectClient. Set DELETE_MODE to "older" (default) or "within" if you want just the last week. Run it where your Prefect environment variables are set (e.g., PREFECT_API_URL and PREFECT_API_KEY for Cloud).
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
)

# ---- Configuration ----
DAYS = 7
DELETE_MODE = "older"  # "older"  -> delete runs older than N days
                       # "within" -> delete runs that occurred in the last N days
DRY_RUN = False        # True => just print what would be deleted
PAGE_SIZE = 200        # tune for your workspace size
# -----------------------


def _time_window(days: int, mode: str):
    """Return an (after, before) pair of UTC datetimes for the chosen mode."""
    now = datetime.now(timezone.utc)
    start = now - timedelta(days=days)
    if mode == "older":
        # Everything before 'start'
        return None, start
    elif mode == "within":
        # Anything between 'start' and 'now'
        return start, now
    else:
        raise ValueError("DELETE_MODE must be 'older' or 'within'")


async def main():
    after, before = _time_window(DAYS, DELETE_MODE)

    # Build time filter (the client's filter fields carry a trailing underscore)
    time_filter = FlowRunFilterStartTime(after_=after, before_=before)
    flow_run_filter = FlowRunFilter(start_time=time_filter)

    async with get_client() as client:
        # Collect IDs first (avoid paging issues when deleting)
        flow_run_ids = []
        offset = 0
        while True:
            runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not runs:
                break
            flow_run_ids.extend(r.id for r in runs)
            offset += PAGE_SIZE

        print(f"Found {len(flow_run_ids)} flow runs to delete (mode={DELETE_MODE}, days={DAYS}).")
        if DRY_RUN or not flow_run_ids:
            print("DRY_RUN is True or no runs found; exiting without deletion.")
            return

        # Delete runs (deleting a flow run removes its related task runs/states/logs)
        deleted = 0
        for fr_id in flow_run_ids:
            try:
                await client.delete_flow_run(fr_id)
                deleted += 1
            except Exception as exc:
                print(f"Failed to delete flow run {fr_id}: {exc}")
        print(f"Deleted {deleted} flow runs.")


if __name__ == "__main__":
    asyncio.run(main())
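To sanity-check the "older" vs "within" window before running anything against the API, the selection rule can be tried out on plain datetimes. This is only a sketch: in_delete_window is a hypothetical helper that is not part of the script or of Prefect, and the exact boundary behaviour (inclusive vs exclusive) of the server-side filter is an assumption here.

```python
from datetime import datetime, timedelta, timezone


def in_delete_window(start_time, now, days=7, mode="older"):
    """Return True if a run that started at start_time would be targeted."""
    cutoff = now - timedelta(days=days)
    if mode == "older":
        return start_time < cutoff
    if mode == "within":
        return cutoff <= start_time <= now
    raise ValueError("mode must be 'older' or 'within'")


# Fixed reference time so the example is reproducible
now = datetime(2025, 8, 25, 9, 0, tzinfo=timezone.utc)
ten_days_ago = now - timedelta(days=10)
two_days_ago = now - timedelta(days=2)

print(in_delete_window(ten_days_ago, now, mode="older"))   # True
print(in_delete_window(two_days_ago, now, mode="older"))   # False
print(in_delete_window(two_days_ago, now, mode="within"))  # True
```

With DAYS = 7, a run from ten days ago is only hit in "older" mode and a run from two days ago only in "within" mode, so the two modes never overlap.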
Notes:
- For Prefect Cloud, ensure PREFECT_API_URL and PREFECT_API_KEY are set in your environment.
- Deleting a flow run will remove related task runs/states/logs. If you prefer to only purge logs while retaining runs, say the word and I’ll adjust the script.
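A quick preflight check for the two Cloud variables mentioned in the notes might look like the sketch below. missing_prefect_env is a hypothetical helper, and it only inspects environment variables: settings stored in a Prefect profile would not be detected by it.

```python
import os

REQUIRED_VARS = ("PREFECT_API_URL", "PREFECT_API_KEY")


def missing_prefect_env(environ=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_prefect_env()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("Prefect Cloud environment variables are set.")
```

Running this in the same shell as the cleanup script shows whether the client would at least find both values in the environment.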
References:
- Prefect 3.x Python client API
- Flow run API reference
If you confirm your exact target (older vs within last 7 days; Cloud vs Server; delete runs vs just logs/artifacts), I can tailor this precisely for you.
ASHIK
08/25/2025, 8:55 AM
ASHIK
08/25/2025, 8:55 AM
Marvin
08/25/2025, 8:55 AM
Marvin
08/25/2025, 8:58 AM
-- ============
-- Prefect DB cleanup (Postgres)
-- Deletes flow runs and related data based on a time window
-- IMPORTANT: Adjust table names if your schema differs (see comments)
-- ============
-- Common names by Prefect version:
-- flow_runs or flow_run
-- task_runs or task_run
-- flow_run_states or flow_run_state
-- task_run_states or task_run_state
-- log or logs
-- artifacts or artifact
-- events or event
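BEGIN;
-- Configure your window here. A temporary table is used rather than a CTE because
-- a WITH clause is visible only to the single statement it is attached to, while
-- the preview and delete statements below all need the same set of IDs.
-- 1) Delete runs OLDER than 7 days:
CREATE TEMP TABLE target_flow_runs ON COMMIT DROP AS
SELECT id
FROM flow_runs -- adjust if your table is named flow_run
WHERE start_time < now() - interval '7 days';
-- 2) Or, delete runs WITHIN the last 7 days (use this instead of the statement above):
-- CREATE TEMP TABLE target_flow_runs ON COMMIT DROP AS
-- SELECT id
-- FROM flow_runs
-- WHERE start_time >= now() - interval '7 days' AND start_time <= now();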
-- Preview counts before deleting
SELECT count(*) AS flow_runs_to_delete FROM target_flow_runs;
SELECT
(SELECT count(*) FROM task_runs tr WHERE tr.flow_run_id IN (SELECT id FROM target_flow_runs)) AS task_runs,
(SELECT count(*) FROM flow_run_states frs WHERE frs.flow_run_id IN (SELECT id FROM target_flow_runs)) AS flow_run_states,
(SELECT count(*) FROM task_run_states trs WHERE trs.task_run_id IN (
SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
)) AS task_run_states,
(SELECT count(*) FROM log l WHERE l.flow_run_id IN (SELECT id FROM target_flow_runs)
OR l.task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs))) AS logs,
(SELECT count(*) FROM artifacts a WHERE a.flow_run_id IN (SELECT id FROM target_flow_runs)
OR a.task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs))) AS artifacts,
(SELECT count(*) FROM events e WHERE e.flow_run_id IN (SELECT id FROM target_flow_runs)) AS events;
-- If the counts look correct, uncomment deletes IN THIS ORDER.
-- NOTE: If your tables are singular/plural, update names accordingly.
-- DELETE FROM task_run_states
-- WHERE task_run_id IN (
-- SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
-- );
-- DELETE FROM flow_run_states
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs);
-- DELETE FROM log
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
-- OR task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs));
-- DELETE FROM artifacts
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
-- OR task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs));
-- Optional: if you store events tied to flow runs
-- DELETE FROM events
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs);
-- DELETE FROM task_runs
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs);
-- DELETE FROM flow_runs
-- WHERE id IN (SELECT id FROM target_flow_runs);
-- When satisfied, COMMIT; otherwise ROLLBACK;
-- COMMIT;
-- ROLLBACK;
Marvin
08/25/2025, 8:58 AM
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename ILIKE ANY (ARRAY['%flow_run%','%task_run%','%state%','%log%','%artifact%','%event%'])
ORDER BY tablename;
Notes:
- If your DB has foreign keys with ON DELETE CASCADE configured, you could delete from flow_runs and let Postgres cascade the rest. However, not all installations have cascade set for all relations, so the explicit-order deletes above are safer.
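- After large deletes, consider running VACUUM during a maintenance window so the freed space can be reused. Returning that space to the operating system requires VACUUM FULL (which takes an exclusive lock on the table) or pg_repack.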
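The explicit delete order mentioned in the notes is a topological sort: every child table is emptied before the table it references. A minimal sketch using the standard-library graphlib module (Python 3.9+) follows; the table names are the plural spellings used in this thread and the dependency map is an assumption to adjust to your schema.

```python
from graphlib import TopologicalSorter

# Map each table to the tables that must be deleted BEFORE it (its children).
# Names follow the plural spelling used in this thread; adjust to your schema.
DELETE_BEFORE = {
    "flow_runs": {"task_runs", "flow_run_states", "log", "artifacts", "events"},
    "task_runs": {"task_run_states", "log", "artifacts"},
}


def delete_order(deps):
    """Return table names ordered so children are deleted before parents."""
    return list(TopologicalSorter(deps).static_order())


order = delete_order(DELETE_BEFORE)
print(order)
```

The relative order of unrelated tables may vary, but task_run_states always precedes task_runs and flow_runs always comes last, which matches the order of the commented-out DELETE statements.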
If you can confirm:
- Cloud vs Server
- “older than 7 days” vs “within last 7 days”
- The exact table names from your DB
…I’ll send a pinpointed script for your environment.
References:
- Prefect 3.x Python client API (if you prefer API-based cleanup instead of direct SQL)
ASHIK
08/25/2025, 9:41 AM
Marvin
08/25/2025, 9:41 AM
Marvin
08/25/2025, 9:42 AM
-- Lists FKs and their actions for the public schema
SELECT
conrelid::regclass AS child_table,
conname AS constraint_name,
pg_get_constraintdef(c.oid) AS definition
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE c.contype = 'f'
AND n.nspname = 'public'
ORDER BY child_table, constraint_name;
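Once you have the output of the inspection query, a small script can flag which foreign keys lack ON DELETE CASCADE. This sketch assumes you have the rows as (child_table, constraint_name, definition) tuples; fks_without_cascade is a hypothetical helper and the sample rows are made up for illustration.

```python
def fks_without_cascade(rows):
    """Return (child_table, constraint_name) for FKs that do not cascade on delete.

    rows: iterable of (child_table, constraint_name, definition) tuples,
    shaped like the output of the FK inspection query.
    """
    return [
        (child, name)
        for child, name, definition in rows
        if "ON DELETE CASCADE" not in definition.upper()
    ]


# Hypothetical sample rows; real output will use your own table names.
sample = [
    ("task_runs", "task_runs_flow_run_id_fkey",
     "FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE"),
    ("log", "log_flow_run_id_fkey",
     "FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id)"),
]

print(fks_without_cascade(sample))  # [('log', 'log_flow_run_id_fkey')]
```

Only the constraints it returns would need to be recreated; anything already cascading can be left alone.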
2) Add or change FKs to ON DELETE CASCADE
Run inside a maintenance window: dropping and re-adding a constraint locks the child table, and adding the foreign key also takes a lock on the referenced parent table. Adjust table/column names to match your schema (singular vs plural). Common relationships in Prefect:
- flow_runs -> task_runs
- flow_runs -> flow_run_states
- task_runs -> task_run_states
- flow_runs/task_runs -> log
- flow_runs/task_runs -> artifacts
- flow_runs -> events (if present)
Example DDL (edit names to match your DB):
BEGIN;
-- task_runs -> flow_runs
ALTER TABLE task_runs DROP CONSTRAINT IF EXISTS task_runs_flow_run_id_fkey;
ALTER TABLE task_runs
ADD CONSTRAINT task_runs_flow_run_id_fkey
FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;
-- flow_run_states -> flow_runs
ALTER TABLE flow_run_states DROP CONSTRAINT IF EXISTS flow_run_states_flow_run_id_fkey;
ALTER TABLE flow_run_states
ADD CONSTRAINT flow_run_states_flow_run_id_fkey
FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;
-- task_run_states -> task_runs
ALTER TABLE task_run_states DROP CONSTRAINT IF EXISTS task_run_states_task_run_id_fkey;
ALTER TABLE task_run_states
ADD CONSTRAINT task_run_states_task_run_id_fkey
FOREIGN KEY (task_run_id) REFERENCES task_runs(id) ON DELETE CASCADE;
-- log -> flow_runs (if column exists and you want logs removed with flow runs)
ALTER TABLE log DROP CONSTRAINT IF EXISTS log_flow_run_id_fkey;
ALTER TABLE log
ADD CONSTRAINT log_flow_run_id_fkey
FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;
-- log -> task_runs (if column exists)
ALTER TABLE log DROP CONSTRAINT IF EXISTS log_task_run_id_fkey;
ALTER TABLE log
ADD CONSTRAINT log_task_run_id_fkey
FOREIGN KEY (task_run_id) REFERENCES task_runs(id) ON DELETE CASCADE;
-- artifacts -> flow_runs
ALTER TABLE artifacts DROP CONSTRAINT IF EXISTS artifacts_flow_run_id_fkey;
ALTER TABLE artifacts
ADD CONSTRAINT artifacts_flow_run_id_fkey
FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;
-- artifacts -> task_runs
ALTER TABLE artifacts DROP CONSTRAINT IF EXISTS artifacts_task_run_id_fkey;
ALTER TABLE artifacts
ADD CONSTRAINT artifacts_task_run_id_fkey
FOREIGN KEY (task_run_id) REFERENCES task_runs(id) ON DELETE CASCADE;
-- events -> flow_runs (if present)
ALTER TABLE events DROP CONSTRAINT IF EXISTS events_flow_run_id_fkey;
ALTER TABLE events
ADD CONSTRAINT events_flow_run_id_fkey
FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;
COMMIT;
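If you have many relationships to change, the DROP/ADD pairs can be generated rather than typed by hand. The sketch below only builds SQL strings; cascade_fk_ddl is a hypothetical helper, the relationships listed are illustrative, and the constraint names are assumed to come from your own FK inspection output.

```python
def cascade_fk_ddl(child, column, parent, constraint_name, parent_column="id"):
    """Build DROP/ADD statements that recreate one FK with ON DELETE CASCADE.

    Identifiers are interpolated as-is, so pass only trusted names
    (for example, ones copied from the FK inspection query).
    """
    return [
        f"ALTER TABLE {child} DROP CONSTRAINT IF EXISTS {constraint_name};",
        f"ALTER TABLE {child} ADD CONSTRAINT {constraint_name} "
        f"FOREIGN KEY ({column}) REFERENCES {parent}({parent_column}) "
        f"ON DELETE CASCADE;",
    ]


# Hypothetical relationships; replace with the rows from your own database.
relationships = [
    ("task_runs", "flow_run_id", "flow_runs", "task_runs_flow_run_id_fkey"),
    ("task_run_states", "task_run_id", "task_runs", "task_run_states_task_run_id_fkey"),
]

statements = ["BEGIN;"]
for child, column, parent, name in relationships:
    statements.extend(cascade_fk_ddl(child, column, parent, name))
statements.append("COMMIT;")
print("\n".join(statements))
```

Review the printed statements before running them; nothing here connects to the database.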
3) Delete from the parent; children will cascade automatically
- Older than 7 days:
DELETE FROM flow_runs
WHERE start_time < now() - interval '7 days';
- Within the last 7 days:
DELETE FROM flow_runs
WHERE start_time >= now() - interval '7 days' AND start_time <= now();
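To see the cascade behaviour without touching your Prefect database, here is a toy demonstration. It uses Python's built-in sqlite3 module with an in-memory database as a stand-in for Postgres, and the two-table schema is a simplified assumption, not Prefect's real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
    CREATE TABLE flow_runs (id INTEGER PRIMARY KEY, start_time TEXT);
    CREATE TABLE task_runs (
        id INTEGER PRIMARY KEY,
        flow_run_id INTEGER REFERENCES flow_runs(id) ON DELETE CASCADE
    );
    INSERT INTO flow_runs VALUES (1, '2025-08-01'), (2, '2025-08-24');
    INSERT INTO task_runs VALUES (10, 1), (11, 1), (12, 2);
""")

# Delete only from the parent table; child rows follow via the cascade.
conn.execute("DELETE FROM flow_runs WHERE start_time < '2025-08-18'")
remaining = [row[0] for row in conn.execute("SELECT id FROM task_runs ORDER BY id")]
print(remaining)  # [12]
```

Task runs 10 and 11 disappear together with flow run 1, while task run 12 survives because its parent is newer than the cutoff.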
Tips
- Always back up before altering constraints or bulk deletions.
- For very large deletes, do it in batches to avoid long locks and huge transactions:
-- Repeat until 0 rows affected
WITH cte AS (
SELECT id FROM flow_runs
WHERE start_time < now() - interval '7 days'
LIMIT 5000
)
DELETE FROM flow_runs fr
USING cte
WHERE fr.id = cte.id;
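The "repeat until 0 rows affected" loop can be driven from a script. The sketch below shows the loop shape only: it runs against an in-memory SQLite database through the standard-library sqlite3 module as a stand-in for Postgres, delete_in_batches is a hypothetical helper, and the SQL is a SQLite-compatible variant of the batched delete.

```python
import sqlite3


def delete_in_batches(conn, cutoff, batch_size=5000):
    """Delete old flow runs in batches; return the total number of rows removed."""
    total = 0
    while True:
        cur = conn.execute(
            "DELETE FROM flow_runs WHERE id IN ("
            "  SELECT id FROM flow_runs WHERE start_time < ? LIMIT ?"
            ")",
            (cutoff, batch_size),
        )
        conn.commit()  # commit each batch so locks are released between rounds
        if cur.rowcount == 0:
            break
        total += cur.rowcount
    return total


# Toy data: 12 old runs and 3 recent ones
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flow_runs (id INTEGER PRIMARY KEY, start_time TEXT)")
conn.executemany(
    "INSERT INTO flow_runs (start_time) VALUES (?)",
    [("2025-08-01",)] * 12 + [("2025-08-24",)] * 3,
)
conn.commit()

deleted = delete_in_batches(conn, "2025-08-18", batch_size=5)
left = conn.execute("SELECT count(*) FROM flow_runs").fetchone()[0]
print(deleted, left)  # 12 3
```

Committing after each batch keeps every transaction small, which is the point of batching; against Postgres the same loop would wrap the batched DELETE with your driver of choice.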
Marvin
08/25/2025, 9:42 AM
VACUUM (VERBOSE, ANALYZE);
-- or consider pg_repack if you need online space reclamation
Caveats
- Changing Prefect’s DB schema (constraint definitions) could affect future upgrades/migrations. If you want to avoid schema changes, you can delete in dependency order (as I shared earlier) without ON DELETE CASCADE.
- Prefect Cloud’s database is managed; use the API-based deletion instead.
If you share your actual table names and existing FK definitions (paste the output of the FK inspection query), I’ll generate exact ALTER TABLE statements for your schema.
Postgres reference: PostgreSQL docs: foreign keys and ON DELETE CASCADE