vutran1710/PyrateLimiter

Issue with try_acquire and multiple names with v3+

manu-paylead opened this issue · 6 comments

Hello there!
I'm migrating our codebase from V2.10 to 3.21. I've got most things working, but one of our test cases that used to pass no longer does.

Here is the code needed to reproduce it (you need a Redis server):

from pyrate_limiter import AbstractClock, Duration, Limiter, Rate, RedisBucket
from redis.client import Redis

REDIS_URI = "redis://localhost:6379/0"
BUCKET_KEY = "SomeBucketKey"


class RedisClock(AbstractClock):
    def now(self) -> int:
        unix_time, microseconds = Redis.from_url(REDIS_URI).time()
        return unix_time + microseconds / 1_000_000


if __name__ == "__main__":
    Redis.from_url(REDIS_URI).delete(BUCKET_KEY)
    limiter = Limiter(
        RedisBucket.init(
            rates=(Rate(5, 5 * Duration.SECOND),),
            redis=Redis.from_url(REDIS_URI),
            bucket_key="SomeBucketKey",
        ),
        clock=RedisClock(),
    )
    for _ in range(3):
        limiter.try_acquire("test-1")
    for _ in range(3):
        limiter.try_acquire("another_key")
    Redis.from_url(REDIS_URI).delete(BUCKET_KEY)

With 2.10 this code works fine (apart from a couple of changes to the Limiter constructor and the RequestRate -> Rate rename).
With 3.21.1 I get BucketFullException: Bucket for item=another_key with Rate limit=5/5.0s is already full at the line limiter.try_acquire("another_key").

As far as I know, "test-1" and "another_key" should not share the rate limit of 5: we should be allowed 5 of each in the bucket.

Am I misunderstanding something here?

Regards

Manu

In v3, the library no longer separates rate limits by item name; bucket routing has to be written explicitly.
If you want a separate bucket for each item name, you have to implement the BucketFactory.get(item) method yourself, e.g.:

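from pyrate_limiter import AbstractBucket, BucketFactory, RateItem


class RedisBucketFactory(BucketFactory):
    def __init__(self):
        """Set up your buckets here, e.g. self.default_bucket and self.alt_bucket"""

    # wrap_item(...) omitted for brevity

    def get(self, item: RateItem) -> AbstractBucket:
        # Items named 'thing1' get their own bucket; everything else shares the default one
        if item.name == 'thing1':
            return self.alt_bucket
        return self.default_bucket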
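To see why the repro fails, here is a pure-Python sketch. The classes and helpers are hypothetical and are not PyrateLimiter's API. It models a sliding-window bucket allowing 5 items per 5000 ms and routes names in two ways: every name to one shared bucket, as with the single RedisBucket in the repro, or one lazily created bucket per name, matching the per-name behaviour the reporter saw in v2.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, List


class SlidingWindowBucket:
    """Hypothetical stand-in for a bucket with one rate: `limit` items per `interval_ms`."""

    def __init__(self, limit: int, interval_ms: int) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.timestamps: Deque[int] = deque()  # acquisition times in ms, oldest first

    def try_put(self, now_ms: int) -> bool:
        # Only items stamped within the last interval_ms count against the limit
        in_window = sum(1 for ts in self.timestamps if ts > now_ms - self.interval_ms)
        if in_window >= self.limit:
            return False
        self.timestamps.append(now_ms)
        return True


Router = Callable[[str], SlidingWindowBucket]


def shared_router(limit: int, interval_ms: int) -> Router:
    """Every name goes to the same bucket, like the single RedisBucket in the repro."""
    bucket = SlidingWindowBucket(limit, interval_ms)
    return lambda name: bucket


def per_name_router(limit: int, interval_ms: int) -> Router:
    """Each name gets its own bucket, created the first time that name is seen."""
    buckets = defaultdict(lambda: SlidingWindowBucket(limit, interval_ms))
    return lambda name: buckets[name]


def run(router: Router) -> List[bool]:
    # 3 x "test-1" then 3 x "another_key", all at the same instant
    names = ["test-1"] * 3 + ["another_key"] * 3
    return [router(name).try_put(now_ms=0) for name in names]


print(run(shared_router(5, 5000)))    # [True, True, True, True, True, False]
print(run(per_name_router(5, 5000)))  # [True, True, True, True, True, True]
```

With the shared router, the sixth call is refused, just like try_acquire("another_key") in the repro. With per-name routing, all six succeed.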

I'd advise reading the updated documentation to understand the v3 design changes in more detail.
Hope this helps.
https://github.com/vutran1710/PyrateLimiter?tab=readme-ov-file#defining-clock--routing-logic-with-bucketfactory

So I came up with this modified example:

from multiprocessing.pool import ThreadPool
from time import sleep

import structlog
from pyrate_limiter import AbstractBucket, BucketFactory, Duration, Limiter, Rate, RateItem, RedisBucket
from redis.client import Redis

REDIS_URI = "redis://localhost:6379/0"

logger = structlog.get_logger()


def redis_now() -> int:
    unix_time, microseconds = Redis.from_url(REDIS_URI).time()
    return unix_time + microseconds / 1_000_000


class SimpleRedisBucketFactory(BucketFactory):
    def __init__(self, rates, redis, *args, **kwargs):
        self.rates = rates
        self.redis = redis
        self.thread_pool = ThreadPool(processes=1)
        self.buckets = {}
        logger.warning("__init__")

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        now = redis_now()
        logger.warning("wrap_item", name=name, ts=now)
        return RateItem(name, now, weight=weight)

    def get(self, _item: RateItem) -> AbstractBucket:
        """Route each item name to its own RedisBucket, created on first use"""
        name = _item.name
        logger.warning("get", name=name, buckets=self.buckets)
        if not self.buckets.get(name, None):
            self.buckets[name] = RedisBucket.init(
                rates=self.rates,
                redis=self.redis,
                bucket_key=name,
            )
        return self.buckets[name]


if __name__ == "__main__":
    Redis.from_url(REDIS_URI).delete("test-1")
    Redis.from_url(REDIS_URI).delete("another_key")

    limiter = Limiter(
        SimpleRedisBucketFactory(
            rates=(Rate(5, 5 * Duration.SECOND),),
            redis=Redis.from_url(REDIS_URI),
        ),
    )
    for _ in range(3):
        limiter.try_acquire("test-1")
    for _ in range(3):
        limiter.try_acquire("another_key")

    sleep(15)
    for _ in range(3):
        limiter.try_acquire("test-1")
    for _ in range(3):
        limiter.try_acquire("another_key")

But it still fails after the 15-second wait with pyrate_limiter.exceptions.BucketFullException: Bucket for item=test-1 with Rate limit=5/5.0s is already full

Am I missing something else here? Isn't this supposed to work? (3 items, then a 15-second sleep, then 3 more items should be fine under a 5 items / 5 seconds rate.)

You are very close. The problem is that your BucketFactory is not leaking, so items in the buckets are never removed periodically. You can either call self.schedule_leak(new_bucket, clock) right after initializing the new RedisBucket in your get method, or call self.create(RedisBucket, **your-bucket-class-arguments) instead.
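Why leaking matters can be sketched in plain Python. The classes below are hypothetical, not PyrateLimiter's. The first is a naive bucket that only counts the items it stores, so expired items keep counting until something removes them. The second is a factory whose create() both builds a bucket and registers it for leaking, standing in for the create()/schedule_leak() pair described above. The scenario replays the thread's 3 + 3 acquisitions, a simulated 15-second pause, and then 3 + 3 more, with timestamps assumed to be in milliseconds.

```python
from collections import deque
from typing import Deque, Dict, List


class NaiveBucket:
    """Hypothetical bucket: at most `limit` stored items; relies on leak() to expire them."""

    def __init__(self, limit: int, interval_ms: int) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.items: Deque[int] = deque()  # acquisition timestamps in ms, oldest first

    def put(self, now_ms: int) -> bool:
        # Counts everything stored, expired or not, so un-leaked items block new ones
        if len(self.items) >= self.limit:
            return False
        self.items.append(now_ms)
        return True

    def leak(self, now_ms: int) -> None:
        # Drop items that have fallen out of the window
        while self.items and self.items[0] <= now_ms - self.interval_ms:
            self.items.popleft()


class SketchFactory:
    """Hypothetical factory: create() builds a bucket AND registers it for leaking."""

    def __init__(self, limit: int = 5, interval_ms: int = 5000) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.buckets: Dict[str, NaiveBucket] = {}
        self.leak_targets: List[NaiveBucket] = []

    def create(self) -> NaiveBucket:
        bucket = NaiveBucket(self.limit, self.interval_ms)
        self.leak_targets.append(bucket)  # stands in for schedule_leak(bucket, clock)
        return bucket

    def get(self, name: str) -> NaiveBucket:
        if name not in self.buckets:
            self.buckets[name] = self.create()
        return self.buckets[name]

    def leak_all(self, now_ms: int) -> None:
        # A real factory would run this periodically in the background
        for bucket in self.leak_targets:
            bucket.leak(now_ms)


def scenario(leak: bool) -> List[bool]:
    factory = SketchFactory()
    names = ["test-1"] * 3 + ["another_key"] * 3
    results = [factory.get(name).put(0) for name in names]
    if leak:
        factory.leak_all(15_000)  # pretend the leak ran during the 15 s sleep
    results += [factory.get(name).put(15_000) for name in names]
    return results
```

Without the leak, the third acquisition per name after the pause is refused, which mirrors the BucketFullException for test-1. With the leak, all twelve acquisitions succeed.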

Example:

class SimpleRedisBucketFactory(BucketFactory):
    def __init__(self, rates, redis, *args, **kwargs):
        self.rates = rates
        self.redis = redis
        self.thread_pool = ThreadPool(processes=1)
        self.buckets = {}
        logger.warning("__init__")

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        now = redis_now()
        logger.warning("wrap_item", name=name, ts=now)
        return RateItem(name, now, weight=weight)

    def get(self, _item: RateItem) -> AbstractBucket:
        """Route each item name to its own RedisBucket, created on first use"""
        name = _item.name
        logger.warning("get", name=name, buckets=self.buckets)
        if not self.buckets.get(name, None):
            self.buckets[name] = self.create(RedisBucket, rates=self.rates, redis=self.redis, bucket_key=name)
            # The line above is identical to:
            # self.buckets[name] = RedisBucket(...)
            # self.schedule_leak(self.buckets[name], your-clock)
        return self.buckets[name]

Note that you still need to pass an appropriate clock instance instead of using a plain now() function. That clock is required for leaking (removing expired items) to work properly.
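The clock's units may matter too. This assumes that v3 clocks report integer milliseconds, as the bundled TimeClock does and as Duration.SECOND == 1000 suggests. Under that assumption, the redis_now() helper in this thread returns float seconds, so a 5 * Duration.SECOND window would effectively span 5000 seconds. Below is a minimal sketch of converting a Redis TIME reply, which redis-py's time() returns as a (seconds, microseconds) tuple, into milliseconds. The helper names and sample timestamps are made up for illustration.

```python
from typing import Union

# Mirrors Rate(5, 5 * Duration.SECOND), assuming Duration.SECOND == 1000 (ms)
INTERVAL_MS = 5 * 1000

Number = Union[int, float]


def redis_time_to_ms(seconds: int, microseconds: int) -> int:
    """Convert a Redis TIME reply (seconds, microseconds) into integer milliseconds."""
    return seconds * 1000 + microseconds // 1000


def redis_time_to_seconds(seconds: int, microseconds: int) -> float:
    """What redis_now() in the thread computes: float seconds."""
    return seconds + microseconds / 1_000_000


def in_window(item_ts: Number, now: Number, interval: Number = INTERVAL_MS) -> bool:
    """Hypothetical check: does an item stamped at item_ts still count at time now?"""
    return now - item_ts < interval


# Two Redis TIME replies taken 15 seconds apart (made-up sample values)
before = (1_700_000_000, 250_000)
after = (1_700_000_015, 250_000)

# Milliseconds: 15000 ms have passed, so the old item is outside the 5000 ms window
print(in_window(redis_time_to_ms(*before), redis_time_to_ms(*after)))  # False
# Seconds: only 15 "units" have passed, which looks like 15 ms to a 5000 ms window
print(in_window(redis_time_to_seconds(*before), redis_time_to_seconds(*after)))  # True
```

A custom clock's now() could then return redis_time_to_ms(*redis.time()). This is hypothetical wiring, not confirmed in the thread.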

The documentation for this part is quite horrible, I must admit. I just updated it here: https://github.com/vutran1710/PyrateLimiter?tab=readme-ov-file#creating-buckets-dynamically

I think we can close this.