aio-libs-abandoned/aioredis-py

Pubsub: health check races with get_message to read from the socket

bmerry opened this issue · 5 comments

Describe the bug

This is somewhat related to #1206 (both involve pubsub and health checks), but it is a different failure mode. I believe this is the aioredis equivalent of the redis-py bug that I linked from #1206, and the approach used in its corresponding PR may work here too (I haven't had a chance to review that PR yet).

When issuing a subscribe command on a PubSub that currently has no subscriptions, and the connection has been idle for longer than the health check interval, the underlying connection issues a PING to check its health and tries to read the PONG. However, another async task may be blocked in get_message, also trying to read from the same socket. Since asyncio's StreamReader forbids two concurrent reads, this raises a RuntimeError.
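The underlying restriction can be reproduced without Redis at all: asyncio's StreamReader refuses a second concurrent read() on the same stream. This toy sketch (names are mine, not aioredis code) parks one task in read(), like the poller in get_message, then issues a second read, like check_health awaiting its PONG:

```python
import asyncio


async def demo() -> str:
    # A bare StreamReader with no data fed: any read() blocks.
    reader = asyncio.StreamReader()

    # First reader parks in read(), like the poller blocked in get_message().
    poller = asyncio.create_task(reader.read(100))
    await asyncio.sleep(0)  # let the task reach _wait_for_data()

    # Second read on the same stream, like check_health() awaiting its PONG,
    # trips the StreamReader guard.
    try:
        await reader.read(100)
    except RuntimeError as exc:
        outcome = str(exc)
    else:
        outcome = "no error"

    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        pass
    return outcome


print(asyncio.run(demo()))
# -> read() called while another coroutine is already waiting for incoming data
```

This is exactly the RuntimeError in the traceback below; the race is just about which two coroutines end up in read() at the same time.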

To Reproduce

  1. Install the PR from #1207 (which fixes #1206), or master.
  2. Start a Redis server on localhost.
  3. Run the script below. It will crash.
#!/usr/bin/env python3

import asyncio

import aioredis


async def poll(ps):
    while True:
        message = await ps.get_message(timeout=10)  # blocks reading the socket
        if message is not None:
            print(message)


async def main():
    r = aioredis.Redis.from_url("redis://localhost", health_check_interval=1)
    ps = r.pubsub()
    await ps.subscribe("foo")
    poller = asyncio.create_task(poll(ps))
    await ps.unsubscribe("foo")  # drop the only subscription
    await asyncio.sleep(2)  # idle for longer than health_check_interval
    await ps.subscribe("baz")  # triggers a health-check PING -> crash
    await asyncio.sleep(0.1)
    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        pass

asyncio.run(main())

Expected behavior

The script should run without errors.

Logs/tracebacks

Traceback (most recent call last):
  File "./crashit_aioredis2.py", line 30, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "./crashit_aioredis2.py", line 22, in main
    await ps.subscribe("baz")
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4131, in subscribe
    ret_val = await self.execute_command("SUBSCRIBE", *new_channels.keys())
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4020, in execute_command
    await self._execute(connection, connection.send_command, *args, **kwargs)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4024, in _execute
    return await command(*args, **kwargs)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 885, in send_command
    await self.send_packed_command(
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 854, in send_packed_command
    await self.check_health()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 824, in check_health
    if str_if_bytes(await self.read_response()) != "PONG":
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 900, in read_response
    response = await self._parser.read_response()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 537, in read_response
    await self.read_from_socket()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 498, in read_from_socket
    buffer = await self._stream.read(self._read_size)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<poll() done, defined at ./crashit_aioredis2.py:8> exception=ConnectionError('Connection closed by server.')>
Traceback (most recent call last):
  File "./crashit_aioredis2.py", line 10, in poll
    message = await ps.get_message(timeout=10)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4171, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4048, in parse_response
    if not block and not await conn.can_read(timeout=timeout):
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 893, in can_read
    return await self._parser.can_read(timeout)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 484, in can_read
    return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 500, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
aioredis.exceptions.ConnectionError: Connection closed by server.

Python Version

$ python --version
Python 3.8.10

aioredis Version

a708bd14b1a8bec0a1f3d469bf5384eb2726b5fa

Additional context

No response

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct

cc @Andrew-Chen-Wang since you worked #1207.

My first thought would be to use two sockets, but I understand that isn't an option in terms of scaling. One solution might be to take a lock on the socket while executing a single command such as SUBSCRIBE, which would block get_message() from trying to read a message at the same time.

This would be a PubSub-specific lock to prevent this failure mode. Because replies come back over the connection in order, the subscribe call could still read unrelated data, though. That means we would need to buffer every response not belonging to the locked single-execution command and stream it out to get_message() once subscribe has received its reply.

It's not ideal, but that's all I've got. The situation gets convoluted if the get_message poller also issues commands like GET or SET: then we would be backlogged until all single-execution commands had completed. It would be a nasty queue implementation...
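To make the lock-plus-buffer idea concrete, here is a toy sketch (all names hypothetical, nothing here is aioredis API): one lock serializes socket reads, and any reply that doesn't answer the command currently holding the lock is parked in a queue that get_message() drains first.

```python
import asyncio
from typing import List, Tuple

Reply = Tuple[str, ...]


class LockedPubSub:
    """Hypothetical sketch: serialize reads with a lock, park stray replies."""

    def __init__(self, replies: List[Reply]) -> None:
        self._replies = replies                  # stands in for the socket
        self._lock = asyncio.Lock()
        self._parked: asyncio.Queue = asyncio.Queue()

    async def _read_raw(self) -> Reply:
        await asyncio.sleep(0)                   # simulate socket I/O
        return self._replies.pop(0)

    async def execute_command(self, kind: str) -> Reply:
        # e.g. kind == "subscribe": read until we see our own reply,
        # parking unrelated pubsub messages for get_message() to pick up.
        async with self._lock:
            while True:
                reply = await self._read_raw()
                if reply[0] == kind:
                    return reply
                await self._parked.put(reply)

    async def get_message(self) -> Reply:
        if not self._parked.empty():
            return self._parked.get_nowait()
        async with self._lock:                   # never race a command's read
            if not self._parked.empty():         # filled while we waited
                return self._parked.get_nowait()
            return await self._read_raw()


async def main() -> None:
    ps = LockedPubSub([("message", "foo", "hi"), ("subscribe", "baz", "2")])
    print(await ps.execute_command("subscribe"))  # parks the stray message
    print(await ps.get_message())                 # drains it afterwards

asyncio.run(main())
```

This shows why the buffering is unavoidable with a single socket: the subscribe reply can arrive behind a pubsub message, and the locked reader has to hold that message somewhere for the poller.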

I'll play around with it today.

@bmerry After thinking about it for a while, I think this is simply the documented behavior of asyncio's StreamReader, as shown in asyncio/streams.py (where the exception about a second coroutine is raised):

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

Specifically:

StreamReader uses a future to link the protocol feed_data() method to a read coroutine. Running two read coroutines at the same time would have an unexpected behaviour. It would not possible to know which coroutine would get the next data.

Again, we want to implement a locking scheme that yields an ordered stream of response data, but the queue for this would be interesting (I'm imagining a queue or ordered dict keyed by the waiting coroutine, so that response data can be paired with the task expecting it).

It may be worth following the approach in redis/redis-py#1737, which looks promising: basically, get_message doesn't try to read from the socket while there are no subscriptions, and commands skip the health check while there are subscriptions.

That sounds good to me 👍