aio-libs-abandoned/aioredis-py

RuntimeError: Task <Task pending name='Task-11' coro=...> got Future <Future pending> attached to a different loop

cnicodeme opened this issue · 18 comments

Describe the bug

Hi!

I'm running aioredis via a Celery task. Since Celery is not async by default, I'm creating a custom loop when inside the Celery task, and running an async function from that new loop.

The issue is that aioredis seems to try and access the "registered" loop by calling get_event_loop at various places.

Based on the documentation (https://docs.python.org/3/library/asyncio-eventloop.html), it would be better to replace all calls to get_event_loop with get_running_loop, which would remove the RuntimeError raised when a future is attached to a different loop.

For instance, this specific error occurs in asyncio/streams.py, in wait_closed at line 344:

    async def wait_closed(self):
        await self._protocol._get_close_waiter(self)
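
The same RuntimeError can be reproduced with plain asyncio, without aioredis. The sketch below is only an illustration of the failure mode, not the aioredis code path: the names loop_a, loop_b, waiter and different_loop_error are made up for this example. A Future is created on one loop and awaited by a task running on another.

```python
import asyncio


def different_loop_error():
    """Await a Future bound to loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    # The future remembers the loop that created it (loop_a).
    fut = loop_a.create_future()

    async def waiter():
        # This coroutine runs as a task on loop_b, so the await is rejected.
        await fut

    try:
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return None


message = different_loop_error()
print(message)
```

The printed message has the same shape as the one in the issue title: a Task that got a Future attached to a different loop.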

To Reproduce

import aioredis, asyncio


async def redis_get():
    db = aioredis.from_url('redis://127.0.0.1:6379')
    print(await db.scan(cursor=0, match='*', count=10))
    await db.close()


if __name__ == '__main__':
    original_loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()
    assert original_loop != loop

    loop.run_until_complete(redis_get())  # MOST IMPORTANT PART
    loop.stop()
    loop.close()

Calling loop.run_until_complete(redis_get()) generates an error:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Connection.disconnect() done, defined at /home/cx42/www/fernand/api/env/lib64/python3.8/site-packages/aioredis/connection.py:794> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fbbf5ed97c0>()]>>

Replacing the above line with

    original_loop.run_until_complete(redis_get())

causes no errors at all (it uses the default loop).

Expected behavior

When aioredis looks up the event loop, it should not assume that only one loop exists, nor that the loop in use is the default one.
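
As a small illustration of that expectation (plain asyncio only, with hypothetical helper names which_loop, runs_on_custom_loop and raises_outside_loop), asyncio.get_running_loop() returns the loop that is actually executing the coroutine, even when that loop was created by hand, and it raises RuntimeError when no loop is running in the current thread:

```python
import asyncio


async def which_loop():
    # Returns the loop that is executing this coroutine right now.
    return asyncio.get_running_loop()


def runs_on_custom_loop():
    """True if get_running_loop() reports the hand-made loop."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(which_loop()) is loop
    finally:
        loop.close()


def raises_outside_loop():
    """True if get_running_loop() refuses to guess when nothing is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return True
    return False


custom_ok = runs_on_custom_loop()
outside_raises = raises_outside_loop()
```

Because get_running_loop() never falls back to a default loop, code that uses it cannot silently bind an object to a loop other than the one running it.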

Logs/tracebacks

ConnectionError: Connection closed by server.
  File "aioredis/connection.py", line 900, in read_response
    response = await self._parser.read_response()
  File "aioredis/connection.py", line 398, in read_response
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
RuntimeError: await wasn't used with future
  File "extensions/celery_worker.py", line 55, in __call__
    loop.run_until_complete(self._async_run(*args, **kwargs))
  File "asyncio/base_events.py", line 641, in run_until_complete
    return future.result()
  File "extensions/celery_worker.py", line 47, in _async_run
    async with db:  # We create an instance of the database
  File "surge/database/database.py", line 147, in __aexit__
    await session_ctx.close(exc_type, exc_value, traceback)
  File "surge/database/database.py", line 40, in close
    await self.run_post_processing()
  File "surge/database/database.py", line 59, in run_post_processing
    await await_if_needed(handler)
  File "__init__.py", line 273, in await_if_needed
    return await result
  File "surge/database/models.py", line 359, in propagate
    await self._send(
  File "surge/database/models.py", line 341, in _send
    await provider.prepare(action=action, col=col, target_id=target_id, document=document)
  File "utils/stream.py", line 123, in prepare
    await self.send(params)
  File "utils/stream.py", line 129, in send
    for channel in await self.get_channels():
  File "utils/stream.py", line 58, in get_channels
    self.channels = await redis.subscribers(self.organization_id)  # An action (Create/Update/Delete) so we send to anyone connected
  File "extensions/redis_queue.py", line 63, in subscribers
    return await self.__db.pubsub_channels('channel:{}:*'.format(organization_id))
  File "aioredis/client.py", line 1085, in execute_command
    return await self.parse_response(conn, command_name, **options)
  File "aioredis/client.py", line 1101, in parse_response
    response = await connection.read_response()
  File "aioredis/connection.py", line 910, in read_response
    await self.disconnect()
  File "aioredis/connection.py", line 806, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)

Python Version

$ python --version
3.10.1 and 3.8.10

aioredis Version

$ python -m pip show aioredis
2.0.1

Additional context

No response

I agree; moving to get_running_loop was in the plan as Python 3.6 is EOL. Thanks for notifying.

Ok great! When do you think this will be released?

I am in the process of migrating aioredis into redis-py, maintained by RedisLabs, the makers of Redis. Because I'm stupid and enrolled in too many classes, I'd expect the asyncio module of redis-py to roll out in late February. Really sorry! I decided on this because of the maintainership history of this repo, so again, I really apologize for the delay!

That's ok, don't beat yourself up, I was just curious to have an ETA.

Why are you migrating to redis-py? Does RedisLab have a better Python integration?

redis-py combines the sync and async clients in one package, like elasticsearch-py does. It has plenty of maintainers, unlike this repo, which has a poor track record of maintainership (my own inactivity included), and they will support far more features than I could, such as Redis modules and cluster, at a fast pace. Thanks for understanding :)

Thank you for the clarification. There is one last thing I wonder about, if you could help: should we switch to RedisLabs's Python integration of Redis, even for async calls? (If so, could you please share the link to the repository?)

Thank you :)

When the asyncio module is merged, yes. You can find the client here: https://github.com/redis/redis-py. For now, continue developing with aioredis, as we have not merged anything yet. When you migrate to redis-py, you only have to switch the import statements to say redis. The code is an exact replica.

That's awesome!

So you are working with Redis to integrate aioredis into redis-py by providing a similar interface, is that correct?

Correct (tho i'm just a student, not an employee 😅)

That works for me, thank you for your clarification and your time :)

(If there is a place I can track to follow the migration, can you please share it?)

Yes, redis/redis-py#1899

I'm currently biding my time until my midterms end next week, so I can finally get started on cluster and modules support (it's just copying more code). If you'd like to help the team, please make a PR in my fork!

Unfortunately I can't; I don't have time either, even though I'd love to work on this kind of thing.
I'll follow the ticket at Redis, and I'm excited to see how it evolves.

Good luck with your midterms!

SH659 commented

Hi! @cnicodeme, did you solve the problem? I tried calling asyncio.set_event_loop before making any aioredis calls, but it doesn't seem to work :-(

No, I'm waiting for the migration to redis-py to be completed, and then it should be good :)

is there any workaround atm?

I'm waiting for the go-ahead to drop Python 3.6 support in redis-py so I can implement the correct API. Otherwise, I think the only way to fix it is by monkey patching asyncio in all aioredis files.
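
A general pattern for calling async code from synchronous workers is to keep a single long-lived event loop in a background thread and submit every coroutine to it, so that only one loop is ever involved. This is not confirmed anywhere in this thread and is untested against aioredis 2.0.1. LoopThread and current_loop_id are hypothetical names for this sketch, and a prefork worker would presumably need one such helper per process.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: owns one event loop running in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        # Schedule the coroutine on the shared loop and block for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout)

    def stop(self):
        # Ask the loop to stop from outside its thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def current_loop_id():
    # Stand-in for real async work; reports which loop executed it.
    return id(asyncio.get_running_loop())


runner = LoopThread()
first = runner.run(current_loop_id())
second = runner.run(current_loop_id())
runner.stop()
```

Both calls execute on the same loop, so any client object created inside one coroutine stays bound to the loop that later coroutines run on.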

Having stumbled upon this exact same issue in my code, I'm wondering whether you have already resolved it in some way and documented it somewhere. Has the migration to redis-py happened already, and if so, what's the migration path for downstream users of aioredis?