# ask-community
Hi all -- hopefully I am posting this in the correct channel; feel free to slap me down if it is not. I originally posted this message on the public Discourse, and it was suggested I try raising a ticket. However, I am having trouble doing that due to my setup, so I will try to be brief and clear here.

I am working on a demonstration of Prefect and how it might be used for some of our telescopes. My current toy problem is a long-ish pipeline that leverages:

• A postgres database running remotely (but nearby) in a Singularity container
• A SLURM cluster (`dask_jobqueue.SLURMCluster`) with a `DaskTaskExecutor`, with 10 separate compute nodes pulled into the distributed dask scheduler
• A single large `Flow` with ~7 tasks, each task calling a separate python script's `main`

At the moment only the postgres database is running remotely; I have not set up a `prefect orion` server running remotely, although I can try this. I found that regular running of the pipeline would often raise a `TimeoutError` in an unpredictable manner. Setting `PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=20` eliminated these (see the settings sketch after the version output below).

Now my problem is a `TimeoutError` raised when the flow is closing. I can confirm that all data products expected by the pipeline have been created and stored on disk, and all log messages have been issued. The flow has essentially finished executing and is wrapping up when the error is raised. The traceback is too long to post as a comment ( 😢 ), so it is below.
```
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
    self._adapt_connection.await_(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
    self._handle_exception(error)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
    raise error
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
    await self._transaction.start()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
    await self._connection.execute(query)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 338, in query
asyncio.exceptions.TimeoutError
```
And just to reiterate, in case I was unclear: this error is raised after all work in the function with the `Flow` decorator has completed successfully. The very last line is a simple "All done and wrapping up" type log message, which is properly captured and reported in the Orion UI. It really seems like something in the shutdown and clean-up of the flow. I have not been able to disentangle whether the dask scheduler / `SLURMCluster` is at play in all of this, but from the SLURM logs it looks like it is not.
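For context, the flow is wired up roughly along these lines. This is a simplified sketch rather than my actual pipeline: the stage names, SLURM resources, and my assumption that the "DaskTaskExecutor" above corresponds to prefect-dask's `DaskTaskRunner` are all illustrative.

```python
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner


@task
def run_stage(stage_name: str, ms_path: str) -> str:
    # In the real pipeline each task calls a separate script's main();
    # here it just logs and returns a token.
    get_run_logger().info(f"Running {stage_name} on {ms_path}")
    return f"{stage_name}-done"


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        # Placeholder SLURM resources -- not my real queue/memory settings
        cluster_kwargs={"cores": 16, "memory": "64GB", "queue": "work"},
        adapt_kwargs={"minimum": 10, "maximum": 10},  # ~10 compute nodes
    )
)
def toy_pipeline(ms_path: str):
    futures = []
    for stage in ["flagging", "bandpass", "applycal", "imaging"]:  # placeholder stages
        futures.append(run_stage.submit(stage, ms_path))
    # Block until everything has finished, then log the final message
    [f.result() for f in futures]
    get_run_logger().info("All done and wrapping up")


if __name__ == "__main__":
    toy_pipeline("/path/to/data.ms")
```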
```
prefect version
Version:             2.6.4
API version:         0.8.2
Python version:      3.9.13
Git commit:          51e92dda
Built:               Thu, Oct 20, 2022 3:11 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          postgresql
```
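For reference, this is roughly how the database settings mentioned above are applied on the machine running the flow. The connection URL is a placeholder, not my real credentials:

```shell
# Point the (ephemeral) Orion API at the remote postgres and relax the connection timeout.
# The URL below is illustrative -- substitute real user/password/host/database values.
prefect config set PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:password@db-host:5432/orion"
prefect config set PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=20

# Confirm what the current profile resolves to
prefect config view --show-sources
```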
Just to follow up -- I set up an instance of the Orion server running alongside the postgres database. I then used the `PREFECT_API_URL` variable to point my prefect pipeline at this remote Orion server, instead of firing up a local instance for the duration of the pipeline. I am pleased to say that for the first time I got all the way through with no `TimeoutError`s.
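In case it helps anyone else, the setup was along these lines. Hostnames, ports, and the connection URL are placeholders for my actual machines:

```shell
# On the machine next to the postgres database: run a persistent Orion server
# backed by that database (URL is a placeholder).
export PREFECT_ORION_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:password@localhost:5432/orion"
prefect orion start --host 0.0.0.0 --port 4200

# On the machine submitting the flow: point the pipeline at the remote server
# instead of spinning up an ephemeral one.
prefect config set PREFECT_API_URL="http://orion-host:4200/api"
```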