Hey, I'm trying to set up a schedule but it keeps ...
# ask-community
a
Hey, I'm trying to set up a schedule, but it keeps failing with the following error:
Copy code
10:22:55.767 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 19.886566 seconds to run, which is longer than its loop interval of 5 seconds.
I can't find a way of increasing the loop interval. I'm running on Kubernetes, and the server pod is under no CPU or memory pressure. I'm still on Prefect 2.x. Can anyone help?
If anyone ever has this issue again: this was happening to me because my PostgreSQL database had grown to roughly 200 GB. It seems to keep all state information for every run, forever. I wrote the script below to clear old rows out of the database, which solved this issue (and some others):
Copy code
"""Flow to delete old entries from the database."""

import os

from prefect import flow, task, get_run_logger
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta


@task
def dbconnect():
    """Create a SQLAlchemy engine for the Prefect API database.

    The connection URL is read from the
    ``PREFECT_API_DATABASE_CONNECTION_URL`` environment variable.

    Returns:
        The engine returned by ``sqlalchemy.create_engine`` for that URL.

    Raises:
        ValueError: If the environment variable is unset or empty.
    """
    log = get_run_logger()
    log.info("Connecting to database")
    connection_url = os.getenv("PREFECT_API_DATABASE_CONNECTION_URL")
    # An empty string is treated the same as a missing variable.
    if not connection_url:
        raise ValueError(
            "Variable PREFECT_API_DATABASE_CONNECTION_URL is not set."
        )
    # NOTE(review): this builds a synchronous engine; if the configured URL
    # names an async driver (e.g. postgresql+asyncpg) it presumably needs to
    # be switched to a sync driver first -- confirm against the deployment.
    engine = create_engine(connection_url)
    return engine


def delete_old_entries(engine, table_name, date_column, days=7):
    """Delete rows older than ``days`` days from one table.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        table_name: Name of the table to delete from. It is interpolated
            directly into the SQL text, so it must be a trusted identifier
            (never user input).
        date_column: Name of the date/time column compared against the
            cutoff. Also interpolated directly; same trust requirement.
        days: Retention window in days. Rows whose ``date_column`` is
            earlier than "now minus ``days``" are deleted. Defaults to 7.
    """
    # Function-scope import so this block does not depend on the file-level
    # ``from datetime import ...`` line being changed.
    from datetime import timezone

    log = get_run_logger()
    log.info(f"Deleting entries older than {days} days from {table_name}")
    # Use an aware UTC timestamp so the cutoff does not depend on the local
    # timezone of whichever host runs the flow.
    # NOTE(review): assumes the target columns hold timezone-aware or UTC
    # values -- confirm against the database schema.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    # Identifiers cannot be bound parameters, hence the f-string; the cutoff
    # value itself is always passed as a bound parameter.
    query = text(
        f"""
        DELETE FROM {table_name}
        WHERE {date_column} < :cutoff
        """
    )
    with engine.connect() as connection:
        # The transaction commits when this block exits without an error.
        with connection.begin():
            result = connection.execute(
                query, {"cutoff": cutoff.isoformat()}
            )
            deleted_count = result.rowcount
            log.info(f"Deleted {deleted_count} entries from {table_name}")


@task
def delete_old_flow_entries(engine):
    """Run ``delete_old_entries`` on the flow-run tables.

    ``flow_run_state`` is cleaned first (by its ``timestamp`` column),
    followed by ``flow_run`` (by its ``created`` column).
    """
    targets = (
        ("flow_run_state", "timestamp"),
        ("flow_run", "created"),
    )
    for table, column in targets:
        delete_old_entries(engine, table, column)


@task
def delete_old_task_entries(engine):
    """Run ``delete_old_entries`` on the task-run tables.

    ``task_run_state`` is cleaned first (by its ``timestamp`` column),
    followed by ``task_run`` (by its ``created`` column).
    """
    targets = (
        ("task_run_state", "timestamp"),
        ("task_run", "created"),
    )
    for table, column in targets:
        delete_old_entries(engine, table, column)


@flow(name="DBFlow")
def run():
    """Delete old task-run and flow-run rows from the database.

    Obtains an engine from ``dbconnect``, runs the task-run cleanup and then
    the flow-run cleanup, and finally disposes of the engine so its pooled
    connections are released even if one of the cleanup steps fails.
    """
    log = get_run_logger()

    engine = dbconnect()
    try:
        log.info("Deleting old task entries")
        delete_old_task_entries(engine)
        log.info("Deleting old flow entries")
        delete_old_flow_entries(engine)
    finally:
        # Release the connection pool; nothing else uses this engine.
        engine.dispose()


# Execute the flow directly when this file is run as a script.
if __name__ == "__main__":
    run()