Andrew W
07/21/2023, 8:27 PMOperationalError: FATAL: remaining connection slots are reserved for non-replication superuser and rds_superuser connections
I’ve tried using ConcurrentTaskRunners
and DaskTaskRunners
to solve this issue, but to no avail. When I look at the pg_stat_activity table, I notice that the connections from succeeded tasks are lingering and remain idle, which quickly fills up the number of available connections. I want to note that this behavior didn’t occur in Prefect 1. The only major changes we’ve made to the code are what’s necessary for the update, so we’re currently at a loss as to what’s going on or how to move forward without outright increasing the number of max_connections.
Any help would be greatly appreciated.Nate
07/21/2023, 9:12 PMAndrew W
07/21/2023, 9:42 PM@task()
def transform_record(id):
record = DjangoModel.objects.get(id)
# run transformations
record.save()
How the flow looks like:
@flow()
def pipeline():
list_of_ids = [...]
transform_record.map(list_of_ids)
Nate
07/24/2023, 6:49 PMAndrew W
07/24/2023, 6:52 PMTraceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/django/db/backends/base/base.py", line 289, in ensure_connection
self.connect()
File "/usr/local/lib/python3.9/site-packages/django/utils/asyncio.py", line 26, in inner
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/django/db/backends/base/base.py", line 270, in connect
self.connection = self.get_new_connection(conn_params)
File "/usr/local/lib/python3.9/site-packages/django/utils/asyncio.py", line 26, in inner
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/django/db/backends/postgresql/base.py", line 275, in get_new_connection
connection = self.Database.connect(**conn_params)
File "/usr/local/lib/python3.9/site-packages/psycopg2/__init__.py", line 122, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser and rds_superuser connections
Nate
07/24/2023, 6:53 PMAndrew W
07/24/2023, 6:55 PMasyncio.py
when we’ve always been running synchronously and suspect that might be why. Nothing in the code suggests running asynchronously.Nate
07/24/2023, 6:56 PMAndrew W
07/24/2023, 6:59 PMNate
07/24/2023, 7:03 PMFile "/usr/local/lib/python3.9/site-packages/django/db/backends/base/base.py", line 270, in connect
self.connection = self.get_new_connection(conn_params)
as an experiment, what happens if you change the task_runner to be SequentialTaskRunner
? by default, its concurrent, so you'd have
from prefect.task_runners import SequentialTaskRunner
@flow(task_runner=SequentialTaskRunner())
def pipeline():
Andrew W
07/24/2023, 7:20 PMNate
07/24/2023, 7:21 PMAndrew W
07/24/2023, 7:23 PMNate
07/24/2023, 7:25 PMAndrew W
07/24/2023, 7:33 PMNate
07/24/2023, 7:40 PMAndrew W
07/24/2023, 7:41 PMNate
07/24/2023, 7:43 PMAdam Azzam
07/24/2023, 8:04 PMJake Kaplan
07/24/2023, 8:16 PMfrom django.db import close_old_connections
in conjunction with the above settings and a task concurrency limit to stay < max_connections
Andrew W
07/24/2023, 8:16 PMJake Kaplan
07/24/2023, 8:17 PMAndrew W
07/24/2023, 8:18 PMJake Kaplan
07/24/2023, 8:19 PMAndrew W
07/24/2023, 8:39 PMclose_old_connections
to all our tasks that interface with the database. We’re just confused why it worked before because from what I understand, tasks have always run in their own threads.close_old_connections
which I thought would do the trick, but it didn’t. Not sure what else I should be looking at now.
As far as I know mapped tasks have always run in their own threads so I don’t understand why this wasn’t an issue in Prefect 1.Nate
07/25/2023, 4:31 PMmap
will submit
task runs to the ConcurrentTaskRunner
by default. this can be used with sync or async tasks (depending on how you defined the task)Andrew W
07/25/2023, 4:34 PMNate
07/25/2023, 5:33 PMAndrew W
07/26/2023, 8:43 PM