Tedi Gjoni
01/06/2025, 6:53 PM
Tedi Gjoni
01/06/2025, 6:53 PM
Bianca Hoch
01/06/2025, 7:44 PM
import psycopg2  # Or the library you're using to connect to the database
from prefect import task

@task
def query_database():
    conn = psycopg2.connect("your_connection_string")
    try:
        # Perform database operations
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM your_table")
            results = cur.fetchall()
        return results
    finally:
        # Ensure the connection is always closed, even if the query raises
        conn.close()
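The same close-in-finally pattern can be tried without a Postgres server. A minimal sketch, assuming the standard-library sqlite3 module as a stand-in for psycopg2; the table contents are made up for illustration and are not from this thread:

```python
import sqlite3
from contextlib import closing


def query_database(conn_string=":memory:"):
    # sqlite3 stands in for psycopg2 here; the table and rows are hypothetical.
    conn = sqlite3.connect(conn_string)
    try:
        # closing() guarantees cur.close() even if a statement raises
        with closing(conn.cursor()) as cur:
            cur.execute("CREATE TABLE your_table (id INTEGER, name TEXT)")
            cur.execute("INSERT INTO your_table VALUES (1, 'a'), (2, 'b')")
            cur.execute("SELECT * FROM your_table ORDER BY id")
            return cur.fetchall()
    finally:
        # Runs on success and on error, so the connection never lingers
        conn.close()


rows = query_database()
```

Returning from inside the try block is fine here: the finally clause still runs before the function hands the rows back.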
Tedi Gjoni
01/06/2025, 7:50 PM
Nate
01/06/2025, 7:51 PM
Tedi Gjoni
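01/06/2025, 7:54 PM
# Imports assumed; NUM_TRANSFORM_CPUS and t_signal come from the poster's own project
from prefect import flow, unmapped
from prefect_ray.task_runners import RayTaskRunner

@flow(
    name="some_name",
    task_runner=RayTaskRunner(init_kwargs={"num_cpus": NUM_TRANSFORM_CPUS}),
)
def main(batches, data_dir, save_dir, demo_lavel, config):
    futures = t_signal.temp_transform_batches.map(
        question_configs=batches,
        data_dir=data_dir,
        save_dir=save_dir,
        level=demo_lavel,
        config_obj=unmapped(config.CONFIG),  # type: ignore
    )
    # Block until every mapped task run has finished
    for future in futures:
        future.wait()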
Tedi Gjoni
01/06/2025, 7:55 PM
temp_transform_batches is decorated with:
@task(name="TempStep2-transfrom-list", retries=10, retry_delay_seconds=6)
def temp_transform_batches( ...
Tedi Gjoni
01/06/2025, 7:56 PM
Tedi Gjoni
01/06/2025, 7:56 PM
Tedi Gjoni
01/06/2025, 7:57 PM
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://<my_url>:4200/api/task_runs/e93dc7a6-19f2-46a6-89ba-b79ea289841a/set_state'
Response: {'exception_message': 'Internal Server Error'}
Tedi Gjoni
01/06/2025, 7:59 PM
// other configs here
[Service]
Type=simple
User=ubuntu
Restart=always
Environment="PATH=/opt/prefect-server/prefect-server/bin/"
ExecStart=sudo PREFECT_SQLALCHEMY_POOL_SIZE=200 PREFECT_SQLALCHEMY_POOL_RECYCLE=1800 PREFECT_SQLALCHEMY_POOL_PRE_PING=True PREFECT_SQLALCHEMY_MAX_OVERFLOW=50 /opt/prefect-server/prefect-server/bin/prefect server start --host 0.0.0.0
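For a sense of scale: assuming these variables end up as SQLAlchemy's pool_size and max_overflow (not confirmed in this thread), one engine may hold up to pool_size + max_overflow connections at once. A rough sketch of that arithmetic, with Postgres's stock max_connections of 100 used as an assumed server limit:

```python
def max_pool_connections(pool_size: int, max_overflow: int) -> int:
    # A SQLAlchemy QueuePool opens at most pool_size + max_overflow connections.
    return pool_size + max_overflow


POOL_SIZE = 200      # value from the unit file above
MAX_OVERFLOW = 50    # value from the unit file above
ASSUMED_PG_MAX_CONNECTIONS = 100  # Postgres default; verify with SHOW max_connections

ceiling = max_pool_connections(POOL_SIZE, MAX_OVERFLOW)
exceeds_limit = ceiling > ASSUMED_PG_MAX_CONNECTIONS
print(f"pool ceiling: {ceiling}, exceeds assumed server limit: {exceeds_limit}")
```

If the real server limit is lower than the pool ceiling, the pool can ask for more connections than Postgres will grant, so the two numbers are worth comparing before raising either.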
Nate
01/06/2025, 10:22 PM
FAILED tests/test_task_runners.py::TestRayTaskRunner::test_can_run_many_tasks_without_crashing[default_ray_task_runner] - ray.exceptions.RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.9.20/x64/lib/python3.9/site-packages/ray/exceptions.py", line 51, in from_ray_exception
    return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 1 required keyword-only argument: 'code'
have you seen this at all client side?
Tedi Gjoni
01/06/2025, 10:24 PM
loky as backend
Tedi Gjoni
01/06/2025, 10:24 PM
Tedi Gjoni
01/06/2025, 10:26 PM
Nate
01/06/2025, 10:27 PM
Tedi Gjoni
01/06/2025, 10:29 PM
multiprocessing backend, and will stop creating a task for each parallel process
Tedi Gjoni
01/06/2025, 10:30 PM
Tedi Gjoni
01/06/2025, 10:31 PM
Nate
01/06/2025, 10:31 PM
Tedi Gjoni
01/06/2025, 10:33 PM
Tedi Gjoni
01/06/2025, 10:36 PM
Nate
01/06/2025, 10:36 PM
Nate
01/06/2025, 10:37 PM
Tedi Gjoni
01/06/2025, 10:39 PM
Nate
01/06/2025, 10:39 PM
It stores all the task runs/logs/flows etc?
yes!
Tedi Gjoni
01/06/2025, 10:39 PM
Nate
01/06/2025, 10:40 PM
if the job crashes, the flow doesn't properly close the connection and they stay idle
that sounds like a bug, I'd guess likely related to using the ray task runner
Tedi Gjoni
01/06/2025, 10:41 PM
idle in transaction connection but not idle
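One way to tell the two states apart is the state column of Postgres's pg_stat_activity view. A small sketch, assuming psycopg2 on the client side and a database named "prefect" (both are assumptions, not something stated in this thread); the sample rows are invented so the tally runs without a server:

```python
from collections import Counter

# pg_stat_activity is a built-in Postgres view; its "state" column reports
# values such as 'active', 'idle', and 'idle in transaction'.
STATE_QUERY = "SELECT state FROM pg_stat_activity WHERE datname = %s"


def count_states(rows):
    """Tally connection states from rows shaped like [(state,), ...]."""
    # Background processes can report a NULL state, which arrives as None.
    return Counter(state if state is not None else "unknown" for (state,) in rows)


# Hypothetical rows, shaped like cur.fetchall() after
# cur.execute(STATE_QUERY, ("prefect",)) with psycopg2.
sample_rows = [
    ("idle in transaction",),
    ("idle",),
    ("idle in transaction",),
    ("active",),
]
counts = count_states(sample_rows)
print(counts)
```

A growing "idle in transaction" count after a crash would point at transactions that were opened and never committed or rolled back, which is different from plain idle connections sitting in a pool.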
Tedi Gjoni
01/06/2025, 10:41 PM
Nate
01/06/2025, 10:41 PM
Tedi Gjoni
01/06/2025, 10:41 PM