collerek/ormar

Race condition in `QuerySet.get_or_create`

Opened this issue · 2 comments

teonat commented

Describe the bug
When asynchronously attempting to get an item that does not exist in the db with get_or_create twice (or more times) concurrently, on a model with an ormar.UniqueColumns-constraint, asyncpg.UniqueViolationError is thrown for the losing attempt at creating it.

To Reproduce

# app.core.db.py
import databases

from app.core.config import settings

database = databases.Database(
    f'postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_URL}'
)

Define a model with an unique constrant:

# app.models.my_models.py
import ormar
import sqlalchemy

from app.core.db import database


class Application(ormar.Model):
    class Meta(ormar.ModelMeta):
        database = database
        metadata = sqlalchemy.MetaData()
        constraints = [ormar.UniqueColumns('name', 'vendor')]  # Ensures unique entries per vendor

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=256, nullable=False)
    vendor: str = ormar.String(max_length=256)

Attempt to get_or_create something that doesnt exists twice or more times concurrently with asyncio tasks (or taskgroups):

import asyncio

import pytest

from app.core.db import database
from app.models.my_models import Application


@pytest.mark.asyncio
async def test_race_condition():
    async with database:
        tasks = [
            asyncio.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor')),
            asyncio.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor'))
        ]
        await asyncio.gather(*tasks)
        # async with asyncio.TaskGroup() as tg:
        #     tg.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor'))
        #     tg.create_task(Application.objects.get_or_create(name='my_name', vendor='my_vendor'))

The above test results in the following error:

ERROR [root] duplicate key value violates unique constraint "uc_applications_name_vendor"
DETAIL:  Key (name, vendor)=(my_name, my_vendor) already exists.
Traceback (most recent call last):
  File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 1016, in get_or_create
    return await self.get(*args, **kwargs), False
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 965, in get
    return await self.filter(*args, **kwargs).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 989, in get
    self.check_single_result_rows_count(processed_rows)
  File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 243, in check_single_result_rows_count
    raise NoMatch()
ormar.exceptions.NoMatch

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...\tests\test_application_race_condition.py", line 18, in test_race_condition
    res = await asyncio.gather(*tasks)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 1019, in get_or_create
    return await self.create(**{**kwargs, **_defaults}), True
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\ormar\queryset\queryset.py", line 1125, in create
    instance = await instance.save()
               ^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\ormar\models\model.py", line 94, in save
    pk = await self.Meta.database.execute(expr)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\databases\core.py", line 163, in execute
    return await connection.execute(query, values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\databases\core.py", line 274, in execute
    return await self._connection.execute(built_query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\databases\backends\postgres.py", line 217, in execute
    return await self._connection.fetchval(query_str, *args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 645, in fetchval
    data = await self._execute(query, args, 1, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 1659, in _execute
    result, _ = await self.__execute(
                ^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 1684, in __execute
    return await self._do_execute(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "...\.venv\win\python311\Lib\site-packages\asyncpg\connection.py", line 1731, in _do_execute
    result = await executor(stmt, None)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "uc_applications_name_vendor"
DETAIL:  Key (name, vendor)=(my_name, my_vendor) already exists.

Expected behavior
Ideally, I'd expect to get the existing/newly created object from all attempts at using get_or_create.

Versions (please complete the following information):

  • Database backend used: postgress
  • Python version: Python 3.11
  • ormar version: 0.12.1
  • pydantic version: 0.10.2
  • if applicable fastapi version: N/A (0.85.0)
teonat commented

My first impression was that this can be fixed easily by adding another try/except to the get_or_create-function, but I'm not really sure if we can just import the asyncpg-package here? Thoughts @collerek ?

from asyncpg import UniqueViolationError
...
        try:
            return await self.get(*args, **kwargs), False
        except NoMatch:
            _defaults = _defaults or {}
            try:
                return await self.create(**{**kwargs, **_defaults}), True
            except UniqueViolationError:
                return await self.get(*args, **kwargs), False

Sorry for the necro, but in a similar vein; ormar doesn't provide anything in terms of connection management for those tasks. The async with database: line creates a connection to your database, but then that same connection is implicitly used in both asyncio tasks (via Application.Meta.database), without any guarantee of which operation will run first. Ideally you'd pass a new database.connection() to each task, but ormar doesn't seem to provide any API to use a given connection. Your idea doesn't provide any guarantees, since another process (or task) could create a new Application after you've caught NoMatch but before you call .create(). I think you just need a transaction, but like I said about connections, transactions also can't easily be shared between tasks with ormar