Race condition in `QuerySet.get_or_create`
Describe the bug
When `get_or_create` is called concurrently two or more times for an item that does not yet exist in the db, on a model with an `ormar.UniqueColumns` constraint, `asyncpg.UniqueViolationError` is raised for every attempt that loses the race to create it.
To Reproduce
# app.core.db.py
import databases
from app.core.config import settings
database = databases.Database(
    f'postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_URL}'
)
Define a model with a unique constraint:
# app.models.my_models.py
import ormar
import sqlalchemy
from app.core.db import database
class Application(ormar.Model):
    class Meta(ormar.ModelMeta):
        database = database
        metadata = sqlalchemy.MetaData()
        constraints = [ormar.UniqueColumns('name', 'vendor')]  # Ensures unique entries per vendor
    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=256, nullable=False)
    vendor: str = ormar.String(max_length=256)
Call `get_or_create` two or more times concurrently, using asyncio tasks (or a TaskGroup), for something that doesn't exist yet:
import asyncio
import pytest
from app.core.db import database
from app.models.my_models import Application
@pytest.mark.asyncio
async def test_race_condition():
    async with database:
        tasks = [
            asyncio.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor')),
            asyncio.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor'))
        ]
        await asyncio.gather(*tasks)
        # async with asyncio.TaskGroup() as tg:
        #     tg.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor'))
        #     tg.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor'))
The above test results in the following error:
ERROR [root] duplicate key value violates unique constraint "uc_applications_name_vendor"
DETAIL: Key (name, vendor)=(my_name, my_vendor) already exists.
Traceback (most recent call last):
File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 1016, in get_or_create
return await self.get(*args, **kwargs), False
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 965, in get
return await self.filter(*args, **kwargs).get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 989, in get
self.check_single_result_rows_count(processed_rows)
File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 243, in check_single_result_rows_count
raise NoMatch()
ormar.exceptions.NoMatch
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "...\tests\test_application_race_condition.py", line 18, in test_race_condition
res = await asyncio.gather(*tasks)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 1019, in get_or_create
return await self.create(**{**kwargs, **_defaults}), True
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 1125, in create
instance = await instance.save()
^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\ormar\models\model.py", line 94, in save
pk = await self.Meta.database.execute(expr)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\databases\core.py", line 163, in execute
return await connection.execute(query, values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\databases\core.py", line 274, in execute
return await self._connection.execute(built_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\databases\backends\postgres.py", line 217, in execute
return await self._connection.fetchval(query_str, *args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 645, in fetchval
data = await self._execute(query, args, 1, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 1659, in _execute
result, _ = await self.__execute(
^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 1684, in __execute
return await self._do_execute(
^^^^^^^^^^^^^^^^^^^^^^^
File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 1731, in _do_execute
result = await executor(stmt, None)
^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "uc_applications_name_vendor"
DETAIL: Key (name, vendor)=(my_name, my_vendor) already exists.
Expected behavior
Ideally, I'd expect every call to `get_or_create` to return the object, whether it already existed or was newly created.
Versions (please complete the following information):
- Database backend used: postgres
- Python version: Python 3.11
- `ormar` version: 0.12.1
- `pydantic` version: 0.10.2
- if applicable `fastapi` version: N/A (0.85.0)
My first impression was that this could be fixed easily by adding another try/except to the `get_or_create` function, but I'm not sure whether we can just import the asyncpg package here. Thoughts, @collerek?
from asyncpg import UniqueViolationError
...
try:
    return await self.get(*args, **kwargs), False
except NoMatch:
    _defaults = _defaults or {}
    try:
        return await self.create(**{**kwargs, **_defaults}), True
    except UniqueViolationError:
        return await self.get(*args, **kwargs), False
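The fallback can be tried out without a database. This is only a minimal sketch, assuming in-memory stand-ins: `FakeTable`, the local `NoMatch`, and `UniqueViolation` are hypothetical names made up for illustration, not ormar or asyncpg APIs. The `asyncio.sleep(0)` calls mimic the points where a real query would yield to the other task.

```python
import asyncio


class NoMatch(Exception):
    """Stand-in for ormar.exceptions.NoMatch (hypothetical local class)."""


class UniqueViolation(Exception):
    """Stand-in for the driver's unique-constraint error (hypothetical)."""


class FakeTable:
    """In-memory table with a unique key, standing in for the real database."""

    def __init__(self):
        self.rows = {}

    async def get(self, key):
        await asyncio.sleep(0)  # yield to other tasks, as a real query would
        if key not in self.rows:
            raise NoMatch()
        return self.rows[key]

    async def create(self, key):
        await asyncio.sleep(0)  # yield between the caller's get() and the insert
        if key in self.rows:
            raise UniqueViolation(key)
        self.rows[key] = {"key": key}
        return self.rows[key]


async def get_or_create(table, key):
    """Return (row, created); fall back to get() if the insert loses the race."""
    try:
        return await table.get(key), False
    except NoMatch:
        try:
            return await table.create(key), True
        except UniqueViolation:
            # Another task inserted the row after our get(); fetch that row.
            return await table.get(key), False


async def main():
    table = FakeTable()
    key = ("my_name", "my_vendor")
    return await asyncio.gather(
        get_or_create(table, key), get_or_create(table, key)
    )


results = asyncio.run(main())
```

In this sketch the first caller creates the row and the second falls back to `get()` and receives the same row with `created` set to `False`. One caveat when carrying this over to PostgreSQL: if the insert runs inside an open transaction, the failed INSERT leaves that transaction aborted, so the fallback `get()` would only succeed after rolling back, for example to a savepoint.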
Sorry for the necro, but in a similar vein: `ormar` doesn't provide anything in terms of connection management for those tasks. The `async with database:` line creates a connection to your database, but that same connection is then used implicitly by both asyncio tasks (via `Application.Meta.database`), without any guarantee of which operation will run first. Ideally you'd pass a new `database.connection()` to each task, but `ormar` doesn't seem to provide any API for using a given connection.
Your idea doesn't provide any guarantees either, since another process (or task) could create a new `Application` after you've caught `NoMatch` but before you call `.create()`. I think you just need a transaction, but as I said about connections, transactions also can't easily be shared between tasks with `ormar`.
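For the single-process case only, the ordering between tasks can be pinned down by serializing calls per key with an `asyncio.Lock`. The sketch below is an assumption-laden illustration, not an ormar feature: `get_or_create_locked`, `naive_get_or_create`, and the in-memory `store` are hypothetical names. It does nothing about other processes, where the unique constraint remains the only backstop.

```python
import asyncio
from collections import defaultdict

# One lock per unique key (hypothetical helper state, not part of ormar).
# Locks are never evicted here, so this grows with the number of distinct keys.
_locks = defaultdict(asyncio.Lock)

# In-memory stand-in for the table, keyed by the unique columns.
store = {}


async def naive_get_or_create(key):
    """Check-then-insert with a gap in between, like SELECT followed by INSERT."""
    if key in store:
        return store[key], False
    await asyncio.sleep(0)  # other tasks may run between the check and the insert
    if key in store:
        # Stands in for the unique-constraint error raised by the database.
        raise RuntimeError("duplicate key")
    store[key] = {"key": key}
    return store[key], True


async def get_or_create_locked(key):
    """Run the check-then-insert while holding the lock for this key."""
    async with _locks[key]:
        return await naive_get_or_create(key)


async def main():
    key = ("my_name", "my_vendor")
    return await asyncio.gather(
        get_or_create_locked(key), get_or_create_locked(key)
    )


results = asyncio.run(main())
```

Without the lock, the second task in this sketch would hit the `RuntimeError`; with it, the second task waits until the first has inserted the row and then simply finds it.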