RuntimeError: Task <Task pending name='Task-11' coro=...> got Future <Future pending> attached to a different loop
cnicodeme opened this issue · 18 comments
Describe the bug
Hi!
I'm running aioredis via a Celery task. Since Celery is not async by default, I'm creating a custom loop when inside the Celery task, and running an async function from that new loop.
The issue is that aioredis seems to try and access the "registered" loop by calling get_event_loop
at various places.
Based on the documentation](https://docs.python.org/3/library/asyncio-eventloop.html) it would be better to replace all the calls to get_event_loop
to get_running_loop
which would remove that Runtime exception when a future is attached to a different loop.
For instance, this specific error occurs on asyncio/streams.py in wait_closed at line 344
async def wait_closed(self):
await self._protocol._get_close_waiter(self)
To Reproduce
import aioredis, asyncio
async def redis_get():
db = aioredis.from_url('redis://127.0.0.1:6379')
print(await db.scan(cursor=0, match='*', count=10))
await db.close()
if __name__ == '__main__':
original_loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
assert original_loop != loop
loop.run_until_complete(redis_get()) # MOST IMPORTANT PART
loop.stop()
loop.close()
Calling loop.run_until_complete(redis_get())
will generate an error :
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Connection.disconnect() done, defined at /home/cx42/www/fernand/api/env/lib64/python3.8/site-packages/aioredis/connection.py:794> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fbbf5ed97c0>()]>>
Replacing the above line by
original_loop.run_until_complete(redis_get())
Causes no errors at all (using the default loop)
Expected behavior
Getting the loop on various parts of Aioredis should not expect to have only one loop running, and shouldn't expect it to be the default one
Logs/tracebacks
ConnectionError: Connection closed by server.
File "aioredis/connection.py", line 900, in read_response
response = await self._parser.read_response()
File "aioredis/connection.py", line 398, in read_response
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
RuntimeError: await wasn't used with future
File "extensions/celery_worker.py", line 55, in __call__
loop.run_until_complete(self._async_run(*args, **kwargs))
File "asyncio/base_events.py", line 641, in run_until_complete
return future.result()
File "extensions/celery_worker.py", line 47, in _async_run
async with db: # We create an instance of the database
File "surge/database/database.py", line 147, in __aexit__
await session_ctx.close(exc_type, exc_value, traceback)
File "surge/database/database.py", line 40, in close
await self.run_post_processing()
File "surge/database/database.py", line 59, in run_post_processing
await await_if_needed(handler)
File "__init__.py", line 273, in await_if_needed
return await result
File "surge/database/models.py", line 359, in propagate
await self._send(
File "surge/database/models.py", line 341, in _send
await provider.prepare(action=action, col=col, target_id=target_id, document=document)
File "utils/stream.py", line 123, in prepare
await self.send(params)
File "utils/stream.py", line 129, in send
for channel in await self.get_channels():
File "utils/stream.py", line 58, in get_channels
self.channels = await redis.subscribers(self.organization_id) # An action (Create/Update/Delete) so we send to anyone connected
File "extensions/redis_queue.py", line 63, in subscribers
return await self.__db.pubsub_channels('channel:{}:*'.format(organization_id))
File "aioredis/client.py", line 1085, in execute_command
return await self.parse_response(conn, command_name, **options)
File "aioredis/client.py", line 1101, in parse_response
response = await connection.read_response()
File "aioredis/connection.py", line 910, in read_response
await self.disconnect()
File "aioredis/connection.py", line 806, in disconnect
await self._writer.wait_closed() # type: ignore[union-attr]
File "asyncio/streams.py", line 344, in wait_closed
await self._protocol._get_close_waiter(self)
Python Version
$ python --version
3.10.1 and 3.8.10
aioredis Version
$ python -m pip show aioredis
2.0.1
Additional context
No response
Code of Conduct
- I agree to follow the aio-libs Code of Conduct
I agree; moving to get_running_loop was in the plan as Python 3.6 is EOL. Thanks for notifying.
Ok great! When do you think this will be released?
I am in the process of migrating aioredis to redis-py by RedisLabs or maker of Redis. Because I'm stupid and enrolled in too many classes, I'd expect the asyncio module of redis-py to rollout late February. Really sorry! I decided on this due to the maintainership history of the repo, so again I really apologize for the delay!
That's ok, don't beat yourself up, I was just curious to have an ETA.
Why are you migrating to redis-py? Does RedisLab have a better Python integration?
redis py combines the sync and async together like elastic search py. There are plenty of maintainers there unlike here which has a poor track record of maintainership (which includes my inactivity), and they will support way more features than I could like Redis module and cluster at fast pace. Thanks for understanding :)
Thank you for the clarification, there is one last thing that I wonder, if you could help: Should we switch to RedisLabs's Python integration of Redis, even for async calls? (if so, could you share me the link to the repository please)
Thank you :)
When the asyncio module is merged, yes. You can find the client here: https://github.com/redis/redis-py As of now, continue developing with aioredis as we have not yet merged anything yet. When you migrate to redis-py, you only have to switch the import statements to say redis. The code is an exact replica.
That's awesome!
So you work with Redis to integrate aioredis within py-redis, by providing a similar interface, is that correct?
Correct (tho i'm just a student, not an employee 😅)
That works for me, thank you for your clarification and your time :)
(If there is a place I can track to follow the migration, can you please share it?)
Yes, redis/redis-py#1899
I'm currently bidding my time for my midterms to end next week to finally get started on cluster and modules support (it's just copying more code). If you'd like to help the team, please make a PR in my fork!
Unfortunately I can't, I don't have time neither, even though I'd love to work on these kind of things.
I'll follow the ticket at Redis and I'm excited to see the evolution it will goes into.
Good luck with your midterms!
Hi! @cnicodeme, did you solve the problem? I have tried to asyncio.set_event_loop
before doing any aioredis
calls, but seems it didn't work :-(
No, I'm waiting on the migration to Redis-py to be completed and then it should be good :)
is there any workaround atm?
I'm waiting for the go ahead to drop Python 3.6 support for redis py to implement the correct API. Otherwise, I think the only way to fix it is by monkey patching asyncio in all aioredis files
Having stumbled upon this exact same issue in my code, I'm wondering if you guys may have already resolved it in some way and documented somewhere? Has the migration to redis-py happened already and if so, what's the migration path for downstream use of aioredis?