tobymao/saq

"RuntimeError: read() called while another coroutine is already waiting for incoming data" while asyncio.gather job.refresh

evgenii-moriakhin opened this issue · 4 comments

I'm trying to set up saq for my work needs, and I've encountered this problem:

If I try to run asyncio.gather over several job.refresh() calls with until_complete set, an error is raised:

Failed to consume message
Traceback (most recent call last):
  File "/home/evgeniy/rq/saq/saq/queue.py", line 736, in _daemon
    message = await self.pubsub.get_message(timeout=None)  # type: ignore
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 927, in get_message
    response = await self.parse_response(block=(timeout is None), timeout=timeout)
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 804, in parse_response
    response = await self._execute(
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/client.py", line 784, in _execute
    return await conn.retry.call_with_retry(
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 782, in read_response
    response = await self._parser.read_response(
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 411, in read_response
    await self.read_from_socket()
  File "/home/evgeniy/rq/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 392, in read_from_socket
    buffer = await self._stream.read(self._read_size)
  File "/usr/lib/python3.10/asyncio/streams.py", line 669, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.10/asyncio/streams.py", line 487, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

Here's the code that reproduces the problem:

import asyncio
from redis.asyncio import Redis
from saq.job import Job
from saq.queue import Queue
from saq.worker import Worker

async def test(ctx):
    print("test")
    return 1

async def main():
    queue = Queue(Redis())
    worker = Worker(queue, [test])

    # Run the worker in the background while jobs are enqueued and awaited.
    asyncio.create_task(worker.start())
    jobs = [Job("test") for _ in range(10)]
    for job in jobs:
        await queue.enqueue(job)

    # Refreshing all jobs concurrently with until_complete set raises the RuntimeError.
    await asyncio.gather(*[job.refresh(until_complete=0) for job in jobs])

    await worker.stop()

asyncio.run(main())

The problem occurs because each concurrent refresh(until_complete=...) call can spawn its own pubsub daemon, so multiple coroutines end up reading from the same Redis connection. The solution is to guard daemon creation with a lock:

class PubSubMultiplexer:
    def __init__(self, pubsub: PubSub, prefix: str):
        ...
        # Serializes concurrent start() calls so the daemon is created only once.
        self._daemon_creation_lock = asyncio.Lock()

    async def start(self) -> None:
        async with self._daemon_creation_lock:
            if not self._daemon_task:
                # Only the first caller subscribes and spawns the reader daemon.
                await self.pubsub.psubscribe(f"{self.prefix}*")
                self._daemon_task = asyncio.create_task(self._daemon())
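
For illustration, here is a minimal standalone sketch (not saq code, just the same pattern with hypothetical names) showing that the lock plus the None check make daemon creation idempotent even when many callers race into start(), the way concurrent job.refresh() calls do:

import asyncio

class Starter:
    """Toy stand-in for PubSubMultiplexer: lock + check keep daemon creation idempotent."""

    def __init__(self):
        self._daemon_task = None
        self._lock = asyncio.Lock()
        self.created = 0

    async def _daemon(self):
        # Placeholder for the real pubsub reader loop.
        await asyncio.sleep(0.1)

    async def start(self):
        async with self._lock:
            if self._daemon_task is None:
                self.created += 1
                self._daemon_task = asyncio.create_task(self._daemon())

async def demo():
    s = Starter()
    # Ten concurrent callers, like ten concurrent job.refresh() calls.
    await asyncio.gather(*(s.start() for _ in range(10)))
    print(s.created)  # prints 1: only one daemon task was ever created
    await s._daemon_task

asyncio.run(demo())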

Whoops - yeah, that looks like a good solution.

@evgenii-moriakhin do you want to contribute this fix? I'm happy to if you don't want to.

Also, check out Queue.map. Might be a better method for what you're trying to do.
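
For reference, a rough sketch of what that could look like for the repro above; this assumes queue.map takes the function name and an iterable of kwargs dicts and awaits all the results, so double-check the exact signature in your saq version:

# Replaces the manual enqueue loop plus gather(refresh) above.
results = await queue.map("test", [{} for _ in range(10)])
print(results)  # one result per job, e.g. [1, 1, ...]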

Yes, I'd be glad to publish my first PR.