# ask-community
t
Hi everybody, I'm having an issue with Ray. It's creating multiple db connections (server db) but it's not closing them. I have 100 connections that stay idle. How can I clean these connections from Prefect server? One solution is to have a job that cleans these connections every N minutes, but I'm wondering if there is an existing setting that takes care of this. Thank you
@Nate any recommendation?
b
Hey Tedi! Please refrain from tagging folks directly 🙇. Are you explicitly closing the connections at any point during the task or flow? Something like this?
```python
import psycopg2  # or whichever library you're using to connect to the database

from prefect import task


@task
def query_database():
    conn = psycopg2.connect("your_connection_string")
    try:
        # Perform database operations
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM your_table")
            results = cur.fetchall()
        return results
    finally:
        # Ensure the connection is always closed
        conn.close()
```
t
@Bianca Hoch My apologies. I'm not creating the connection. Prefect and Ray are creating it.
n
can you explain what database connections you’re talking about and how you’re observing them?
t
I have a flow that runs a function in parallel. My team is using Ray. I'm decorating my function like this:
```python
from prefect import flow, unmapped
from prefect_ray.task_runners import RayTaskRunner


@flow(
    name="some_name",
    task_runner=RayTaskRunner(init_kwargs={"num_cpus": NUM_TRANSFORM_CPUS}),
)
def main(batches, data_dir, save_dir, demo_lavel, config):
    futures = t_signal.temp_transform_batches.map(
        question_configs=batches,
        data_dir=data_dir,
        save_dir=save_dir,
        level=demo_lavel,
        config_obj=unmapped(config.CONFIG),  # type: ignore
    )
    for future in futures:
        future.wait()
```
where `temp_transform_batches` is decorated with:
```python
@task(name="TempStep2-transfrom-list", retries=10, retry_delay_seconds=6)
def temp_transform_batches( ...
```
@Nate the database connections that I'm talking about are the ones created when Prefect creates the tasks.
I believe I'm overloading my db with this, as it creates 130 connections
The error that I get is this:
```
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://<my_url>:4200/api/task_runs/e93dc7a6-19f2-46a6-89ba-b79ea289841a/set_state'
Response: {'exception_message': 'Internal Server Error'}
```
I'm running Prefect server from a systemd service like this:
```ini
# other configs here
[Service]
Type=simple
User=ubuntu
Restart=always
Environment="PATH=/opt/prefect-server/prefect-server/bin/"
ExecStart=sudo PREFECT_SQLALCHEMY_POOL_SIZE=200 PREFECT_SQLALCHEMY_POOL_RECYCLE=1800 PREFECT_SQLALCHEMY_POOL_PRE_PING=True PREFECT_SQLALCHEMY_MAX_OVERFLOW=50 /opt/prefect-server/prefect-server/bin/prefect server start --host 0.0.0.0
```
n
Hmm, while trying to minimally reproduce your error I ran into what I believe is a bug in either pydantic or Prefect
```
FAILED tests/test_task_runners.py::TestRayTaskRunner::test_can_run_many_tasks_without_crashing[default_ray_task_runner] - ray.exceptions.RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.9.20/x64/lib/python3.9/site-packages/ray/exceptions.py", line 51, in from_ray_exception
    return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 1 required keyword-only argument: 'code'
```
have you seen this at all client side?
t
I believe I have seen this issue with joblib when I use `loky` as the backend
👍 1
but I might be wrong
I believe my issue is that when I run a job in parallel, each process creates a task
n
if I'm understanding correctly, that is the expected behavior with the Ray and Dask task runners in general
t
I see. My team will switch to joblib with the `multiprocessing` backend and will stop creating a task for each parallel process
👍 1
but with joblib, if I pass a class object, it sometimes raises the same issue that you faced
basically a serialization issue
n
if you don't want to use the task runner paradigm, you're free to call arbitrary Python (which may call Ray) inside of tasks (at that point, tasks are just tools for encapsulating work / adding retries / caching, etc.)
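As an illustrative sketch of that pattern (not from the thread; `transform_batch` and the batch shape are assumptions): a single Prefect task fans the work out with Ray directly, so only one task run is reported to the API instead of one per batch.
```python
# Sketch: one Prefect task that parallelizes internally with Ray.
# The flow uses the default task runner here, so Prefect only records
# state changes for a single task run.
import ray
from prefect import flow, task


@ray.remote
def transform_batch(batch):
    # placeholder for the per-batch transformation
    return len(batch)


@task(retries=10, retry_delay_seconds=6)
def transform_all(batches):
    ray.init(ignore_reinit_error=True)
    futures = [transform_batch.remote(b) for b in batches]
    return ray.get(futures)


@flow
def main(batches):
    return transform_all(batches)
```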
t
I'm surprised that this declaration: `@task(name="foo bar", retries=10, retry_delay_seconds=6)` doesn't capture the Timeout error, nor does it retry
I'm still confused about why Prefect/Ray creates so many connections to the db. Is each task a new connection?
n
again, can you clarify what connections you're talking about and how you're counting these?
as far as Prefect goes: the task engine sends updates to the API, which writes state changes to the db, and yes, that happens for each task
t
Prefect connects to a database. It stores all the task runs/logs/flows etc? I can see in the AWS RDS metrics that DatabaseConnections skyrockets when this flow runs
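As a cross-check on that RDS metric, something along these lines (a sketch; the connection string is a placeholder) shows how many connections the Prefect database currently has in each state:
```python
# Sketch: count connections to the current database grouped by state,
# using Postgres's pg_stat_activity view.
import psycopg2

conn = psycopg2.connect("your_connection_string")
try:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT state, count(*)
            FROM pg_stat_activity
            WHERE datname = current_database()
            GROUP BY state
            ORDER BY count(*) DESC
            """
        )
        for state, count in cur.fetchall():
            print(state, count)
finally:
    conn.close()
```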
n
It stores all the task runs/logs/flows etc?
yes!
t
if the job crashes, the flow doesn't properly close the connections and they stay idle
n
if the job crashes, the flow doesn't properly close the connections and they stay idle
that sounds like a bug, I'd guess likely related to using the Ray task runner
t
Postgres can handle the `idle in transaction` connections, but not `idle` ones
I have to manually kill these connections
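If a periodic cleanup job ends up being the workaround, a minimal sketch might look like this (the connection string and the 10-minute threshold are assumptions; on Postgres 14+ the `idle_session_timeout` setting can also do this server-side):
```python
# Sketch: terminate connections that have been idle for more than 10 minutes.
# Excludes the current session; threshold and connection string are placeholders.
import psycopg2

conn = psycopg2.connect("your_connection_string")
try:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT pg_terminate_backend(pid)
            FROM pg_stat_activity
            WHERE datname = current_database()
              AND state = 'idle'
              AND state_change < now() - interval '10 minutes'
              AND pid <> pg_backend_pid()
            """
        )
        terminated = cur.fetchall()
        print(f"terminated {len(terminated)} idle connections")
finally:
    conn.close()
```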
n
are you willing to open an issue with a minimal reproduction of this?
t
okay
🙏 1