aio-libs/aiorwlock

Lock in illegal state after asyncio.exceptions.CancelledError is raised

Closed this issue · 2 comments

Scenario

  1. A reader holds the reader lock.
  2. A writer waits for the lock.
  3. The waiting writer is cancelled (asyncio.exceptions.CancelledError is raised).
  4. No one can acquire the lock again.

My motivation was to implement a "try lock" with a timeout using asyncio.wait_for(), as recommended in the Python documentation on synchronization primitives.
The problem is not limited to the "try lock" use case, though: anyone who calls my code can wrap it in asyncio.wait_for() (somewhere up the call stack) or cancel my task, and thereby break the lock.
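A minimal sketch of the "try lock with timeout" pattern, using the standard-library asyncio.Lock instead of aiorwlock so that it runs on its own; try_acquire is a hypothetical helper name. On timeout, wait_for() cancels the pending acquire(), so the lock must roll back any bookkeeping it did for the abandoned waiter. That cancellation path is the one that leaves aiorwlock's writer state broken in the scenario above.

```python
import asyncio
from typing import Tuple


async def try_acquire(lock: asyncio.Lock, timeout: float) -> bool:
    """Hypothetical helper: return True if `lock` was acquired within `timeout` seconds."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
    except asyncio.TimeoutError:
        # wait_for() has cancelled the pending acquire(); the lock itself
        # is responsible for cleaning up after the cancelled waiter.
        return False
    return True  # caller now owns the lock and must release it


async def main() -> Tuple[bool, bool]:
    lock = asyncio.Lock()
    await lock.acquire()                      # another party holds the lock
    while_held = await try_acquire(lock, 0.05)
    lock.release()
    after_release = await try_acquire(lock, 0.05)
    if after_release:
        lock.release()
    return while_held, after_release


if __name__ == "__main__":
    print(asyncio.run(main()))  # (False, True)
```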

Root Cause

  1. self._w_state is incremented only in the _wake_up() function (if the lock is not already owned).
  2. On cancellation, however, it is decremented even if it was never incremented (the CancelledError arrives before _wake_up() has granted the lock to this writer).
    Now self._w_state is -1!
    The CancelledError handler wrongly assumes that the increment has already happened.
  3. On the next wake-up, the check if self._r_state == 0 and self._w_state == 0 is false because self._w_state == -1.

The lock cannot recover from this illegal state, and no one can acquire it anymore.
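To make the counter arithmetic concrete, here is a toy model of the bookkeeping described above. ToyWriterLock and its attributes are hypothetical and heavily simplified; this is not aiorwlock's actual code. The counter is incremented only in _wake_up(); the buggy cancellation handler decrements it unconditionally, while the fixed one undoes the increment only if _wake_up() had actually granted the lock to the cancelled waiter (its future holds a result instead of being cancelled).

```python
import asyncio
from collections import deque
from typing import Deque, Tuple


class ToyWriterLock:
    """Hypothetical, simplified model of the writer counter; not aiorwlock's code."""

    def __init__(self, fixed: bool) -> None:
        self.fixed = fixed
        self.w_state = 0
        self.reader_active = False  # stands in for "a reader holds the lock"
        self._waiters: Deque["asyncio.Future[None]"] = deque()

    def _wake_up(self) -> None:
        # Grant the lock to the first still-pending waiter, if the lock is free.
        if self.reader_active or self.w_state != 0:
            return
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():
                self.w_state += 1  # the only place a waiter's increment happens
                fut.set_result(None)
                return

    def release_reader(self) -> None:
        self.reader_active = False
        self._wake_up()

    async def acquire(self) -> None:
        if not self.reader_active and self.w_state == 0:
            self.w_state += 1
            return
        fut: "asyncio.Future[None]" = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
        except asyncio.CancelledError:
            if not self.fixed:
                self.w_state -= 1  # BUG: undoes an increment that never happened
            elif fut.done() and not fut.cancelled():
                # FIX: _wake_up() granted us the lock just before cancellation,
                # so undo that increment and pass the lock on.
                self.w_state -= 1
                self._wake_up()
            raise


async def demo(fixed: bool) -> Tuple[int, bool]:
    lock = ToyWriterLock(fixed)
    lock.reader_active = True                          # 1. a reader holds the lock
    try:
        await asyncio.wait_for(lock.acquire(), 0.01)   # 2-3. writer waits, then is cancelled
    except asyncio.TimeoutError:
        pass
    lock.release_reader()
    state_after_cancel = lock.w_state
    try:
        await asyncio.wait_for(lock.acquire(), 0.01)   # 4. can anyone acquire it again?
        acquired = True
    except asyncio.TimeoutError:
        acquired = False
    return state_after_cancel, acquired


if __name__ == "__main__":
    print(asyncio.run(demo(fixed=False)))  # (-1, False)
    print(asyncio.run(demo(fixed=True)))   # (0, True)
```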

Test example

This test hangs:

```python
import asyncio
from asyncio import Event
from typing import Any, Optional

import pytest
from aiorwlock import RWLock


@pytest.mark.asyncio
async def test_rw_lock_illegal_state_on_cancellation() -> None:
    # `lock` is typed as Any: the reporter's ReadLock/WriteLock types are not shown.
    async def reader_worker(
        t_id: int,
        lock: Any,
        acquired_event: Optional[Event] = None,
        release_event: Optional[Event] = None,
    ) -> None:
        print(f"reader-{t_id} start")
        async with lock:
            if acquired_event:
                acquired_event.set()

            if release_event:
                await release_event.wait()

        print(f"reader-{t_id} end")

    async def writer_worker(t_id: int, lock: Any) -> None:
        print(f"writer-{t_id} start")
        async with lock:
            print(f"writer-{t_id} acquired lock")
        print(f"writer-{t_id} end")

    rw_lock = RWLock()

    reader_acquired_event = Event()
    reader_release_event = Event()
    # Keep a reference so the background reader task is not garbage-collected.
    reader_task = asyncio.create_task(
        reader_worker(1, rw_lock.reader, acquired_event=reader_acquired_event, release_event=reader_release_event)
    )
    await reader_acquired_event.wait()

    # Expected to time out because the reader is holding the lock.
    # asyncio.TimeoutError is the builtin TimeoutError only on Python 3.11+.
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(writer_worker(1, lock=rw_lock.writer), 2)

    reader_release_event.set()  # let the reader complete its work and release the lock

    print("after failed attempt to acquire...")
    async with rw_lock.reader:
        print("after failure => read lock acquired")

    async with rw_lock.writer:
        print("after failure => write lock acquired")

    print("done successfully")
```

We might need to add a check that the cancellation was caused by us (I haven't looked at the code yet).
The solution may be similar to the practice of not swallowing cancellation:
https://superfastpython.com/asyncio-task-cancellation-best-practices/#Practice_1_Do_Not_Consume_CancelledError
We recently implemented this in aiohttp-sse: https://github.com/aio-libs/aiohttp-sse/pull/459/files
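A small sketch of that practice (the functions are illustrative, not taken from aiorwlock or aiohttp-sse): the cancellation handler performs its cleanup and then re-raises. When the handler swallows CancelledError instead, the task finishes normally and task.cancelled() is False, so whoever requested the cancellation, for example wait_for() on timeout, cannot tell from the task that it took effect.

```python
import asyncio
from typing import List, Tuple


async def worker(log: List[str], swallow: bool) -> None:
    try:
        await asyncio.sleep(10)  # stands in for waiting on a lock
    except asyncio.CancelledError:
        log.append("cleanup")   # roll back only state this task actually changed
        if swallow:
            return              # anti-pattern: the cancellation is consumed
        raise                   # good practice: let cancellation propagate


async def run(swallow: bool) -> Tuple[bool, List[str]]:
    log: List[str] = []
    task = asyncio.create_task(worker(log, swallow))
    await asyncio.sleep(0)      # let the worker start waiting
    task.cancel()
    await asyncio.wait({task})  # wait for completion without re-raising
    return task.cancelled(), log


if __name__ == "__main__":
    print(asyncio.run(run(swallow=False)))  # (True, ['cleanup'])
    print(asyncio.run(run(swallow=True)))   # (False, ['cleanup'])
```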

The issue is similar to #409.
Prior to Python 3.12, wait_for() created a new task internally.
As a workaround on older Python versions, I suggest using the async_timeout library instead of wait_for().
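The difference can be checked directly with a small probe (the function names are illustrative). Before Python 3.12, wait_for() wrapped the coroutine in a new task, so code inside it ran under a different asyncio.current_task() than the caller; a timeout context manager runs the body in the caller's own task. This sketch uses the stdlib asyncio.timeout() on Python 3.11+ and falls back to the third-party async_timeout package on older versions (assumed to be installed there).

```python
import asyncio
import sys

if sys.version_info >= (3, 11):
    from asyncio import timeout          # stdlib context manager, Python 3.11+
else:
    from async_timeout import timeout    # third-party package for older Pythons


async def probe() -> "asyncio.Task":
    # Report which task is actually running this coroutine.
    task = asyncio.current_task()
    assert task is not None
    return task


async def runs_in_caller_task_with_wait_for() -> bool:
    caller = asyncio.current_task()
    inner = await asyncio.wait_for(probe(), 1)
    return inner is caller


async def runs_in_caller_task_with_timeout() -> bool:
    caller = asyncio.current_task()
    async with timeout(1):
        inner = await probe()
    return inner is caller


if __name__ == "__main__":
    # False before Python 3.12 (wait_for wrapped probe() in a new task), True on 3.12+.
    print(asyncio.run(runs_in_caller_task_with_wait_for()))
    # True on every version: the timeout context manager does not spawn a task.
    print(asyncio.run(runs_in_caller_task_with_timeout()))
```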