ESmith
06/30/2025, 8:14 AMMarvin
06/30/2025, 8:14 AMMarvin
06/30/2025, 8:15 AMpg_trgmpg_trgmPREFECT_API_DATABASE_CONNECTION_URLESmith
06/30/2025, 8:23 AMMarvin
06/30/2025, 8:23 AMMarvin
06/30/2025, 8:24 AMpython
from prefect import get_client
from datetime import datetime, timedelta
async def cleanup_old_runs(days: int = 30):
    """Delete flow runs whose expected start time is more than ``days`` days ago.

    Args:
        days: Age threshold in days. Defaults to 30, the value the
            original snippet hard-coded.

    Returns:
        None. Matching runs are deleted one at a time through the client.
    """
    # Local imports keep this snippet self-contained; the surrounding
    # imports only bring in ``get_client``, ``datetime`` and ``timedelta``.
    from datetime import timezone

    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterExpectedStartTime,
    )

    # Timezone-aware cutoff, so it is never compared as a naive local time.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)

    # read_flow_runs takes filter *model objects*, and criteria about runs
    # go in ``flow_run_filter`` (``flow_filter`` selects flows, not runs).
    # NOTE(review): expected_start_time stands in for the original
    # "created_before" idea -- confirm it matches your retention intent.
    stale_runs = FlowRunFilter(
        expected_start_time=FlowRunFilterExpectedStartTime(before_=cutoff)
    )

    # The context manager closes the client's HTTP resources on exit.
    async with get_client() as client:
        # NOTE(review): the API may cap how many runs one read returns; if
        # so, a single call only clears the first page -- confirm, and loop
        # until the result is empty if a full purge is required.
        old_runs = await client.read_flow_runs(flow_run_filter=stale_runs)

        for run in old_runs:
            await client.delete_flow_run(run.id)
from prefect import flow
from datetime import timedelta
@flow(retries=3)
async def cleanup_flow(days: int = 30):
    """Delete terminal-state flow runs expected to start over ``days`` days ago.

    Only runs in the COMPLETED, FAILED or CANCELLED state types are
    selected, the same three the original snippet listed.

    Args:
        days: Age threshold in days. Defaults to 30, the value the
            original snippet hard-coded.

    Returns:
        None. Matching runs are deleted one at a time through the client.
    """
    # This snippet's own imports only cover ``flow`` and ``timedelta``, so
    # everything else it needs is imported here to keep it runnable alone.
    from datetime import datetime, timezone

    from prefect import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterExpectedStartTime,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.client.schemas.objects import StateType

    # Timezone-aware cutoff, so it is never compared as a naive local time.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)

    # read_flow_runs takes filter *model objects*, and criteria about runs
    # go in ``flow_run_filter`` (``flow_filter`` selects flows, not runs).
    # NOTE(review): expected_start_time stands in for the original
    # "created_before" idea -- confirm it matches your retention intent.
    finished_and_stale = FlowRunFilter(
        expected_start_time=FlowRunFilterExpectedStartTime(before_=cutoff),
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(
                any_=[
                    StateType.COMPLETED,
                    StateType.FAILED,
                    StateType.CANCELLED,
                ]
            )
        ),
    )

    # The context manager closes the client's HTTP resources on exit.
    async with get_client() as client:
        # NOTE(review): the API may cap how many runs one read returns; if
        # so, each daily run only clears one page -- confirm, and loop
        # until the result is empty if a full purge is required.
        old_runs = await client.read_flow_runs(
            flow_run_filter=finished_and_stale
        )

        for run in old_runs:
            await client.delete_flow_run(run.id)
# Deploy this flow to run daily
ESmith
06/30/2025, 8:43 AMMarvin
06/30/2025, 8:44 AMMarvin
06/30/2025, 8:45 AMget_run_logger()python
from prefect import flow, task
from prefect.logging import get_run_logger
@task
def my_task():
    """Emit a single INFO record through the Prefect run logger."""
    logger = get_run_logger()
    # The chat export had turned ``logger.info`` into Slack link markup
    # (``<http://...|logger.info>``), which is not valid Python.
    # NOTE(review): the original annotation here read "Not stored in DB".
    # Prefect's run logger normally also forwards records to the API, which
    # persists them -- confirm against your logging settings before relying
    # on that claim.
    logger.info("This goes to stdout by default")
@flow
def my_flow():
    """Log a start message with the run logger, then call ``my_task``."""
    logger = get_run_logger()
    # The chat export had turned ``logger.info`` into Slack link markup
    # (``<http://...|logger.info>``), which is not valid Python.
    # NOTE(review): the original annotation here read "Not stored in DB".
    # Prefect's run logger normally also forwards records to the API, which
    # persists them -- confirm against your logging settings before relying
    # on that claim.
    logger.info("Starting flow")
    my_task()
06/30/2025, 8:59 AMMarvin
06/30/2025, 9:00 AMMarvin
06/30/2025, 9:01 AMpublic.logsql
-- Remove rows from public.log whose timestamp is more than 30 days old.
-- This is a destructive, one-shot statement: run it against a backup or
-- inside a transaction first if you need to check the affected row count.
DELETE FROM public.log
WHERE timestamp < NOW() - INTERVAL '30 days';
# Export a 30-day value for PREFECT_LOGGING_DB_RETENTION_DAYS.
# NOTE(review): verify that your Prefect version actually reads a setting
# with this name before relying on it for log retention.
export PREFECT_LOGGING_DB_RETENTION_DAYS=30
06/30/2025, 9:05 AMMarvin
06/30/2025, 9:05 AMMarvin
06/30/2025, 9:05 AMPREFECT_LOGGING_DB_RETENTION_DAYSMattia Sappa
06/30/2025, 9:36 AMESmith
06/30/2025, 10:33 AM