Issue with try_acquire and multiple names with v3+
manu-paylead opened this issue · 6 comments
Hello there!
I'm working on migrating our codebase from the V2.10 to 3.21. I've got most of the thing working but I'm having issues with one of our test cases that use to work and it seems it does not anymore.
Here is the code that's needed to repro (You need a redis server):
from pyrate_limiter import AbstractClock, Duration, Limiter, Rate, RedisBucket
from redis.client import Redis
REDIS_URI = "redis://localhost:6379/0"
BUCKET_KEY = "SomeBucketKey"
class RedisClock(AbstractClock):
def now(self) -> int:
unix_time, microseconds = Redis.from_url(REDIS_URI).time()
return unix_time + microseconds / 1_000_000
if __name__ == "__main__":
Redis.from_url(REDIS_URI).delete(BUCKET_KEY)
limiter = Limiter(
RedisBucket.init(
rates=(Rate(5, 5 * Duration.SECOND),),
redis=Redis.from_url(REDIS_URI),
bucket_key="SomeBucketKey",
),
clock=RedisClock(),
)
for _ in range(3):
limiter.try_acquire("test-1")
for _ in range(3):
limiter.try_acquire("another_key")
Redis.from_url(REDIS_URI).delete(BUCKET_KEY)
With 2.10 I am having no issue with this code (If we except the couple of things that changed in the constructor of Limiter
and the RequestRate
-> Rate
change)
With 3.21.1 I am having BucketFullException: Bucket for item=another_key with Rate limit=5/5.0s is already full
at the line limiter.try_acquire("another_key")
As far as I know "test-1"
and "another_key"
should not conflict on the rate limit of 5. We should be allowed to have 5 of each in the bucket.
Am i misunderstanding something here ?
Regards
Manu
In v3, the lib requires explicit bucket-routing code instead of using item name as value.
If you want separated buckets for each item name, you have to explicitly implement the BucketFactory.get(item_name) method, eg:
from pyrate_limiter import BucketFactory
from pyrate_limiter import AbstractBucket
class RedisBucketFactory(BucketFactory):
def __init__(self):
"""setup your buckets here"""
def get(self, item: RateItem) -> AbstractBucket:
if item.name == 'thing1':
return self.alt_bucket
return self.default_bucket
I advise you look at the updated document to understand more about the changes in v3 design.
Hope this helps.
https://github.com/vutran1710/PyrateLimiter?tab=readme-ov-file#defining-clock--routing-logic-with-bucketfactory
So I came with the example modified like this:
from multiprocessing.pool import ThreadPool
from time import sleep
import structlog
from pyrate_limiter import AbstractBucket, BucketFactory, Duration, Limiter, Rate, RateItem, RedisBucket
from redis.client import Redis
REDIS_URI = "redis://localhost:6379/0"
logger = structlog.get_logger()
def redis_now() -> int:
unix_time, microseconds = Redis.from_url(REDIS_URI).time()
return unix_time + microseconds / 1_000_000
class SimpleRedisBucketFactory(BucketFactory):
def __init__(self, rates, redis, *args, **kwargs):
self.rates = rates
self.redis = redis
self.thread_pool = ThreadPool(processes=1)
self.buckets = {}
logger.warning("__init__")
def wrap_item(self, name: str, weight: int = 1) -> RateItem:
now = redis_now()
logger.warning("wrap_item", name=name, ts=now)
return RateItem(name, redis_now(), weight=weight)
def get(self, _item: RateItem) -> AbstractBucket:
"""For simplicity's sake, all items route to the same, single bucket"""
name = _item.name
logger.warning("get", name=name, buckets=self.buckets)
if not self.buckets.get(name, None):
self.buckets[name] = RedisBucket.init(
rates=self.rates,
redis=self.redis,
bucket_key=name,
)
return self.buckets[name]
if __name__ == "__main__":
Redis.from_url(REDIS_URI).delete("test-1")
Redis.from_url(REDIS_URI).delete("another_key")
limiter = Limiter(
SimpleRedisBucketFactory(
rates=(Rate(5, 5 * Duration.SECOND),),
redis=Redis.from_url(REDIS_URI),
),
)
for _ in range(3):
limiter.try_acquire("test-1")
for _ in range(3):
limiter.try_acquire("another_key")
sleep(15)
for _ in range(3):
limiter.try_acquire("test-1")
for _ in range(3):
limiter.try_acquire("another_key")
But it stills errors out after the 15sec wait with pyrate_limiter.exceptions.BucketFullException: Bucket for item=test-1 with Rate limit=5/5.0s is already full
Am I missing something else here, isn't it supposed to work ?(3 items then 15sec sleep then 3 items should work for a 5items/5seconds rate)
You are very close. The problem is that your BucketFactory is not leaking, so the items in the buckets is not removed periodically. You can either append self.schedule_leak(new_bucket, clock)
right after the initialization of new RedisBucket in your get
method, or you can call self.create(RedisBucket, **your-bucket-class-arguments)
instead.
Example:
class SimpleRedisBucketFactory(BucketFactory):
def __init__(self, rates, redis, *args, **kwargs):
self.rates = rates
self.redis = redis
self.thread_pool = ThreadPool(processes=1)
self.buckets = {}
logger.warning("__init__")
def wrap_item(self, name: str, weight: int = 1) -> RateItem:
now = redis_now()
logger.warning("wrap_item", name=name, ts=now)
return RateItem(name, redis_now(), weight=weight)
def get(self, _item: RateItem) -> AbstractBucket:
"""For simplicity's sake, all items route to the same, single bucket"""
name = _item.name
logger.warning("get", name=name, buckets=self.buckets)
if not self.buckets.get(name, None):
self.buckets[name] = self.create(RedisBucket, rates=self.rates, redis=self.redis, bucket_key=name)
# the line above indentical to...
# self.buckets[name] = RedisBucket(...)
# self.schedule_lake(self.buckets[name], your-clock)
return self.buckets[name]
Note that you still need to pass an apropriate clock instance instead of using now()
as function. That clock is required to make leaking / removing items work properly
The document on this part is quite horrible I must admit. I just updated it here https://github.com/vutran1710/PyrateLimiter?tab=readme-ov-file#creating-buckets-dynamically
I think we can close this