Arthur
11/05/2024, 10:24 AM10:22:55.767 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 19.886566 seconds to run, which is longer than its loop interval of 5 seconds.
I cant find a way of increasing the loop time. I'm running on kubernetes and the server pod is under no cpu/mem pressure. I'm still on 2.x
Can anyone help?Arthur
11/07/2024, 3:58 PM"""Flow to delete old entries from the database."""
import os
from prefect import flow, task, get_run_logger
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
@task
def dbconnect():
"""Connect to the database."""
log = get_run_logger()
<http://log.info|log.info>("Connecting to database")
connection_url = os.getenv("PREFECT_API_DATABASE_CONNECTION_URL")
if not connection_url:
raise ValueError(
"Variable PREFECT_API_DATABASE_CONNECTION_URL is not set."
)
engine = create_engine(connection_url)
return engine
def delete_old_entries(engine, table_name, date_column):
"""Delete old entries from the database."""
log = get_run_logger()
<http://log.info|log.info>(f"Deleting entries older than 7 days from {table_name}")
seven_days_ago = datetime.now() - timedelta(days=7)
query = text(
f"""
DELETE FROM {table_name}
WHERE {date_column} < :seven_days_ago
"""
)
with engine.connect() as connection:
with connection.begin(): # Begin a transaction
result = connection.execute(
query, {"seven_days_ago": seven_days_ago.isoformat()}
)
deleted_count = result.rowcount
<http://log.info|log.info>(f"Deleted {deleted_count} entries from {table_name}")
@task
def delete_old_flow_entries(engine):
"""Delete old flow entries from the database."""
delete_old_entries(engine, "flow_run_state", "timestamp")
delete_old_entries(engine, "flow_run", "created")
@task
def delete_old_task_entries(engine):
"""Delete old task entries from the database."""
delete_old_entries(engine, "task_run_state", "timestamp")
delete_old_entries(engine, "task_run", "created")
@flow(name="DBFlow")
def run():
"""Run the flow to delete old entries from the database."""
log = get_run_logger()
engine = dbconnect() # Retrieve the engine from dbconnect task
<http://log.info|log.info>("Deleting old task entries")
delete_old_task_entries(engine) # Pass the engine to q1 task
<http://log.info|log.info>("Deleting old flow entries")
delete_old_flow_entries(engine) # Pass the engine to q1 task
if __name__ == "__main__":
run()