Andrew W

07/21/2023, 8:27 PM
Hi, I’m currently facing an issue with Prefect 2 that I wasn’t facing in Prefect 1. On our mapped task that interfaces with a postgres database, I’m getting this error:
Copy code
OperationalError: FATAL:  remaining connection slots are reserved for non-replication superuser and rds_superuser connections
I’ve tried using the ConcurrentTaskRunner and the DaskTaskRunner to solve this issue, but to no avail. When I look at the pg_stat_activity table, I notice that connections from succeeded tasks linger and remain idle, which quickly fills up the number of available connections. I want to note that this behavior didn’t occur in Prefect 1. The only major changes we’ve made to the code are what’s necessary for the upgrade, so we’re currently at a loss as to what’s going on or how to move forward without outright increasing max_connections. Any help would be greatly appreciated.

Nate

07/21/2023, 9:12 PM
hi @Andrew W - would you be able to share the relevant code thats interacting with the database?

Andrew W

07/21/2023, 9:42 PM
Hi Nate, I’m using django ORM to get a queryset from a model, and running transformations on it before saving it back to the database:
Copy code
@task()
def transform_record(id):
    record = DjangoModel.objects.get(id=id)
    # run transformations
    record.save()
Here’s what the flow looks like:
Copy code
@flow()
def pipeline():
    list_of_ids = [...]
    transform_record.map(list_of_ids)
Hi @Nate, any thoughts?

Nate

07/24/2023, 6:49 PM
do you have more of the trace?

Andrew W

07/24/2023, 6:52 PM
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/django/db/backends/base/base.py", line 289, in ensure_connection
    self.connect()
  File "/usr/local/lib/python3.9/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/django/db/backends/base/base.py", line 270, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/usr/local/lib/python3.9/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/django/db/backends/postgresql/base.py", line 275, in get_new_connection
    connection = self.Database.connect(**conn_params)
  File "/usr/local/lib/python3.9/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL:  remaining connection slots are reserved for non-replication superuser and rds_superuser connections

Nate

07/24/2023, 6:53 PM
that seems to be a django error, no?
do you have any idea how prefect may be involved?

Andrew W

07/24/2023, 6:55 PM
This only started happening after migrating to Prefect 2. When I run the flow on Prefect 1, this error doesn’t occur, and when I look at the activity table, the connections don’t stay open and idle.
I’m not entirely sure why the stack trace includes asyncio.py when we’ve always been running synchronously, and I suspect that might be part of the problem. Nothing in the code suggests running asynchronously.

Nate

07/24/2023, 6:56 PM
hmm, where is the connection object managed? is it possible that they're not being cleaned up with a context manager or something? prefect 2 uses async under the hood in certain cases

Andrew W

07/24/2023, 6:59 PM
So we’ve always used Django ORM to interface with the database, and use Django methods on Model objects to fetch and save records. No direct calls to context managers.

Nate

07/24/2023, 7:03 PM
hmm yeah it seems that django is handling that for you here
Copy code
File "/usr/local/lib/python3.9/site-packages/django/db/backends/base/base.py", line 270, in connect
    self.connection = self.get_new_connection(conn_params)
as an experiment, what happens if you change the task_runner to SequentialTaskRunner? by default it's concurrent, so you'd have
Copy code
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
def pipeline():
    ...

Andrew W

07/24/2023, 7:20 PM
It seems to be having the same issue. Because it’s running sequentially it’s taking a bit of time, but looking at the activity table, the connections don’t seem to be closing, so it will eventually hit the same error.

Nate

07/24/2023, 7:21 PM
hmm, I can't really think of how the lifetime of the connection object would be related to prefect

Andrew W

07/24/2023, 7:23 PM
Yeah, it just errored and I’m getting the same message.
I can’t think of any other reason, as the logic itself hasn’t really changed and all the changes we’ve made have been upgrade-specific. I’ve looked around and it doesn’t seem to be an issue for others, so we’re just kind of lost at this point.

Nate

07/24/2023, 7:25 PM
hey @Jake Kaplan - when you have a min, any chance you have an idea of what's going on here?
afaik, pending this (which may be dead as is), tasks are all still run in worker threads, so I believe each thread would create its own connection object. how many tasks do you have?

Andrew W

07/24/2023, 7:33 PM
We expect around 300 tasks every run which, if each task holds onto a connection, is significantly over the allowed max_connections.
The 300 tasks are just from the mapped tasks.
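A quick way to watch the connection count by state is something like this (just an illustrative sketch using the Django connection, not code from the flow above):
Copy code
from django.db import connection

def connections_by_state():
    # count server-side connections so you can watch the idle ones pile up
    with connection.cursor() as cursor:
        cursor.execute("SELECT state, count(*) FROM pg_stat_activity GROUP BY state")
        return dict(cursor.fetchall())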

Nate

07/24/2023, 7:40 PM
could you set a concurrency limit here with tags? that way you can enforce that only N tasks / connections are active at a given time
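roughly something like this (the 'db' tag name and the limit of 20 are just placeholders):
Copy code
from prefect import task

# any task carrying the "db" tag will respect that tag's concurrency limit
@task(tags=["db"])
def transform_record(id):
    ...

# then create the limit once, e.g. from the CLI:
#   prefect concurrency-limit create db 20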

Andrew W

07/24/2023, 7:41 PM
We’ve done this too, but to no avail. We originally had a concurrency limit of 20, but the connections would still remain open.

Nate

07/24/2023, 7:43 PM
hmm, it seems like Django is not cleaning up the connection objects after use then?
I'm not sure what else would be happening, but I could be missing something

Adam Azzam

07/24/2023, 8:04 PM
What’s your DB settings config? (e.g. CONN_MAX_AGE and CONN_HEALTH_CHECKS)
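for reference, those live under DATABASES in settings.py, roughly like this (values purely illustrative):
Copy code
# settings.py (sketch, values illustrative)
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "mydb",
        # 0 (the default) closes connections at the end of each cleanup cycle;
        # a positive number of seconds keeps them open for reuse
        "CONN_MAX_AGE": 0,
        # re-check a persistent connection before reusing it (Django 4.1+)
        "CONN_HEALTH_CHECKS": True,
    }
}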

Jake Kaplan

07/24/2023, 8:16 PM
I haven't run django much in flows. you may need to close them manually with from django.db import close_old_connections, in conjunction with the above settings and a task concurrency limit to stay under max_connections
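a rough sketch of what that might look like in a task (untested, DjangoModel as in the snippet above):
Copy code
from django.db import close_old_connections
from prefect import task

@task(tags=["db"])  # tag so a concurrency limit can apply
def transform_record(id):
    close_old_connections()  # drop any stale/expired connections first
    try:
        record = DjangoModel.objects.get(id=id)
        # run transformations
        record.save()
    finally:
        # with CONN_MAX_AGE=0 this closes the connection this thread opened
        close_old_connections()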

Andrew W

07/24/2023, 8:16 PM
I’m setting CONN_HEALTH_CHECKS to True now, but we’ve always used the defaults for both before. The reason being we always expected django to close the connections after each mapped task finishes.

Jake Kaplan

07/24/2023, 8:17 PM
you're using the django orm inside of a flow run straight up right? it's not a web app or wrapped in a django management command or anything?

Andrew W

07/24/2023, 8:18 PM
Yes, we’re running the ORM in the tasks of our flow run.

Jake Kaplan

07/24/2023, 8:19 PM
like nate said, I believe tasks are run in their own threads. I don't think django will tear down a connection for you when it's created in a manual (from django's POV) separate thread, rather than in something like a django api request context
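if that's the case, one blunt workaround (just a sketch, not tested against your setup) is to close the thread's connections yourself when the task finishes, since there's no request cycle to do it for you:
Copy code
from django.db import connections
from prefect import task

@task
def transform_record(id):
    try:
        record = DjangoModel.objects.get(id=id)  # DjangoModel from the earlier snippet
        # run transformations
        record.save()
    finally:
        # close every connection this worker thread has opened;
        # Django only does this automatically around request signals
        connections.close_all()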

Andrew W

07/24/2023, 8:39 PM
We’re hoping the connection settings changes will help, but we don’t really want to have to add close_old_connections to all our tasks that interface with the database. We’re just confused why it worked before because, from what I understand, tasks have always run in their own threads.
Hi @Jake Kaplan, so we’ve tried the setting changes, but neither has helped with our issue. We followed up with using close_old_connections, which I thought would do the trick, but it didn’t. Not sure what else I should be looking at now. As far as I know mapped tasks have always run in their own threads, so I don’t understand why this wasn’t an issue in Prefect 1.
Do you happen to know whether mapped tasks are run asynchronously by default? What if I had a downstream task that waited on the mapped tasks’ outputs?

Nate

07/25/2023, 4:31 PM
mapped tasks are run concurrently by default, as map will submit task runs to the ConcurrentTaskRunner by default. this can be used with sync or async tasks (depending on how you defined the task)
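for example (toy sketch, not your code):
Copy code
from prefect import flow, task

@task
def double(x):
    # a plain sync task; an `async def` task can be mapped the same way
    return 2 * x

@flow
def pipeline():
    # .map submits one task run per item to the flow's task runner,
    # which is ConcurrentTaskRunner unless you pass task_runner=...
    futures = double.map([1, 2, 3])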

Andrew W

07/25/2023, 4:34 PM
So I’m guessing it’s safe to assume it isn’t an async problem?

Nate

07/25/2023, 5:33 PM
I wouldn't think it's an async problem - could be wrong

Andrew W

07/26/2023, 8:43 PM
@Nate do you know if SequentialTaskRunners also submit tasks to a thread pool? I’m currently still debugging with SequentialTaskRunners and it seems like my django requests are being sent from different threads which is why these connections aren’t being reused like they were in Prefect 1