vutran1710/PyrateLimiter

Threadpool issues with not being able to start a new thread

Closed this issue · 15 comments

#156
Following up on our previous discussion here, I upgraded the package and set the number of thread pool workers to 2 (in a 2-core container), but we started seeing this specific error once traffic increased on our server. So I'm trying to understand: is the thread pool process limit not only a resource constraint but also a constraint on concurrent requests or unique buckets? Does it make sense for us to raise this number to match our estimate of concurrent requests?

I was also curious when the threads are created. Are they created for each unique bucket and, more importantly, when are they released? We don't want this number dictating how much traffic our service can take.

    if self.rate_limiter is None:
  File "/app/sample/utils/rate_limiter.py", line 44, in rate_limiter
    return build_rate_limiter(self._rate_limiter_rate())
  File "/app/sample/pyratelimiter/limiters/redis_rate_limiter.py", line 76, in build_rate_limiter
    return RedisRateLimiter(
  File "/app/sample/pyratelimiter/limiters/redis_rate_limiter.py", line 39, in __init__
    argument=bucket_factory, thread_pool=ThreadPool(processes=number_of_leak_threads)
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 930, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 215, in __init__
    self._repopulate_pool()
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 306, in _repopulate_pool
    return self._repopulate_pool_static(self._ctx, self.Process,
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 329, in _repopulate_pool_static
    w.start()
  File "/usr/local/lib/python3.10/multiprocessing/dummy/__init__.py", line 51, in start
    threading.Thread.start(self)
  File "/usr/local/lib/python3.10/threading.py", line 935, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
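
A note for readers of the traceback: the frame that fails is the ThreadPool constructor, which starts its threads as soon as it is built rather than when work is submitted. The sketch below uses only the standard library to measure how many threads a single pool adds. The helper name threads_started_by_pool is hypothetical, and this only illustrates how multiprocessing.pool.ThreadPool behaves; it is not a diagnosis of the application code.

```python
import threading
from multiprocessing.pool import ThreadPool


def threads_started_by_pool(processes: int) -> int:
    """Return how many extra threads are alive right after building a ThreadPool."""
    before = threading.active_count()
    pool = ThreadPool(processes=processes)  # worker threads start here, eagerly
    try:
        return threading.active_count() - before
    finally:
        # Without close()/join() the threads stay alive for the pool's lifetime.
        pool.close()
        pool.join()


if __name__ == "__main__":
    # At least `processes` workers, plus the pool's internal handler threads.
    print(threads_started_by_pool(2))
```

If a new pool is built on every call and never closed, the thread count grows with each call until the container's limit is hit, which would be consistent with "can't start new thread".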

For more context, this is how we implement our custom bucket's get() function

def get(self, _item: RateItem) -> RedisBucket:
    bucket = RedisBucket([self.rate], self.redis_db, _item.name, self.script_hash)
    self.schedule_leak(bucket, self.base_clock)
    return bucket

Thanks!

With this implementation, every time a new item arrives the bucket factory creates a new bucket and schedules a new leak task for it. That is probably not a good strategy, since it will eventually create an unbounded number of buckets.

I suppose you only need a single bucket to handle these items. You can create it as a default bucket in the constructor and return it from the method above.

If you need more buckets, use a dictionary to keep track of them.
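
As a minimal sketch of the dictionary approach, assuming buckets are keyed by item name: the class below is a stand-alone helper, not part of PyrateLimiter. BucketCache, make_bucket and on_create are hypothetical names; in the factory above, make_bucket would wrap the RedisBucket constructor and on_create would wrap the schedule_leak call.

```python
import threading
from typing import Callable, Dict, Generic, TypeVar

B = TypeVar("B")


class BucketCache(Generic[B]):
    """Create one bucket per name and reuse it on every later lookup."""

    def __init__(
        self,
        make_bucket: Callable[[str], B],   # hypothetical: builds a bucket for a name
        on_create: Callable[[B], None],    # hypothetical: e.g. schedules the leak
    ) -> None:
        self._make_bucket = make_bucket
        self._on_create = on_create
        self._buckets: Dict[str, B] = {}
        self._lock = threading.Lock()      # guards creation if get() is called from threads

    def get(self, name: str) -> B:
        with self._lock:
            bucket = self._buckets.get(name)
            if bucket is None:
                bucket = self._make_bucket(name)
                self._on_create(bucket)    # runs exactly once per name
                self._buckets[name] = bucket
            return bucket

    def __len__(self) -> int:
        return len(self._buckets)
```

With this in place, the factory's get() would reduce to something like `return self.cache.get(_item.name)`, so repeated items with the same name no longer create new buckets or new leak tasks.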

Just to have a clearer understanding of the context, is your RedisBucket sync or async?

It is async.

await limiter.try_acquire(name, weight) is how we use it. You can imagine us rate limiting n different endpoints with n different rate limits, and that's one reason we decided to return a new bucket every time.

Also a follow-up: even if we implement multiple buckets tracked in a dictionary, wouldn't the thread pool also have to be dynamic in that case? Or do we make a best estimate up front? For example, setting it to 256 in a 2-core, 4 GB container gives me an OSError: Out of memory.

Follow-up 2: Suppose we limit usage of an endpoint (i.e. a bucket) per callerID, with different rates per callerID (rates will vary by payment tier, essentially), and we have thousands of different callers.

In this case we would need a bucket for every unique caller, right? Does that mean the thread pool needs as many threads as there are callerIDs, essentially one per bucket/caller?

I think I can modify the lib to use a single thread to handle the leaks, but I'm currently having trouble testing it with async Redis, since pytest-asyncio is really a pain to work with.

I'm trying a different testing approach, so this will take a few days.

@aaditya-srivathsan I have updated the lib to v3.3.0.

As of this version, PyrateLimiter uses only 1 thread to manage the leaks of all synchronous buckets and 1 asyncio.Task to handle all asynchronous buckets.

Resource usage should no longer be a problem.
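
To illustrate the general idea of one asyncio.Task serving many buckets, here is a minimal stand-alone sketch. It is not the library's implementation: FakeAsyncBucket, SingleTaskLeaker and demo are hypothetical names, and the fixed polling interval is an assumption made to keep the example short.

```python
import asyncio
from typing import Dict, List, Optional


class FakeAsyncBucket:
    """Stand-in for an async bucket; only counts how often leak() ran."""

    def __init__(self) -> None:
        self.leak_calls = 0

    async def leak(self) -> None:
        self.leak_calls += 1


class SingleTaskLeaker:
    """Leak every registered bucket from one background task."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._buckets: Dict[str, FakeAsyncBucket] = {}
        self._task: Optional[asyncio.Task] = None

    def register(self, name: str, bucket: FakeAsyncBucket) -> None:
        self._buckets[name] = bucket
        if self._task is None:  # the task is created once, on first registration
            self._task = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        while True:
            # Copy the values so buckets registered mid-pass do not break iteration.
            for bucket in list(self._buckets.values()):
                await bucket.leak()
            await asyncio.sleep(self._interval)

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo(n_buckets: int) -> List[int]:
    leaker = SingleTaskLeaker(interval=0.01)
    buckets = [FakeAsyncBucket() for _ in range(n_buckets)]
    for i, bucket in enumerate(buckets):
        leaker.register(f"caller-{i}", bucket)
    await asyncio.sleep(0.05)  # let the background task make a few passes
    await leaker.stop()
    return [bucket.leak_calls for bucket in buckets]
```

Under this design the number of buckets (for example, one per callerID) affects how long each pass takes, not how many threads or tasks exist.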

Let me know if you still face the issue.

Sounds good, let me test it out. Just confirming: do I still need to call schedule_leak for every new bucket, and do I still need to create the thread pool to pass in to the Limiter? Is there a changelog I can use to see the differences and update my code base accordingly?

  1. You still need to schedule the leak

  2. No more thread pool

  3. Remove the thread pool if you are using one

  4. Other than that, your existing code should still work

  5. There is a changelog right below the README, but I didn't describe the new update in much detail since there is basically no change in the API

The change looks like it is working, although I do see a runtime warning here:

usr/local/lib/python3.10/site-packages/pyrate_limiter/abstracts/bucket.py:133: RuntimeWarning: coroutine 'Redis.execute_command' was never awaited
  if isawaitable(bucket.leak(0)):
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
ERROR:otel-handler:Rate limit reached for Execute: 

Any ideas why this might be happening?

Oh, it's actually fine. I'll silence the warning later, but it will not cause you any trouble.
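
For context on why a warning like this is harmless, here is a small stand-alone reproduction of the pattern shown in the warning (`if isawaitable(bucket.leak(0))`). Calling an async method only to inspect its result creates a coroutine object that is then dropped without being awaited, and Python reports that when the object is destroyed. FakeAsyncBucket, probe_leaky, probe_clean and captured_warnings are hypothetical names, and closing the coroutine is just one way to avoid the warning; it is not necessarily how v3.4.1 fixed it.

```python
import gc
import warnings
from inspect import isawaitable, iscoroutine
from typing import Callable, List


class FakeAsyncBucket:
    """Stand-in for an async bucket whose leak() is a coroutine function."""

    async def leak(self, current_timestamp: int) -> int:
        return 0


def probe_leaky(bucket: FakeAsyncBucket) -> bool:
    # The coroutine is created only to be inspected, then dropped unawaited.
    return isawaitable(bucket.leak(0))


def probe_clean(bucket: FakeAsyncBucket) -> bool:
    result = bucket.leak(0)
    awaitable = isawaitable(result)
    if iscoroutine(result):
        result.close()  # finishes the unstarted coroutine, so nothing is reported
    return awaitable


def captured_warnings(probe: Callable[[FakeAsyncBucket], bool]) -> List[str]:
    """Run a probe and return the text of any warnings it triggered."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        probe(FakeAsyncBucket())
        gc.collect()  # make sure the dropped coroutine has been finalized
    return [str(w.message) for w in caught]
```

Both probes return True; only probe_leaky leaves a coroutine behind. The dropped coroutine never ran, so no leak was actually skipped or duplicated, which matches the comment that the warning will not cause trouble.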

You can use v3.4.1, in which I fixed that warning.

Sounds good! Thanks for the quick assistance!