Problem with using max connections limit
stinovlas opened this issue · 4 comments
Problem
Both `BaseRedisBroker` and `RedisScheduleSource` have the argument `max_connection_pool_size`, which is passed to `ConnectionPool`. However, the `ConnectionPool` implementation throws `redis.exceptions.ConnectionError` when the maximum number of connections is exceeded. This exception is not caught and bubbles all the way up, which kills the scheduler (and the broker).
```python
# Minimal working example (with scheduler)
import asyncio

from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq_redis.schedule_source import RedisScheduleSource


def get_scheduled_task():
    return ScheduledTask(
        task_name="test_task", labels={}, args=[], kwargs={}, cron="1 1 0 0 0"
    )


source = RedisScheduleSource("redis://127.0.0.1:6379", max_connection_pool_size=5)


async def subtest():
    task = get_scheduled_task()
    await source.add_schedule(task)
    print("task added")
    await source.delete_schedule(task.schedule_id)
    print("task deleted")


async def test():
    await asyncio.gather(*[subtest() for _ in range(10)])


if __name__ == "__main__":
    asyncio.run(test())
```
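Running this against a local Redis raises `redis.exceptions.ConnectionError` (redis-py's "Too many connections") as soon as more than five connections are in flight, and the error propagates out of `asyncio.run`.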
Suggestions
I found out that `redis` provides `redis.asyncio.BlockingConnectionPool`, which waits for a connection instead of throwing the exception. There's a configurable timeout (after which the exception is raised). Despite the name, the asyncio variant of `BlockingConnectionPool` does not actually block the whole program; the context is correctly switched on async sleep.

We could leverage this class to provide easier handling of the max connections limit. Otherwise, users would need to override the `taskiq-redis` classes and replace `ConnectionPool` with `BlockingConnectionPool` manually.
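For illustration, here is a minimal sketch of the blocking variant using redis-py's public API (the URL and timeout value are just examples): with a pool capped at five connections, ten concurrent commands queue up for a free connection instead of failing immediately.

```python
import asyncio

from redis.asyncio import BlockingConnectionPool, Redis


async def main() -> None:
    # 10 concurrent PINGs over a pool capped at 5 connections: the extra
    # requests wait for a free connection instead of raising right away.
    pool = BlockingConnectionPool.from_url(
        "redis://127.0.0.1:6379",
        max_connections=5,
        timeout=10,  # seconds to wait for a connection before raising
    )
    client = Redis(connection_pool=pool)
    await asyncio.gather(*[client.ping() for _ in range(10)])
    await pool.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
```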
I see the following possibilities:

- Add a new argument `connection_pool_cls: Type[ConnectionPool]` for `RedisScheduleSource` and `BaseRedisBroker`. This would accept any `ConnectionPool` subclass (including `BlockingConnectionPool`). This is the one I prefer (see the sketch after this list).
- Add a new argument `connection_pool: ConnectionPool` for `RedisScheduleSource` and `BaseRedisBroker`. This would accept an instance of any `ConnectionPool` subclass (including `BlockingConnectionPool`). The URL would have to be duplicated in this case, passed both to the `ConnectionPool` instance and to `RedisScheduleSource` itself (even if not used there, in order to maintain a compatible API).
- Add a new argument `blocking: bool` for `RedisScheduleSource` and `BaseRedisBroker`. Based on the value, we'd internally decide whether to use `ConnectionPool` or `BlockingConnectionPool`. This is the least flexible option, because the behaviour cannot be easily changed from outside (e.g. by subclassing `ConnectionPool`).
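For concreteness, a rough sketch of how the first (preferred) option could look from the caller's side. The `connection_pool_cls` argument is the proposal itself, not an existing parameter:

```python
from redis.asyncio import BlockingConnectionPool

from taskiq_redis.schedule_source import RedisScheduleSource

# Hypothetical `connection_pool_cls` argument: the source would instantiate
# whatever ConnectionPool subclass it is given, so the blocking behaviour
# (or any custom subclass) can be swapped in from outside.
source = RedisScheduleSource(
    "redis://127.0.0.1:6379",
    max_connection_pool_size=5,
    connection_pool_cls=BlockingConnectionPool,  # proposed, not yet existing
)
```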
In all cases, the change can be made backwards compatible (although I'd argue that the current behaviour with an uncaught exception doesn't make sense and `BlockingConnectionPool` is a good default). Alternatively, we could:

- Change the implementation to `BlockingConnectionPool` and throw away `ConnectionPool` altogether. This would minimize the changes (just replace `ConnectionPool` with `BlockingConnectionPool`), but it's a breaking change.
Notes
`redis.asyncio.RedisCluster` does not suffer from the same problem, because it has its own connection pool handling mechanism and already allows for retries.

*EDIT: There is actually some problem with cluster reconnects. I created redis/redis-py#3135 to resolve it.*
We should also consider some modification of `RedisAsyncResultBackend` and `RedisAsyncClusterResultBackend`. These classes don't accept any argument to limit the number of simultaneous connections.
@s3rius I'd appreciate your feedback on the problem and proposed solutions. I'm ready to work on this once we agree on the details.
Hi, and thanks for finding this out. I guess the easiest option is to use a blocking pool without any way to change it. We can just add a `timeout` parameter which configures when the exception is raised. By default we can set it to 1 to simulate the non-blocking pool implementation.
Sounds good! I'm not sure whether we have to add a `timeout` argument, since any unknown `kwargs` are passed to `ConnectionPool` already. But if you want to be more explicit, we can do that as well.

I'll get to it, and we can tune the details in the PR.
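A sketch of what that would look like for users, assuming the internal pool becomes `BlockingConnectionPool` (the kwargs pass-through is the existing behaviour; the blocking default itself is the proposed change):

```python
from taskiq_redis.schedule_source import RedisScheduleSource

# Unknown kwargs already flow through to the connection pool, so the
# timeout could be set without a dedicated argument.
source = RedisScheduleSource(
    "redis://127.0.0.1:6379",
    max_connection_pool_size=5,
    timeout=1,  # would be forwarded to BlockingConnectionPool
)
```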
PR is ready 🙂.