# ask-community
r
Hi All, has anyone encountered a duplicate key error on PostgreSQL when running multiple flows concurrently?
duplicate key value violates unique constraint "uq_configuration__key"
DETAIL:  Key (key)=(ENCRYPTION_KEY) already exists.
Full stack trace in reply.
15:10:37.277 | ERROR   | prefect.server - Encountered exception in request:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 442, in _prepare_and_execute
    self._rows = await prepared_stmt.fetch(*parameters)
  File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
    data = await self.__bind_execute(args, 0, timeout)
  File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
    data, status, _ = await self.__do_execute(
  File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
    return await executor(protocol)
  File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "uq_configuration__key"
DETAIL:  Key (key)=(ENCRYPTION_KEY) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1905, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 454, in _prepare_and_execute
    self._handle_exception(error)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 389, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
    raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "uq_configuration__key"
DETAIL:  Key (key)=(ENCRYPTION_KEY) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/utilities/server.py", line 103, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/usr/local/lib/python3.9/site-packages/fastapi/routing.py", line 241, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.9/site-packages/fastapi/routing.py", line 167, in run_endpoint_function
    return await dependant.call(**values)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/api/block_documents.py", line 41, in create_block_document
    return await models.block_documents.create_block_document(
  File "/usr/local/lib/python3.9/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/models/block_documents.py", line 51, in create_block_document
    await orm_block.encrypt_data(session=session, data=block_document_data_without_refs)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/database/orm_models.py", line 1093, in encrypt_data
    self.data = await encrypt_fernet(session, data)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/utilities/encryption.py", line 37, in encrypt_fernet
    fernet = await get_fernet_encryption(session)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/utilities/encryption.py", line 30, in get_fernet_encryption
    await configuration.write_configuration(session, configured_key)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/server/models/configuration.py", line 29, in write_configuration
    await session.flush()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 408, in flush
    await greenlet_spawn(self.sync_session.flush, objects=objects)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.switch(value)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3589, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
    result = connection._execute_20(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1948, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2129, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1905, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 454, in _prepare_and_execute
    self._handle_exception(error)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 389, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "uq_configuration__key"
DETAIL:  Key (key)=(ENCRYPTION_KEY) already exists.
[SQL: INSERT INTO configuration (id, created, updated, key, value) VALUES (%s, %s, %s, %s, %s)]
[parameters: (UUID('17e7717a-cc2f-4823-8114-76e7ab694bc2'), DateTime(2023, 8, 21, 15, 10, 37, 223748, tzinfo=Timezone('UTC')), DateTime(2023, 8, 21, 15, 10, 37, 223771, tzinfo=Timezone('UTC')), 'ENCRYPTION_KEY', '{"fernet_key": "hUPUFEpihk8sy5X9E5W5qu5ktCSB9qK49Bg-zRDtKew="}')]
(Background on this error at: <https://sqlalche.me/e/14/gkpj>)
15:10:42.077 | ERROR   | prefect.server.services.telemetry - Failed to send telemetry:
Shutting down telemetry service...
k
Hi @Renee Wu. I am seeing the same issue when running concurrent flows. Were you able to resolve it?
m
Facing the same problem. Any tips here?
k
Hi @Mana. In my case, this happened only on a freshly installed Prefect instance when attempting to run multiple flows concurrently. If you look at the get_fernet_encryption function in https://github.com/PrefectHQ/prefect/blob/c0706c614f6885630c818b16fd1c3c03b05c9955/src/prefect/server/utilities/encryption.py#L23, it appears to look for an ENCRYPTION_KEY record in the database. If it doesn’t find one, it creates the ENCRYPTION_KEY record. This works fine if the first flow run after the initial Prefect deployment is a single flow. However, if the first flows after the initial deployment run concurrently, sometimes both jobs see that the ENCRYPTION_KEY record isn’t there, and both then attempt to create it. The second fails with the uq_configuration__key unique key violation. I resolved this by making sure I ran a single flow to completion before running any concurrent flows after my initial Prefect installation. Hope this helps.
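For illustration, here is a minimal sketch of that check-then-insert race and one way to tolerate it. It is not Prefect's code: it uses SQLite from the Python standard library instead of PostgreSQL, replays the two requests in sequence rather than truly concurrently, and the helper names (make_db, read_key, naive_write, insert_or_read, get_or_create) are made up for this example.

```python
import sqlite3

KEY = "ENCRYPTION_KEY"


def make_db():
    # In-memory stand-in for the configuration table (assumption: simplified schema).
    # The UNIQUE constraint plays the role of uq_configuration__key.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE configuration (key TEXT UNIQUE, value TEXT)")
    return conn


def read_key(conn, key):
    # Return the stored value, or None if no record exists yet.
    row = conn.execute(
        "SELECT value FROM configuration WHERE key = ?", (key,)
    ).fetchone()
    return row[0] if row else None


def naive_write(conn, key, value):
    # Plain INSERT: raises IntegrityError if the key already exists.
    conn.execute(
        "INSERT INTO configuration (key, value) VALUES (?, ?)", (key, value)
    )


def insert_or_read(conn, key, value):
    # Tolerant write: if another request won the race, use its value instead.
    try:
        naive_write(conn, key, value)
        return value
    except sqlite3.IntegrityError:
        return read_key(conn, key)


def get_or_create(conn, key, value):
    # Check first, then fall back to the tolerant write.
    existing = read_key(conn, key)
    return existing if existing is not None else insert_or_read(conn, key, value)


conn = make_db()

# Two requests both check before either one has written anything.
seen_a = read_key(conn, KEY)
seen_b = read_key(conn, KEY)

# Request A inserts first; request B's plain INSERT then hits the constraint.
naive_write(conn, KEY, "key-from-a")
try:
    naive_write(conn, KEY, "key-from-b")
    race_error = None
except sqlite3.IntegrityError as exc:
    race_error = exc

# With the tolerant write, request B would have picked up A's key instead.
recovered = insert_or_read(conn, KEY, "key-from-b")
```

One caveat with this sketch: on PostgreSQL a failed INSERT aborts the surrounding transaction, so a real fix would probably need a savepoint around the insert or an INSERT ... ON CONFLICT DO NOTHING followed by a re-read. Catching the error and re-reading, as above, is only safe as written on SQLite.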
m
@Kelby thanks for sharing your experience. I wiped the database and recreated it, and that took care of it. I didn't know at the time that we could reset the database. I think that's a better way to clean up the database.