Lock in illegal state after asyncio.exceptions.CancelledError is raised
Scenario
- Reader holds the reader lock
- Writer waits for the lock
- asyncio.exceptions.CancelledError is raised
- No one can acquire the lock again
My motivation was to implement a "try lock" with a timeout using asyncio.wait_for
(as the Python synchronization primitives documentation recommends).
The problem is not limited to the "try lock" use case: anyone who calls my code can wrap it in asyncio.wait_for
(somewhere up the call stack) or cancel my task, and thereby break the lock.
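For reference, here is a minimal sketch of the "try lock" pattern I mean. It uses the standard asyncio.Lock rather than aiorwlock so that it is self-contained, and try_acquire is a hypothetical helper name, not part of any library. With a cancellation-safe lock, a timed-out attempt leaves the lock usable:

```python
import asyncio


async def try_acquire(lock: asyncio.Lock, timeout: float) -> bool:
    """Hypothetical helper: True if the lock was acquired within `timeout` seconds."""
    try:
        # On timeout, wait_for cancels the pending acquire() call.
        await asyncio.wait_for(lock.acquire(), timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo() -> tuple:
    lock = asyncio.Lock()
    first = await try_acquire(lock, 0.05)   # lock is free, so this succeeds
    second = await try_acquire(lock, 0.05)  # lock is held, so this times out
    lock.release()
    third = await try_acquire(lock, 0.05)   # still usable after the cancelled attempt
    lock.release()
    return first, second, third


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This is the behaviour I expected from the RW lock as well: a cancelled acquire should have no lasting effect on the lock state.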
Root Cause
- self._w_state is incremented only in the _wake_up function (if the lock is not already owned).
- On cancellation, however, we decrement it without having incremented it first (because of the CancelledError), so self._w_state becomes -1. For some reason the CancelledError handler assumes the increment has already happened.
- On the next wake-up, the check if self._r_state == 0 and self._w_state == 0 is false because self._w_state == -1.
The lock cannot recover from this illegal state, and no one can acquire it anymore.
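To make the sequence above concrete, here is a toy model of the counter logic. It is a simplified sketch written for illustration, not aiorwlock's actual code: the class ToyRWLock and its methods are assumptions, and only the _r_state / _w_state / _wake_up names mirror the description above.

```python
import asyncio


class ToyRWLock:
    """Toy model only; NOT the real aiorwlock implementation."""

    def __init__(self) -> None:
        self._r_state = 0  # number of active readers
        self._w_state = 0  # number of active writers
        self._write_waiters = []

    def acquire_read(self) -> None:
        # Simplification: readers never have to wait in this toy.
        self._r_state += 1

    def release_read(self) -> None:
        self._r_state -= 1
        self._wake_up()

    async def acquire_write(self) -> None:
        if self._r_state == 0 and self._w_state == 0:
            self._w_state += 1
            return
        fut = asyncio.get_running_loop().create_future()
        self._write_waiters.append(fut)
        try:
            await fut  # _wake_up() would increment _w_state before resolving this
        except asyncio.CancelledError:
            self._w_state -= 1  # BUG: assumes _wake_up() already incremented it
            raise
        finally:
            self._write_waiters.remove(fut)

    def _wake_up(self) -> None:
        # With _w_state == -1 this condition can never become true again.
        if self._r_state == 0 and self._w_state == 0 and self._write_waiters:
            fut = self._write_waiters[0]
            if not fut.done():
                self._w_state += 1
                fut.set_result(None)


async def demo() -> int:
    lock = ToyRWLock()
    lock.acquire_read()  # reader holds the lock
    try:
        await asyncio.wait_for(lock.acquire_write(), 0.05)  # writer waits, then times out
    except asyncio.TimeoutError:
        pass
    lock.release_read()  # reader leaves, but the counter is already corrupted
    return lock._w_state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy, demo() returns -1: the cancelled writer decremented a counter that was never incremented, so the wake-up check stays false even after the reader releases.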
Test example
This test hangs:
@pytest.mark.asyncio()
async def test_rw_lock_illegal_state_on_cancellation() -> None:
    async def reader_worker(
        t_id: int,
        lock: Union[ReadLock, _ReaderLock],
        acquired_event: Optional[Event] = None,
        release_event: Optional[Event] = None,
    ) -> None:
        print(f"reader-{t_id} start")
        async with lock:
            if acquired_event:
                acquired_event.set()
            if release_event:
                await release_event.wait()
        print(f"reader-{t_id} end")

    async def writer_worker(t_id: int, lock: Union[WriteLock, _WriterLock]) -> None:
        print(f"writer-{t_id} start")
        async with lock:
            print(f"writer-{t_id} acquired lock")
        print(f"writer-{t_id} end")

    rw_lock = RWLock()
    el = asyncio.get_event_loop()
    reader_acquired_event = Event()
    reader_release_event = Event()
    el.create_task(reader_worker(1, rw_lock.reader, acquired_event=reader_acquired_event, release_event=reader_release_event))
    await reader_acquired_event.wait()
    with pytest.raises(TimeoutError):  # Expected to time out because the reader is holding the lock
        await asyncio.wait_for(writer_worker(1, lock=rw_lock.writer), 2)
    reader_release_event.set()  # Lets the reader finish its work and release the lock
    print("after failed attempt to acquire...")
    async with rw_lock.reader:
        print("after failure => read lock acquired")
    async with rw_lock.writer:
        print("after failure => write lock acquired")
    print("done successfully")
We might need to add some checks that the cancellation is caused by us (I haven't checked the code yet).
Solution may be similar to avoiding cancellation swallowing:
https://superfastpython.com/asyncio-task-cancellation-best-practices/#Practice_1_Do_Not_Consume_CancelledError
We recently implemented this in aiohttp-sse: https://github.com/aio-libs/aiohttp-sse/pull/459/files
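As a rough illustration of that direction, the sketch below undoes the state change only when ownership was actually handed to the cancelled waiter, and always re-raises CancelledError. It is a toy writer-only gate written under my own assumptions (ToyWriterGate and its methods are hypothetical), not the patch for this library and not the aiohttp-sse change:

```python
import asyncio


class ToyWriterGate:
    """Toy writer-only gate for illustration; not aiorwlock's code."""

    def __init__(self) -> None:
        self._w_state = 0  # 1 while a writer owns the gate
        self._waiters = []

    async def acquire(self) -> None:
        if self._w_state == 0:
            self._w_state += 1
            return
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
        except asyncio.CancelledError:
            # Undo only if _wake_up() already granted ownership to this waiter,
            # i.e. the future was resolved rather than cancelled.
            if fut.done() and not fut.cancelled():
                self._w_state -= 1
                self._wake_up()
            raise  # never swallow the cancellation
        finally:
            self._waiters.remove(fut)

    def release(self) -> None:
        self._w_state -= 1
        self._wake_up()

    def _wake_up(self) -> None:
        if self._w_state != 0:
            return
        for fut in self._waiters:
            if not fut.done():
                self._w_state += 1  # ownership is granted here, before the waiter resumes
                fut.set_result(None)
                return


async def demo() -> tuple:
    gate = ToyWriterGate()
    await gate.acquire()  # first writer owns the gate
    try:
        await asyncio.wait_for(gate.acquire(), 0.05)  # second writer times out
    except asyncio.TimeoutError:
        pass
    state_after_cancel = gate._w_state  # unchanged by the cancelled attempt
    gate.release()
    await asyncio.wait_for(gate.acquire(), 0.05)  # the gate can be acquired again
    return state_after_cancel, gate._w_state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The design point is that the decrement is tied to evidence that the increment happened (the resolved future), rather than to the mere fact that a CancelledError arrived.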