Pubsub: health check races with get_message to read from the socket
bmerry opened this issue · 5 comments
Describe the bug
This is somewhat related to #1206 (both involve pubsub and health checks) but is a different failure mode. I think this is actually the aioredis equivalent of the redis-py bug I linked from #1206, and possibly the approach used in its corresponding PR will work here too (I haven't had a chance to review that PR).
When issuing a subscribe command on a PubSub that currently has no subscriptions, if the connection hasn't been used for a while (specifically, longer than the health check interval), the underlying connection will issue a PING to check the connection's health and try to read the PONG. However, another async task may be blocked in get_message, also trying to read from the socket. This leads to an exception.
To Reproduce
- Install the PR from #1207 (which fixes #1206), or master.
- Start a Redis server on localhost.
- Run the script below. It will crash.
```python
#!/usr/bin/env python3
import asyncio

import aioredis


async def poll(ps):
    while True:
        message = await ps.get_message(timeout=10)
        if message is not None:
            print(message)


async def main():
    r = aioredis.Redis.from_url("redis://localhost", health_check_interval=1)
    ps = r.pubsub()
    await ps.subscribe("foo")
    poller = asyncio.create_task(poll(ps))
    await ps.unsubscribe("foo")
    await asyncio.sleep(2)
    await ps.subscribe("baz")
    await asyncio.sleep(0.1)
    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```
Expected behavior
The script should run without errors.
Logs/tracebacks
```
Traceback (most recent call last):
  File "./crashit_aioredis2.py", line 30, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "./crashit_aioredis2.py", line 22, in main
    await ps.subscribe("baz")
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4131, in subscribe
    ret_val = await self.execute_command("SUBSCRIBE", *new_channels.keys())
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4020, in execute_command
    await self._execute(connection, connection.send_command, *args, **kwargs)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4024, in _execute
    return await command(*args, **kwargs)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 885, in send_command
    await self.send_packed_command(
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 854, in send_packed_command
    await self.check_health()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 824, in check_health
    if str_if_bytes(await self.read_response()) != "PONG":
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 900, in read_response
    response = await self._parser.read_response()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 537, in read_response
    await self.read_from_socket()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 498, in read_from_socket
    buffer = await self._stream.read(self._read_size)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<poll() done, defined at ./crashit_aioredis2.py:8> exception=ConnectionError('Connection closed by server.')>
Traceback (most recent call last):
  File "./crashit_aioredis2.py", line 10, in poll
    message = await ps.get_message(timeout=10)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4171, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4048, in parse_response
    if not block and not await conn.can_read(timeout=timeout):
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 893, in can_read
    return await self._parser.can_read(timeout)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 484, in can_read
    return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 500, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
aioredis.exceptions.ConnectionError: Connection closed by server.
```
Python Version
```
$ python --version
Python 3.8.10
```
aioredis Version
a708bd14b1a8bec0a1f3d469bf5384eb2726b5fa
Additional context
No response
Code of Conduct
- I agree to follow the aio-libs Code of Conduct
cc @Andrew-Chen-Wang since you worked on #1207.
I would have created two sockets, but I understand that isn't an option in terms of scaling. One solution might be to take a lock on the socket when doing single command execution such as SUBSCRIBE, which would block get_message() from trying to receive a message.
This would be a PubSub-specific lock to prevent this failure mode. Because data is returned line by line, the subscribe call could still receive unrelated data, though. That means we'd need to buffer all responses not related to the locked single-execution command and stream them out to get_message after we do get a response for subscribe.
It's not ideal, but that's all I've got. The situation gets convoluted if, in the get_message poller, you run more commands like GET or SET. Then we're on a backlog until all single-execution commands have completed. It'd be a nasty queue implementation...
I'll play around with it today.
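The lock-plus-buffer idea above could be sketched roughly as follows. This is a toy model, not aioredis code: all names are hypothetical, and the socket is simulated with a pre-filled queue of replies so the demultiplexing logic can be shown in isolation.

```python
import asyncio
from collections import deque


class DemuxConnection:
    """Toy model of one socket shared by commands and get_message().

    Hypothetical sketch, not the aioredis API. Replies are simulated as a
    pre-filled queue; in reality they would be parsed off the socket.
    """

    def __init__(self, wire_replies):
        self._wire = deque(wire_replies)   # simulated socket input
        self._pubsub_buffer = deque()      # messages parked for get_message()
        self._lock = asyncio.Lock()        # one reader on the "socket" at a time

    async def _read_one(self):
        await asyncio.sleep(0)             # stand-in for awaiting the socket
        return self._wire.popleft()

    async def execute_command(self, name):
        # Hold the lock so get_message() cannot race us on the socket.
        async with self._lock:
            while True:
                reply = await self._read_one()
                if reply[0] == "reply" and reply[1] == name:
                    return reply           # our command's response
                # Unrelated pubsub traffic: park it for get_message().
                self._pubsub_buffer.append(reply)

    async def get_message(self):
        # Drain parked messages first, then fall back to the socket.
        if self._pubsub_buffer:
            return self._pubsub_buffer.popleft()
        async with self._lock:
            if self._pubsub_buffer:        # re-check: a command may have parked one
                return self._pubsub_buffer.popleft()
            return await self._read_one()


async def demo():
    conn = DemuxConnection([
        ("message", "foo", "hello"),       # arrives before SUBSCRIBE's reply
        ("reply", "SUBSCRIBE"),
        ("message", "baz", "world"),
    ])
    reply = await conn.execute_command("SUBSCRIBE")
    first = await conn.get_message()       # the parked "foo" message
    second = await conn.get_message()      # read fresh from the "socket"
    return reply, first, second


print(asyncio.run(demo()))
```

The lock serializes socket reads, and the buffer is exactly the "store all responses not related to the locking command" queue described above; the messy part in a real implementation would be parsing arbitrary interleaved replies, which this sketch sidesteps.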
@bmerry After thinking about it for a while, I think this is the only expected behavior, as shown in asyncio/streams.py in the StreamReader (where the exception about the coroutine is raised):
```python
async def _wait_for_data(self, func_name):
    """Wait until feed_data() or feed_eof() is called.

    If stream was paused, automatically resume it.
    """
    # StreamReader uses a future to link the protocol feed_data() method
    # to a read coroutine. Running two read coroutines at the same time
    # would have an unexpected behaviour. It would not possible to know
    # which coroutine would get the next data.
    if self._waiter is not None:
        raise RuntimeError(
            f'{func_name}() called while another coroutine is '
            f'already waiting for incoming data')

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting for data while paused will make deadlock, so prevent it.
    # This is essential for readexactly(n) for case when n > self._limit.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    self._waiter = self._loop.create_future()
    try:
        await self._waiter
    finally:
        self._waiter = None
```
Specifically:

> StreamReader uses a future to link the protocol feed_data() method to a read coroutine. Running two read coroutines at the same time would have an unexpected behaviour. It would not possible to know which coroutine would get the next data.
Again, we want to implement a lock feature such that we can somehow get an ordered stream of response data, but the queue for this would be interesting (I'm imagining a queue/ordered dict whose elements/keys are coroutines, so that the queue can pair response data with the waiting coroutine).
It may be worth following the approach in redis/redis-py#1737, which looks promising (basically, get_message doesn't try to read from the socket while there are no subscriptions, and commands don't do health checks while there are subscriptions).
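The two guards in that approach could be sketched like this. All names here are hypothetical (this is not the actual redis-py or aioredis implementation), and socket I/O is replaced with a log so the control flow is visible:

```python
import time


class HealthAwarePubSub:
    """Toy sketch of the two guards described above (hypothetical names).

    Two rules keep two readers from ever racing on one socket:
      1. get_message() does not touch the socket while nothing is subscribed.
      2. Commands skip the PING health check while subscriptions exist,
         since reading the PONG would race with the get_message() reader.
    """

    def __init__(self, health_check_interval=1.0):
        self.subscribed = False
        self.health_check_interval = health_check_interval
        self.last_io = time.monotonic()
        self.log = []                      # record of simulated socket operations

    def _read_socket(self, what):
        self.log.append(what)
        self.last_io = time.monotonic()

    def check_health(self):
        # Rule 2: never PING (and block reading PONG) while subscribed.
        if self.subscribed:
            return
        if time.monotonic() - self.last_io >= self.health_check_interval:
            self.log.append("PING")
            self._read_socket("PONG")

    def execute_command(self, name, *args):
        self.check_health()
        self.log.append(name)
        if name == "SUBSCRIBE":
            self.subscribed = True
        elif name == "UNSUBSCRIBE":
            self.subscribed = False
        self.last_io = time.monotonic()

    def get_message(self):
        # Rule 1: with no subscriptions there is nothing to read, so
        # don't sit on the socket (and collide with a health check).
        if not self.subscribed:
            return None
        self._read_socket("message")
        return "message"
```

With these guards, at most one party reads the socket at a time: get_message reads only while subscribed, and the health-check PONG read happens only while unsubscribed, so the RuntimeError from _wait_for_data can't be triggered by this pair.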
That sounds good to me 👍