taskiq-python/taskiq-redis

Problem with using max connections limit

stinovlas opened this issue · 4 comments

Problem

Both BaseRedisBroker and RedisScheduleSource have an argument max_connection_pool_size, which is passed to ConnectionPool. However, the ConnectionPool implementation raises redis.exceptions.ConnectionError when the maximum number of connections is exceeded. This exception is not caught and bubbles all the way up, which kills the scheduler (and the broker).

# Minimal working example (with scheduler)
import asyncio

from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq_redis.schedule_source import RedisScheduleSource

def get_scheduled_task():
    return ScheduledTask(
        task_name="test_task", labels={}, args=[], kwargs={}, cron="1 1 0 0 0"
    )

source = RedisScheduleSource("redis://127.0.0.1:6379", max_connection_pool_size=5)

async def subtest():
    task = get_scheduled_task()
    await source.add_schedule(task)
    print("task added")
    await source.delete_schedule(task.schedule_id)
    print("task deleted")

async def test():
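    # With the pool capped at 5 connections, 10 concurrent subtests
    # exhaust the pool and ConnectionPool raises ConnectionError.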
    await asyncio.gather(*[subtest() for _ in range(10)])

if __name__ == "__main__":
    asyncio.run(test())

Suggestions

I found out that redis provides redis.asyncio.BlockingConnectionPool, which waits for a connection instead of raising the exception. There's a configurable timeout, after which the exception is raised. Despite the name, the asyncio variant of BlockingConnectionPool does not actually block the whole program; the context is correctly switched while waiting.
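For illustration, a minimal sketch of the behaviour (the pool size and timeout values here are arbitrary):

# Sketch: a pool of at most 5 connections; acquiring a 6th waits
# (up to `timeout` seconds) instead of raising immediately.
import asyncio

from redis.asyncio import BlockingConnectionPool, Redis

async def main():
    pool = BlockingConnectionPool.from_url(
        "redis://127.0.0.1:6379", max_connections=5, timeout=10
    )
    client = Redis(connection_pool=pool)
    # 10 concurrent commands: no ConnectionError, the extra ones wait.
    await asyncio.gather(*[client.ping() for _ in range(10)])
    await client.close()

asyncio.run(main())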

We could leverage this class to provide easier handling of the max connections limit. Otherwise, users need to override the taskiq-redis classes and replace ConnectionPool with BlockingConnectionPool manually.
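Such a manual workaround might look something like this (a hypothetical sketch; it assumes the source keeps its pool in a connection_pool attribute, which is an implementation detail):

# Hypothetical workaround: swap the pool after construction.
from redis.asyncio import BlockingConnectionPool

from taskiq_redis.schedule_source import RedisScheduleSource

source = RedisScheduleSource("redis://127.0.0.1:6379")
source.connection_pool = BlockingConnectionPool.from_url(
    "redis://127.0.0.1:6379", max_connections=5, timeout=10
)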

I see the following possibilities:

  1. Add a new argument connection_pool_cls: Type[ConnectionPool] to RedisScheduleSource and BaseRedisBroker. This would accept any ConnectionPool subclass (including BlockingConnectionPool). This is the one I prefer (sketched after this list).
  2. Add a new argument connection_pool: ConnectionPool to RedisScheduleSource and BaseRedisBroker. This would accept an instance of any ConnectionPool subclass (including BlockingConnectionPool). The URL would have to be duplicated in this case: passed both to the ConnectionPool instance and to RedisScheduleSource itself (even if unused there, in order to maintain a compatible API).
  3. Add a new argument blocking: bool to RedisScheduleSource and BaseRedisBroker. Based on its value, we'd internally decide whether to use ConnectionPool or BlockingConnectionPool. This is the least flexible option, because the behaviour cannot easily be changed from outside (e.g. by subclassing ConnectionPool).
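To make option 1 concrete, the constructor could look roughly like this (a sketch with illustrative names, not the actual taskiq-redis code):

# Sketch of option 1: a pluggable connection pool class.
from typing import Any, Optional, Type

from redis.asyncio import BlockingConnectionPool, ConnectionPool

class RedisScheduleSource:
    def __init__(
        self,
        url: str,
        max_connection_pool_size: Optional[int] = None,
        connection_pool_cls: Type[ConnectionPool] = ConnectionPool,
        **connection_kwargs: Any,
    ) -> None:
        # Defaults to today's behaviour; callers opt into waiting
        # by passing connection_pool_cls=BlockingConnectionPool.
        self.connection_pool = connection_pool_cls.from_url(
            url,
            max_connections=max_connection_pool_size,
            **connection_kwargs,
        )

# Usage: wait up to `timeout` seconds instead of failing immediately.
source = RedisScheduleSource(
    "redis://127.0.0.1:6379",
    max_connection_pool_size=5,
    connection_pool_cls=BlockingConnectionPool,
    timeout=10,
)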

In all cases, the change can be made backwards compatible (although I'd argue that the current behaviour with an uncaught exception doesn't make sense and BlockingConnectionPool would be a good default). Alternatively, we could:

  1. Change the implementation to BlockingConnectionPool and throw away ConnectionPool altogether. This would minimize the changes (just replace ConnectionPool with BlockingConnectionPool), but it's a breaking change.

Notes

redis.asyncio.RedisCluster does not suffer from the same problem, because it has its own connection pool handling mechanism and already allows for retries.
EDIT: There is actually some problem with cluster reconnects. I created redis/redis-py#3135 to resolve it.

We should also consider modifying RedisAsyncResultBackend and RedisAsyncClusterResultBackend. These classes don't accept any argument to limit the number of simultaneous connections.

@s3rius I'd appreciate your feedback on the problem and the proposed solutions. I'm ready to work on this once we agree on the details.

s3rius commented

Hi, and thanks for finding this out. I guess the easiest option is to use a blocking pool without any way to change it. We can just add a timeout parameter which configures when the exception is raised. By default we can set it to 1 to simulate the non-blocking pool implementation.

stinovlas commented

Sounds good! I'm not sure whether we need to add a timeout argument, since any unknown kwargs are already passed through to the ConnectionPool. But if you want to be more explicit, we can do that as well.
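For example, assuming the internal pool is switched to BlockingConnectionPool, something like this should already forward the timeout (untested sketch):

from taskiq_redis.schedule_source import RedisScheduleSource

# `timeout` is not a recognized argument, so it would be forwarded to
# the underlying BlockingConnectionPool via the extra connection kwargs.
source = RedisScheduleSource(
    "redis://127.0.0.1:6379",
    max_connection_pool_size=5,
    timeout=10,  # seconds to wait for a free connection
)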

I'll get to it and we can tune the details in PR.

PR is ready 🙂.