<@ULVA73B9P> give me script to clean up the prefect...
# ask-marvin
a
@Marvin give me a script to clean up the Prefect data of the past week
m
thought for 219.4 seconds
Got it. Do you want to delete data that’s older than 7 days, or delete data from the last 7 days? And are you on Prefect Cloud or self-hosted Prefect Server? Also, should we remove entire flow runs (which will remove related task runs, states, and logs), or just logs/artifacts?

In the meantime, here’s a ready-to-run cleanup script for Prefect 3.x that deletes flow runs older than 7 days. It uses the Prefect API via the async PrefectClient. Set DELETE_MODE to "older" (default) or "within" if you want just the last week. Run it where your Prefect environment variables are set (e.g., PREFECT_API_URL and PREFECT_API_KEY for Cloud).
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
)

# ---- Configuration ----
DAYS = 7
DELETE_MODE = "older"   # "older" -> delete runs older than N days
                        # "within" -> delete runs that occurred in the last N days
DRY_RUN = False         # True => just print what would be deleted
PAGE_SIZE = 200         # tune for your workspace size
# -----------------------


def _time_window(days: int, mode: str):
    now = datetime.now(timezone.utc)
    start = now - timedelta(days=days)
    if mode == "older":
        # Everything strictly before 'start'
        return None, start
    elif mode == "within":
        # Anything between 'start' and 'now'
        return start, now
    else:
        raise ValueError("DELETE_MODE must be 'older' or 'within'")

async def main():
    after, before = _time_window(DAYS, DELETE_MODE)

    # Build the time filter (the filter fields are named after_ / before_)
    flow_run_filter = FlowRunFilter(
        start_time=FlowRunFilterStartTime(after_=after, before_=before)
    )

    async with get_client() as client:
        # Collect IDs first (avoid paging issues when deleting)
        flow_run_ids = []
        offset = 0

        while True:
            runs = await client.read_flow_runs(
                flow_run_filter=flow_run_filter,
                limit=PAGE_SIZE,
                offset=offset,
            )
            if not runs:
                break
            flow_run_ids.extend([r.id for r in runs])
            offset += PAGE_SIZE

        print(f"Found {len(flow_run_ids)} flow runs to delete (mode={DELETE_MODE}, days={DAYS}).")

        if DRY_RUN or not flow_run_ids:
            print("DRY_RUN is True or no runs found; exiting without deletion.")
            return

        # Delete runs (deleting a flow run removes its related task runs/states/logs)
        deleted = 0
        for fr_id in flow_run_ids:
            try:
                await client.delete_flow_run(fr_id)
                deleted += 1
            except Exception as exc:
                print(f"Failed to delete flow run {fr_id}: {exc}")

        print(f"Deleted {deleted} flow runs.")

if __name__ == "__main__":
    asyncio.run(main())
Notes:
- For Prefect Cloud, ensure PREFECT_API_URL and PREFECT_API_KEY are set in your environment.
- Deleting a flow run will remove related task runs/states/logs. If you prefer to only purge logs while retaining runs, say the word and I’ll adjust the script.

References:
- Prefect 3.x Python client API
- Flow run API reference

If you confirm your exact target (older vs within last 7 days; Cloud vs Server; delete runs vs just logs/artifacts), I can tailor this precisely for you.
a
the data in the db
@Marvin delete the data in the db
m
thought for 152.9 seconds
Got it: you want to delete the data directly from the Prefect database. Quick clarifications so I can tailor this precisely:
- Is this Prefect Cloud or self-hosted Prefect Server? (DB access is only possible for self-hosted.)
- Do you want to delete data older than 7 days, or data from the last 7 days?
- Which Postgres table names do you see? Prefect versions may use singular or plural (e.g., flow_runs vs flow_run, log vs logs).

If you’re on Prefect Server (Postgres), here’s a safe, transactional Postgres script you can run as a starting point. It deletes flow runs by time window and cleans up related task runs, states, logs, and artifacts. You’ll need to adjust table names to match your schema (see notes in the script).

Important:
- Make a backup first.
- Run in a transaction, preview counts, then uncomment deletes.
- Adjust table names (singular/plural) to match your DB: use \dt or the discovery query below.
-- ============
-- Prefect DB cleanup (Postgres)
-- Deletes flow runs and related data based on a time window
-- IMPORTANT: Adjust table names if your schema differs (see comments)
-- ============
-- Common names by Prefect version:
--   flow_runs or flow_run
--   task_runs or task_run
--   flow_run_states or flow_run_state
--   task_run_states or task_run_state
--   log or logs
--   artifacts or artifact
--   events or event

BEGIN;

-- Configure your window here.
-- A CTE is visible only to the single statement that follows it, so the
-- target IDs go into a temp table that every statement below can read.
-- 1) Delete runs OLDER than 7 days:
CREATE TEMP TABLE target_flow_runs ON COMMIT DROP AS
  SELECT id
  FROM flow_runs         -- adjust if your table is named flow_run
  WHERE start_time < now() - interval '7 days';

-- 2) Or, delete runs WITHIN the last 7 days (use this instead of the statement above):
-- CREATE TEMP TABLE target_flow_runs ON COMMIT DROP AS
--   SELECT id
--   FROM flow_runs
--   WHERE start_time >= now() - interval '7 days' AND start_time <= now();

-- Preview counts before deleting
SELECT count(*) AS flow_runs_to_delete FROM target_flow_runs;

SELECT
  (SELECT count(*) FROM task_runs tr WHERE tr.flow_run_id IN (SELECT id FROM target_flow_runs)) AS task_runs,
  (SELECT count(*) FROM flow_run_states frs WHERE frs.flow_run_id IN (SELECT id FROM target_flow_runs)) AS flow_run_states,
  (SELECT count(*) FROM task_run_states trs WHERE trs.task_run_id IN (
      SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
  )) AS task_run_states,
  (SELECT count(*) FROM log l WHERE l.flow_run_id IN (SELECT id FROM target_flow_runs)
     OR l.task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs))) AS logs,
  (SELECT count(*) FROM artifacts a WHERE a.flow_run_id IN (SELECT id FROM target_flow_runs)
     OR a.task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs))) AS artifacts,
  (SELECT count(*) FROM events e WHERE e.flow_run_id IN (SELECT id FROM target_flow_runs)) AS events;

-- If the counts look correct, uncomment deletes IN THIS ORDER.
-- NOTE: If your tables are singular/plural, update names accordingly.

-- DELETE FROM task_run_states
-- WHERE task_run_id IN (
--   SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
-- );

-- DELETE FROM flow_run_states
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs);

-- DELETE FROM log
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
--    OR task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs));

-- DELETE FROM artifacts
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs)
--    OR task_run_id IN (SELECT id FROM task_runs WHERE flow_run_id IN (SELECT id FROM target_flow_runs));

-- Optional: if you store events tied to flow runs
-- DELETE FROM events
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs);

-- DELETE FROM task_runs
-- WHERE flow_run_id IN (SELECT id FROM target_flow_runs);

-- DELETE FROM flow_runs
-- WHERE id IN (SELECT id FROM target_flow_runs);

-- When satisfied, COMMIT; otherwise ROLLBACK;
-- COMMIT;
-- ROLLBACK;
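The preview-then-delete pattern above can be rehearsed without touching a real Prefect database. The sketch below uses Python's built-in sqlite3 as a stand-in for Postgres, with a toy two-table schema. The table and column names mirror the script but are illustrative, not a claim about Prefect's actual schema, and `demo_db` and `cleanup` are hypothetical helpers.

```python
import sqlite3

# Subquery that selects the flow runs to remove (fixed cutoff keeps it stable).
TARGETS = "SELECT id FROM flow_runs WHERE start_time < :cutoff"


def demo_db() -> sqlite3.Connection:
    # isolation_level=None gives explicit BEGIN/COMMIT/ROLLBACK control.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.executescript(
        """
        CREATE TABLE flow_runs (id INTEGER PRIMARY KEY, start_time TEXT);
        CREATE TABLE task_runs (id INTEGER PRIMARY KEY, flow_run_id INTEGER);
        INSERT INTO flow_runs VALUES (1, '2024-01-01'), (2, '2024-01-20');
        INSERT INTO task_runs VALUES (10, 1), (11, 1), (12, 2);
        """
    )
    return conn


def cleanup(conn: sqlite3.Connection, cutoff: str, commit: bool) -> dict:
    """Count what would go, delete children before parents, then commit or roll back."""
    params = {"cutoff": cutoff}
    conn.execute("BEGIN")
    try:
        preview = {
            "flow_runs": conn.execute(
                f"SELECT count(*) FROM ({TARGETS})", params
            ).fetchone()[0],
            "task_runs": conn.execute(
                f"SELECT count(*) FROM task_runs WHERE flow_run_id IN ({TARGETS})", params
            ).fetchone()[0],
        }
        # Children first, then parents, mirroring the delete order above.
        conn.execute(f"DELETE FROM task_runs WHERE flow_run_id IN ({TARGETS})", params)
        conn.execute(f"DELETE FROM flow_runs WHERE id IN ({TARGETS})", params)
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT" if commit else "ROLLBACK")
    return preview


if __name__ == "__main__":
    db = demo_db()
    print("dry run:", cleanup(db, "2024-01-10", commit=False))
    print("real run:", cleanup(db, "2024-01-10", commit=True))
```

Calling `cleanup` with `commit=False` plays the role of the ROLLBACK branch: the counts are reported but nothing is persisted.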
Helpful discovery query to list relevant tables in your DB (paste results here and I’ll tailor the script exactly):
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
  AND tablename ILIKE ANY (ARRAY['%flow_run%','%task_run%','%state%','%log%','%artifact%','%event%'])
ORDER BY tablename;
Notes:
- If your DB has foreign keys with ON DELETE CASCADE configured, you could delete from flow_runs and let Postgres cascade the rest. However, not all installations have cascade set for all relations, so the explicit-order deletes above are safer.
- After large deletes, consider running VACUUM (or pg_repack) during a maintenance window to reclaim space.

If you can confirm:
- Cloud vs Server
- “older than 7 days” vs “within last 7 days”
- The exact table names from your DB

I’ll send a pinpointed script for your environment.

References:
- Prefect 3.x Python client API (if you prefer API-based cleanup instead of direct SQL)
a
@Marvin how can we delete rows and cascade
m
thought for 48.2 seconds
If you’re deleting directly in Postgres, cascades are controlled by foreign keys with ON DELETE CASCADE. Postgres does not support “DELETE … CASCADE” as a statement option; you must define FKs to cascade from the parent table(s). Then a single DELETE on the parent will remove dependent rows automatically.

Here’s a practical, safe approach for Prefect Server (self-hosted Postgres):

1) Inspect current foreign keys to see whether they already cascade
-- Lists FKs and their actions for the public schema
SELECT
  conrelid::regclass AS child_table,
  conname            AS constraint_name,
  pg_get_constraintdef(c.oid) AS definition
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE c.contype = 'f'
  AND n.nspname = 'public'
ORDER BY child_table, constraint_name;
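Once the inspection query has run, its output can be scanned for keys that do not cascade. The sketch below is a small helper for that, assuming the rows arrive as (child_table, constraint_name, definition) tuples and that the definition text omits the ON DELETE clause when the action is the default NO ACTION. The function names and the sample rows are hypothetical.

```python
import re
from typing import Iterable, List, Tuple

# Matches the ON DELETE action in text such as
# "FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE".
_ON_DELETE = re.compile(r"ON DELETE (CASCADE|SET NULL|SET DEFAULT|RESTRICT|NO ACTION)")


def on_delete_action(definition: str) -> str:
    """Return the ON DELETE action, treating a missing clause as NO ACTION."""
    match = _ON_DELETE.search(definition)
    return match.group(1) if match else "NO ACTION"


def non_cascading(rows: Iterable[Tuple[str, str, str]]) -> List[Tuple[str, str, str]]:
    """Return (child_table, constraint_name, action) for every FK that does not cascade."""
    result = []
    for child, name, definition in rows:
        action = on_delete_action(definition)
        if action != "CASCADE":
            result.append((child, name, action))
    return result


if __name__ == "__main__":
    # Hypothetical sample rows, shaped like the inspection query's output.
    sample = [
        ("task_runs", "task_runs_flow_run_id_fkey",
         "FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE"),
        ("log", "log_flow_run_id_fkey",
         "FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id)"),
    ]
    for child, name, action in non_cascading(sample):
        print(f"{child}.{name}: ON DELETE {action}")
```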
2) Add or change FKs to ON DELETE CASCADE
Run inside a maintenance window; this will lock the child tables while altering constraints. Adjust table/column names to match your schema (singular vs plural), and use the constraint names reported by the inspection query in step 1: DROP CONSTRAINT IF EXISTS silently does nothing when the name does not match, which would leave the old non-cascading key in place alongside the new one.

Common relationships in Prefect:
- flow_runs -> task_runs
- flow_runs -> flow_run_states
- task_runs -> task_run_states
- flow_runs/task_runs -> log
- flow_runs/task_runs -> artifacts
- flow_runs -> events (if present)

Example DDL (edit names to match your DB):
BEGIN;

-- task_runs -> flow_runs
ALTER TABLE task_runs DROP CONSTRAINT IF EXISTS task_runs_flow_run_id_fkey;
ALTER TABLE task_runs
  ADD CONSTRAINT task_runs_flow_run_id_fkey
  FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;

-- flow_run_states -> flow_runs
ALTER TABLE flow_run_states DROP CONSTRAINT IF EXISTS flow_run_states_flow_run_id_fkey;
ALTER TABLE flow_run_states
  ADD CONSTRAINT flow_run_states_flow_run_id_fkey
  FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;

-- task_run_states -> task_runs
ALTER TABLE task_run_states DROP CONSTRAINT IF EXISTS task_run_states_task_run_id_fkey;
ALTER TABLE task_run_states
  ADD CONSTRAINT task_run_states_task_run_id_fkey
  FOREIGN KEY (task_run_id) REFERENCES task_runs(id) ON DELETE CASCADE;

-- log -> flow_runs (if column exists and you want logs removed with flow runs)
ALTER TABLE log DROP CONSTRAINT IF EXISTS log_flow_run_id_fkey;
ALTER TABLE log
  ADD CONSTRAINT log_flow_run_id_fkey
  FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;

-- log -> task_runs (if column exists)
ALTER TABLE log DROP CONSTRAINT IF EXISTS log_task_run_id_fkey;
ALTER TABLE log
  ADD CONSTRAINT log_task_run_id_fkey
  FOREIGN KEY (task_run_id) REFERENCES task_runs(id) ON DELETE CASCADE;

-- artifacts -> flow_runs
ALTER TABLE artifacts DROP CONSTRAINT IF EXISTS artifacts_flow_run_id_fkey;
ALTER TABLE artifacts
  ADD CONSTRAINT artifacts_flow_run_id_fkey
  FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;

-- artifacts -> task_runs
ALTER TABLE artifacts DROP CONSTRAINT IF EXISTS artifacts_task_run_id_fkey;
ALTER TABLE artifacts
  ADD CONSTRAINT artifacts_task_run_id_fkey
  FOREIGN KEY (task_run_id) REFERENCES task_runs(id) ON DELETE CASCADE;

-- events -> flow_runs (if present)
ALTER TABLE events DROP CONSTRAINT IF EXISTS events_flow_run_id_fkey;
ALTER TABLE events
  ADD CONSTRAINT events_flow_run_id_fkey
  FOREIGN KEY (flow_run_id) REFERENCES flow_runs(id) ON DELETE CASCADE;

COMMIT;
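Writing the DROP/ADD pairs by hand is error-prone once the real constraint names are known. A minimal sketch of generating them from a list of relationships follows, assuming every parent key column is named id and that identifiers are plain lowercase names. `Relation` and `cascade_ddl` are hypothetical helpers, and the generated SQL should be reviewed before it is run.

```python
import re
from typing import List, NamedTuple

# Accept only plain lowercase identifiers so nothing unexpected reaches the SQL text.
_IDENT = re.compile(r"^[a-z_][a-z0-9_]*$")


class Relation(NamedTuple):
    constraint: str  # existing FK name, as reported by the inspection query
    child: str       # table that holds the foreign key column
    column: str      # foreign key column on the child table
    parent: str      # referenced table (key column assumed to be "id")


def cascade_ddl(rel: Relation) -> List[str]:
    """Return DROP + ADD statements that recreate one FK with ON DELETE CASCADE."""
    for ident in rel:
        if not _IDENT.match(ident):
            raise ValueError(f"unexpected identifier: {ident!r}")
    return [
        f"ALTER TABLE {rel.child} DROP CONSTRAINT {rel.constraint};",
        f"ALTER TABLE {rel.child} ADD CONSTRAINT {rel.constraint} "
        f"FOREIGN KEY ({rel.column}) REFERENCES {rel.parent}(id) ON DELETE CASCADE;",
    ]


if __name__ == "__main__":
    relations = [
        Relation("task_runs_flow_run_id_fkey", "task_runs", "flow_run_id", "flow_runs"),
        Relation("task_run_states_task_run_id_fkey", "task_run_states", "task_run_id", "task_runs"),
    ]
    print("BEGIN;")
    for rel in relations:
        print("\n".join(cascade_ddl(rel)))
    print("COMMIT;")
```

The sketch drops the constraint without IF EXISTS on purpose: a wrong name then fails loudly inside the transaction instead of leaving a duplicate key behind.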
3) Delete from the parent; children will cascade automatically
- Older than 7 days:
DELETE FROM flow_runs
WHERE start_time < now() - interval '7 days';
- Within the last 7 days:
DELETE FROM flow_runs
WHERE start_time >= now() - interval '7 days' AND start_time <= now();
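To see the cascade behaviour end to end before changing a real database, the sketch below reproduces it in an in-memory SQLite database using Python's built-in sqlite3. SQLite is only a stand-in for Postgres here, the two-table schema is a toy one, and `cascade_demo` is a hypothetical name.

```python
import sqlite3
from typing import Tuple


def cascade_demo() -> Tuple[int, int]:
    """Delete old parent rows and return the remaining (flow_runs, task_runs) counts."""
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys only when this is enabled on the connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(
        """
        CREATE TABLE flow_runs (id INTEGER PRIMARY KEY, start_time TEXT);
        CREATE TABLE task_runs (
            id INTEGER PRIMARY KEY,
            flow_run_id INTEGER REFERENCES flow_runs(id) ON DELETE CASCADE
        );
        INSERT INTO flow_runs VALUES (1, '2024-01-01'), (2, '2024-01-20');
        INSERT INTO task_runs VALUES (10, 1), (11, 1), (12, 2);
        """
    )
    # A single delete on the parent; the child rows follow through the FK.
    conn.execute("DELETE FROM flow_runs WHERE start_time < ?", ("2024-01-10",))
    conn.commit()
    flow_runs = conn.execute("SELECT count(*) FROM flow_runs").fetchone()[0]
    task_runs = conn.execute("SELECT count(*) FROM task_runs").fetchone()[0]
    conn.close()
    return flow_runs, task_runs


if __name__ == "__main__":
    print(cascade_demo())
```

Only the parent table appears in the DELETE; the two task runs that belonged to the removed flow run disappear with it.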
Tips
- Always back up before altering constraints or bulk deletions.
- For very large deletes, do it in batches to avoid long locks and huge transactions:
-- Repeat until 0 rows affected
WITH cte AS (
  SELECT id FROM flow_runs
  WHERE start_time < now() - interval '7 days'
  LIMIT 5000
)
DELETE FROM flow_runs fr
USING cte
WHERE fr.id = cte.id;
- After big deletes, reclaim space during a maintenance window:
VACUUM (VERBOSE, ANALYZE);
-- or consider pg_repack if you need online space reclamation
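The "repeat until 0 rows affected" step from the batching tip can be driven from a small loop. The sketch below shows the control flow against an in-memory SQLite table as a stand-in for Postgres; `delete_in_batches` is a hypothetical helper, and with a real Postgres driver the SQL would be the batched DELETE shown in the tips.

```python
import sqlite3


def delete_in_batches(conn: sqlite3.Connection, cutoff: str, batch_size: int = 5000) -> int:
    """Delete matching rows in chunks, committing after each one; return the total removed."""
    total = 0
    while True:
        cur = conn.execute(
            "DELETE FROM flow_runs WHERE id IN ("
            "  SELECT id FROM flow_runs WHERE start_time < ? LIMIT ?"
            ")",
            (cutoff, batch_size),
        )
        conn.commit()  # one short transaction per batch
        if cur.rowcount == 0:
            break
        total += cur.rowcount
    return total


if __name__ == "__main__":
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE flow_runs (id INTEGER PRIMARY KEY, start_time TEXT)")
    db.executemany(
        "INSERT INTO flow_runs (start_time) VALUES (?)",
        [("2024-01-01",)] * 12 + [("2024-02-01",)] * 3,
    )
    db.commit()
    print("deleted:", delete_in_batches(db, "2024-01-10", batch_size=5))
```

Committing per batch keeps each transaction short, at the cost of not being able to roll back the whole cleanup in one step.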
Caveats
- Changing Prefect’s DB schema (constraint definitions) could affect future upgrades/migrations. If you want to avoid schema changes, you can delete in dependency order (as I shared earlier) without ON DELETE CASCADE.
- Prefect Cloud’s database is managed; use the API-based deletion instead.

If you share your actual table names and existing FK definitions (paste the output of the FK inspection query), I’ll generate exact ALTER TABLE statements for your schema.

Postgres reference: PostgreSQL docs: foreign keys and ON DELETE CASCADE