vutran1710/PyrateLimiter

Regression in version 3.2.0. Getting OSError: [Errno 24] Too many open files

We updated our pyrate-limiter from 3.1.1 to 3.2.0 and we started getting the following error:

ERROR    Exception inside application: [Errno 24] Too many open files
Traceback (most recent call last):
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/routing.py", line 62, in __call__
    return await application(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/security/websocket.py", line 37, in __call__
    return await self.application(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/sessions.py", line 263, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/auth.py", line 185, in __call__
    return await super().__call__(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/middleware.py", line 24, in __call__
    return await self.inner(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/routing.py", line 116, in __call__
    return await application(
           ^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/utils.py", line 50, in await_many_dispatch
    await dispatch(result)
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/asgiref/sync.py", line 479, in __call__
    ret: _R = await loop.run_in_executor(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/db.py", line 13, in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/asgiref/sync.py", line 538, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/consumer.py", line 125, in dispatch
    handler(message)
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/channels/generic/websocket.py", line 38, in websocket_connect
    self.connect()
  File "/srv/cs/apps/ws/consumers/meet.py", line 19, in wrapper
    return func(*arg, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/srv/cs/apps/ws/consumers/meet.py", line 57, in connect
    self.scoreboard_limiter = Limiter(Rate(limit=1, interval=Duration.MINUTE))
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 84, in __init__
    self.bucket_factory = self._init_bucket_factory(argument, clock=clock)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 121, in _init_bucket_factory
    argument = SingleBucketFactory(argument, clock)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/.virtualenvs/python3.11/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 44, in __init__
    self.thread_pool = ThreadPool(processes=1)
                       ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 930, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
                            ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 340, in __init__
    self._reader, self._writer = connection.Pipe(duplex=False)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/connection.py", line 543, in Pipe
    fd1, fd2 = os.pipe()
               ^^^^^^^^^
OSError: [Errno 24] Too many open files

We are using Django Channels with Daphne and our Consumer Implementation used to look like this:

from pyrate_limiter import Limiter, Rate, Duration
from channels.generic.websocket import JsonWebsocketConsumer


class MeetConsumer(JsonWebsocketConsumer):
    def connect(self):
        # ...
        self.scoreboard_limiter = Limiter(Rate(limit=1, interval=Duration.MINUTE))
        # ...

I also tried using RedisBucket, but we still got the same error:

from django.conf import settings
from redis import StrictRedis, ConnectionPool
from pyrate_limiter import Limiter, Duration, Rate, RedisBucket
from channels.generic.websocket import JsonWebsocketConsumer


redis_pool = ConnectionPool.from_url(settings.REDIS_DB_URL)
redis_client = StrictRedis(connection_pool=redis_pool)


class MeetConsumer(JsonWebsocketConsumer):
    def connect(self):
        # Obtains the 'meet_id' parameter from the URL route (in routing.py)
        # that opened the WebSocket connection to the consumer.
        #
        # Every consumer has a scope that contains information about its connection,
        # including in particular any positional or keyword arguments from the URL route
        # and the currently authenticated user if any.
        self.meet_id = self.scope["url_route"]["kwargs"]["meet_id"]

        # Group names are restricted to ASCII alphanumerics, hyphens, and periods only.
        # Since this code constructs a group name directly from the room name,
        # it will fail if the room name contains any characters that aren’t valid in a group name.
        self.meet_group_name = "meet_%s" % self.meet_id

        self.scoreboard_limiter = Limiter(
            RedisBucket.init(
                [Rate(limit=1, interval=Duration.MINUTE)],
                redis_client,
                f"{self.meet_group_name}_scoreboard_limiter",
            )
        )

After reverting to 3.1.1, the issue no longer occurs with either of the consumer implementations listed above.
Looking at 3.2.0, there appears to be a relation between its changes and the stack trace of the error, but I don't know how to fix it.

I'm also open to suggestions if there is something wrong with our consumer implementation.

Maybe you can create a custom ThreadPool with a smaller number of processes and pass it to Limiter: https://github.com/vutran1710/PyrateLimiter/blob/master/pyrate_limiter/limiter.py#L71

I'm not sure that's the issue (processes=10). The value is low enough compared to the OS limits.
My understanding is that this is somehow related to how django-channels/daphne handle the consumer classes.
Every consumer instance represents a single WebSocket connection, so probably either something is leaking or the different consumers don't share the same ThreadPool.
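
To make the hypothesis above easier to check, here is a minimal sketch using only the standard library. The stack trace ends in os.pipe() inside ThreadPool(...), so the sketch measures how many file descriptors a number of live ThreadPool objects hold. The helper names count_open_fds and fds_used_by_pools are made up for this illustration, and the fd counting assumes a Linux or macOS system that exposes /proc/self/fd or /dev/fd.

```python
import os
from multiprocessing.pool import ThreadPool


def count_open_fds() -> int:
    """Count this process's open file descriptors (Linux/macOS only)."""
    for fd_dir in ("/proc/self/fd", "/dev/fd"):
        if os.path.isdir(fd_dir):
            return len(os.listdir(fd_dir))
    raise RuntimeError("no fd directory available on this platform")


def fds_used_by_pools(n_pools: int) -> int:
    """Return how many descriptors n_pools live ThreadPool objects hold."""
    before = count_open_fds()
    # One pool per simulated connection, mirroring one Limiter per consumer.
    pools = [ThreadPool(processes=1) for _ in range(n_pools)]
    during = count_open_fds()
    # Shut the pools down again so the experiment itself does not leak.
    for pool in pools:
        pool.terminate()
        pool.join()
    return during - before


if __name__ == "__main__":
    print("descriptors held by 50 pools:", fds_used_by_pools(50))
```

If the number grows with the number of pools, then a pool created per WebSocket connection and never shut down would eventually hit the process's open-file limit, regardless of how small processes is.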

Our fix for now is simply not to use the library in this case; we switched to a simple rate-limit decorator. I'm sharing the code for anyone experiencing the same issue.

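import functools
import time


class RateLimitException(Exception):
    pass


def rate_limits(max_calls, period):
    def decorator(func):
        calls = 0
        last_reset = time.time()

        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            nonlocal calls, last_reset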

            # Calculate time elapsed since last reset
            elapsed = time.time() - last_reset

            # If elapsed time is greater than the period, reset the call count
            if elapsed > period:
                calls = 0
                last_reset = time.time()

            # Check if the call count has reached the maximum limit
            if calls >= max_calls:
                raise RateLimitException("Rate limit exceeded. Please try again later.")

            # Increment the call count
            calls += 1

            # Call the original function
            return func(*args, **kwargs)

        return wrapper

    return decorator
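
One caveat: the decorator keeps a single counter per decorated function, so all connections handled by the same process share it, whereas the original code created one Limiter per connection. If per-connection limits are needed, a small object stored on each consumer is one option. The following is only a sketch under that assumption; FixedWindowLimiter and try_acquire are hypothetical names, not part of pyrate-limiter, and it uses the same fixed-window logic as the decorator above.

```python
import threading
import time
from typing import Callable


class FixedWindowLimiter:
    """Allow at most max_calls per period seconds; holds no threads or pipes."""

    def __init__(
        self,
        max_calls: int,
        period: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_calls = max_calls
        self.period = period
        self._clock = clock  # injectable so the limiter can be tested
        self._lock = threading.Lock()
        self._calls = 0
        self._window_start = clock()

    def try_acquire(self) -> bool:
        """Return True if the call is allowed, False if the limit is hit."""
        with self._lock:
            now = self._clock()
            # Start a new window once the current one has expired.
            if now - self._window_start > self.period:
                self._calls = 0
                self._window_start = now
            if self._calls >= self.max_calls:
                return False
            self._calls += 1
            return True
```

In connect(), something like self.scoreboard_limiter = FixedWindowLimiter(max_calls=1, period=60) would then give each connection its own counter, and the handler can skip the scoreboard update whenever try_acquire() returns False.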