David Michael Gang
10/13/2024, 7:49 AM

```python
@db_injector
async def bulk_increment_active_slots(
    db: PrefectDBInterface,
    session: AsyncSession,
    concurrency_limit_ids: List[UUID],
    slots: int,
) -> bool:
    active_slots = active_slots_after_decay(db)
    denied_slots = denied_slots_after_decay(db)

    query = (
        sa.update(orm_models.ConcurrencyLimitV2)
        .where(
            sa.and_(
                orm_models.ConcurrencyLimitV2.id.in_(concurrency_limit_ids),
                orm_models.ConcurrencyLimitV2.active == True,  # noqa
                active_slots + slots <= orm_models.ConcurrencyLimitV2.limit,
            )
        )
        .values(
            active_slots=active_slots + slots,
            denied_slots=denied_slots,
        )
    ).execution_options(synchronize_session=False)

    result = await session.execute(query)
    return result.rowcount == len(concurrency_limit_ids)
```
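For context, the whole check-and-increment above is a single guarded UPDATE statement. A quick way to see the SQL that SQLAlchemy generates for it is to render it on the PostgreSQL dialect (a sketch run inside the function, where `query` is in scope; the exact output depends on the model definitions):

```python
# Sketch: compile the statement for PostgreSQL to inspect the single
# UPDATE ... WHERE ... <= limit that the server actually executes.
from sqlalchemy.dialects import postgresql

print(query.compile(dialect=postgresql.dialect()))
```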
It looks like this code has a race condition. We could have two transactions coming in parallel from different machines at the same time, and since the default transaction isolation level in PostgreSQL is READ COMMITTED (https://www.postgresql.org/docs/current/transaction-iso.html), there is no guarantee of serialization. You serialize on the client level with a queue, but the rate limit can be called globally from multiple places. Maybe I'm missing something, or maybe this rate limit works well for 99% of all cases, but I would be interested to hear what your motivations were.
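To make the question concrete, here is a sketch of what server-side serialization could look like: taking row locks on the limit rows before the update, so concurrent incrementers queue inside PostgreSQL rather than on the client. This is hypothetical, not the actual code; `lock_limit_rows` is a made-up helper and `orm_models` comes from the snippet above:

```python
# Hypothetical sketch: pessimistic locking with SELECT ... FOR UPDATE.
# A concurrent transaction holding these rows blocks us until it commits,
# so the guarded UPDATE that follows sees the latest committed slot counts
# even under the READ COMMITTED default.
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession

async def lock_limit_rows(session: AsyncSession, concurrency_limit_ids) -> None:
    await session.execute(
        sa.select(orm_models.ConcurrencyLimitV2.id)
        .where(orm_models.ConcurrencyLimitV2.id.in_(concurrency_limit_ids))
        .with_for_update()
    )
```

An alternative would be to create the engine with `isolation_level="SERIALIZABLE"` and retry transactions that fail with serialization errors, though that trades throughput for the guarantee.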