William Jamir
03/03/2025, 5:31 PM
Nicholas Torba
03/03/2025, 5:44 PM
Nicholas Torba
03/03/2025, 5:45 PM
Nicholas Torba
03/03/2025, 5:46 PM
Nicholas Torba
03/03/2025, 5:48 PM
from django.db import connection as django_connection
from functools import wraps


def close_django_db_connection(task_func):
    """Decorator to ensure the DB connection is closed after task execution"""

    @wraps(task_func)  # this preserves the original function info
    def wrapper(*args, **kwargs):
        try:
            return task_func(*args, **kwargs)  # Run the task
        finally:
            django_connection.close()  # Close the DB connection after execution

    return wrapper


# Which you can add to any task like this:
from prefect import task


@task
@close_django_db_connection
def your_task(
    x: str,
):
    # db connection happens
    return x
since you aren't using Django, it will be different, but maybe you can do something similar!
William Jamir
03/03/2025, 7:32 PM
Nicholas Torba
03/03/2025, 7:37 PM