"RuntimeError: read() called while another coroutine is already waiting for incoming data" while asyncio.gather job.refresh
evgenii-moriakhin opened this issue · 4 comments
evgenii-moriakhin commented
I'm trying to setup saq for my work needs, and I've encountered this problem:
If try to execute asyncio.gather for job.refresh() with until_complete set, an error is raised
Failed to consume message
Traceback (most recent call last):
File "/home/evgeniy/rq/saq/saq/queue.py", line 736, in _daemon
message = await self.pubsub.get_message(timeout=None) # type: ignore
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 927, in get_message
response = await self.parse_response(block=(timeout is None), timeout=timeout)
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 804, in parse_response
response = await self._execute(
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 784, in _execute
return await conn.retry.call_with_retry(
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
return await do()
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 782, in read_response
response = await self._parser.read_response(
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 411, in read_response
await self.read_from_socket()
File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 392, in read_from_socket
buffer = await self._stream.read(self._read_size)
File "/usr/lib/python3.10/asyncio/streams.py", line 669, in read
await self._wait_for_data('read')
File "/usr/lib/python3.10/asyncio/streams.py", line 487, in _wait_for_data
raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
here's the code that reproduces the problem
import asyncio
from redis.asyncio import Redis
from saq.job import Job
from saq.queue import Queue
from saq.worker import Worker
def test(ctx):
print("test")
return 1
async def main():
queue = Queue(Redis())
worker = Worker(queue, [test])
asyncio.create_task(worker.start())
jobs = [Job("test") for _ in range(10)]
for job in jobs:
await queue.enqueue(job)
await asyncio.gather(*[job.refresh(until_complete=0) for job in jobs])
await worker.stop()
asyncio.run(main())
evgenii-moriakhin commented
The problem occurs because multiple pubsub daemons are created.
The solution is as follows
class PubSubMultiplexer:
def __init__(self, pubsub: PubSub, prefix: str):
...
self._daemon_creation_lock = asyncio.Lock()
async def start(self) -> None:
async with self._daemon_creation_lock:
if not self._daemon_task:
await self.pubsub.psubscribe(f"{self.prefix}*")
self._daemon_task = asyncio.create_task(self._daemon())
tobymao commented
barakalon commented
Whoops - yeah, that looks like a good solution.
@evgenii-moriakhin do you want to contribute this fix? I'm happy to if you don't want to.
Also, check out Queue.map. Might be a better method for what you're trying to do.
evgenii-moriakhin commented
Yes, I'd be glad to publish my first pr