SELECT ... FOR UPDATE SKIP LOCKED does not seem to lock rows
idmitrievsky opened this issue
Describe the bug
I am trying to use SELECT ... FOR UPDATE SKIP LOCKED to prevent concurrent tasks from getting the same row, but it does not seem to work.
To Reproduce
I tried to annotate the example below with as many comments as made sense. Let me know if something is unclear!
import asyncio
import uuid
from typing import AsyncGenerator, Iterable

import pytest
from gino import Gino

gino_db = Gino()


class ActionRelation(gino_db.Model):
    """
    A very simple table of actions.
    """
    __tablename__ = 'action'

    action_uuid = gino_db.Column(gino_db.String, primary_key=True)


async def every_action_skip_locked() -> AsyncGenerator[ActionRelation, None]:
    """
    Iterate over every action skipping locked ones.
    If two workers are running concurrently they should not be able
    to get the same row/action because `with_for_update` with `skip_locked` is used.
    """
    # get an isolated connection as per Gino docs
    async with gino_db.transaction(reuse=False, reusable=False):
        # get and lock every action that is not locked yet
        iterable: Iterable[ActionRelation] = (
            await ActionRelation.query.with_for_update(skip_locked=True).gino.all()
        )
        # the transaction is not closed
        # until this generator is exhausted
        for action in iterable:
            # update action and yield it
            # await action.update(
            #     ...
            # ).apply()
            yield action


@pytest.mark.asyncio
async def test_skip_locked():
    # init gino
    await gino_db.set_bind("postgres://postgres:postgres@localhost:5432/billing")

    # create two actions
    await ActionRelation.create(
        action_uuid=str(uuid.uuid4()),
    )
    await ActionRelation.create(
        action_uuid=str(uuid.uuid4()),
    )

    # prepare events to organize a scenario:
    # 1. first worker opens transaction and locks both rows/actions
    # 2. second worker opens transaction and select for update returns nothing because skip locked is specified
    # 3. second worker closes transaction
    # 4. first worker gets one more row/action from the generator and closes its transaction
    is_fst_worker_transaction_opened = asyncio.Event()
    is_snd_worker_transaction_closed = asyncio.Event()
    actions_accessible_by_fst_worker = []
    actions_accessible_by_snd_worker = []

    async def fst_worker():
        # when this loop starts the transaction is opened inside the generator
        async for action in every_action_skip_locked():
            is_fst_worker_transaction_opened.set()
            # first worker waits for the second worker to close its transaction before proceeding
            await is_snd_worker_transaction_closed.wait()
            actions_accessible_by_fst_worker.append(action)

    async def snd_worker():
        # second worker opens the transaction only after first worker started and opened its transaction
        await is_fst_worker_transaction_opened.wait()
        # when this loop starts the SECOND DIFFERENT transaction is opened
        async for action in every_action_skip_locked():
            # this will never execute, because all rows are locked by the first worker at this point
            actions_accessible_by_snd_worker.append(action)
        is_snd_worker_transaction_closed.set()

    fst_task = asyncio.create_task(fst_worker())
    snd_task = asyncio.create_task(snd_worker())
    await asyncio.gather(fst_task, snd_task)

    # close gino
    await gino_db.pop_bind().close()

    assert len(actions_accessible_by_fst_worker) == 2
    assert len(actions_accessible_by_snd_worker) == 0
Expected result
I believe the test should pass.
Actual result
The test fails because the second worker gets access to every row in the table.
tmp.py::test_skip_locked FAILED [100%]
tmp.py:43 (test_skip_locked)
2 != 0
Expected :0
Actual :2
Environment
- GINO: 1.0.1
- SQLAlchemy: 1.3.18
- Other: macOS, asyncpg 0.21
Additional context
I tried to understand this behaviour myself, but couldn't. According to Gino, two different transactions are actually started. But when I stopped execution with the debugger inside the transaction's async context manager and ran SELECT ... FOR UPDATE SKIP LOCKED as raw SQL in a terminal, I got all rows back, as if Gino had not locked any of them.
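I see now that the query
await ActionRelation.query.with_for_update(skip_locked=True).gino.all()
runs on an implicitly acquired connection, not on the connection acquired for the transaction. (It is in fact guaranteed to be a different connection, because I specified reusable=False.) Since that implicit connection is not inside the transaction, the SELECT ... FOR UPDATE runs as its own autocommitted statement, and its row locks are released as soon as it finishes.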
Not sure how I missed it, sorry for the trouble!
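Why the locks disappear can be illustrated with a small toy model. This is a hypothetical, pure-Python sketch, not Gino or asyncpg code: `ToyDatabase`, `ToyConnection`, and `second_worker_sees` are made-up names. The only PostgreSQL rule it models is that row locks taken by FOR UPDATE are held until the owning transaction ends, while a statement run outside an explicit transaction commits, and so releases its locks, as soon as it finishes. When the locking query lands on a separate autocommit connection (as with reusable=False above), the second worker sees both rows; when it runs on the transaction's own connection, the second worker sees none.

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass, field

# Hypothetical toy model of PostgreSQL row locks; not Gino or asyncpg code.


@dataclass
class ToyDatabase:
    rows: list[str]
    # row id -> id of the connection whose transaction currently holds its lock
    locks: dict[str, int] = field(default_factory=dict)


class ToyConnection:
    _ids = itertools.count(1)

    def __init__(self, db: ToyDatabase) -> None:
        self.db = db
        self.id = next(ToyConnection._ids)
        self.in_transaction = False

    def begin(self) -> None:
        self.in_transaction = True

    def commit(self) -> None:
        # Ending a transaction releases every row lock this connection holds.
        self.db.locks = {r: o for r, o in self.db.locks.items() if o != self.id}
        self.in_transaction = False

    def select_for_update_skip_locked(self) -> list[str]:
        # Skip rows locked by *other* connections and lock the rest for this one.
        got = [r for r in self.db.rows if self.db.locks.get(r, self.id) == self.id]
        for r in got:
            self.db.locks[r] = self.id
        if not self.in_transaction:
            # Autocommit: the statement was its own implicit transaction,
            # so its locks are released as soon as it finishes.
            self.commit()
        return got


def second_worker_sees(query_on_tx_connection: bool) -> list[str]:
    db = ToyDatabase(rows=["a", "b"])
    # First worker: the connection that owns the open transaction.
    tx_conn = ToyConnection(db)
    tx_conn.begin()
    # reusable=False analogue: the implicit query gets some other connection.
    query_conn = tx_conn if query_on_tx_connection else ToyConnection(db)
    # The first worker tries to lock both rows.
    query_conn.select_for_update_skip_locked()

    # Second worker: own connection and transaction, while the first is still open.
    other = ToyConnection(db)
    other.begin()
    seen = other.select_for_update_skip_locked()
    other.commit()

    tx_conn.commit()
    return seen


print(second_worker_sees(query_on_tx_connection=False))  # ['a', 'b']: the reported behaviour
print(second_worker_sees(query_on_tx_connection=True))   # []: locks held until commit
```

The point the model captures is that SKIP LOCKED only skips rows whose locks are still held, so the locking statement has to run on the same connection, inside the same open transaction, that the generator keeps alive.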