swarthy/redis-semaphore

Using function from src/fair-semaphore/index.ts

Closed this issue · 8 comments

I want to avoid hitting a 3rd party API rate limit.
The way I'd like to achieve it is to call acquire from src/fair-semaphore/index.ts, call the API, and then call refresh without release, so the lock expires lockTimeout milliseconds after the API call completes.

The problem is that for some reason I can't import anything from this folder. I'm fairly new to the TypeScript world, so perhaps I'm missing something: when I cloned the project and put the src folder inside my project it worked, but it doesn't work with the package from npm.

Great library btw!

This lib was created for the same reason you describe. We call a 3rd party REST API that limits each auth token (max 5 simultaneous requests per token).
We use the semaphore like this:

const LIMIT_PER_USER = 5

async function callThirdPartyAPI(redisClient: Redis, userId: number, userToken: string, queryString: string) {
  const semaphore = new FairSemaphore(redisClient, `request-for-user-${userId}`, LIMIT_PER_USER)
  await semaphore.acquire()
  try {
    return await callAPI(userToken, queryString)
  } finally {
    await semaphore.release() // other 'callThirdPartyAPI' calls can acquire semaphore sooner than lockTimeout
  }
}

Why can't you use the public API too?

That's not good enough for me because my limit is per second. Even if I make the requests one after another (not in parallel at all), anything beyond 10 requests per second will be rejected.
Hence I want the auto-release to happen after 1 second, you see?

Edit: to make it clear, I want to extend the lock period from the moment the API call ends (and then I won't call release). If that functionality can be provided, it will be perfect for me.

To limit requests per second you can use much simpler code than the fair semaphore (internally the semaphore stores its state in 3 keys in Redis). The idea of a rate limiter is explained here: redislabs.com. You don't need any library at all; you can implement it yourself:

// Prepare step at initialization
const lua = `local current
local maxcount = tonumber(ARGV[1])
local period = tonumber(ARGV[2])
local delay
current = tonumber(redis.call("get",KEYS[1]))
delay = redis.call("pttl",KEYS[1])
if current == nil or current < maxcount then
  current = redis.call("incr",KEYS[1])
  delay = 0
  if current == 1 then
    redis.call("pexpire",KEYS[1],period)
  end
end
return {current,delay}
`
redisClient.defineCommand('limitRate', {
  numberOfKeys: 1,
  lua: lua
})

// Resolve after `ms` milliseconds
async function delay(ms) {
  await new Promise(resolve => setTimeout(resolve, ms))
}

const MAX_ATTEMPT_COUNT = 10
const MAX_RPS = 5
const PERIOD = 1000

async function rateLimiter(redisClient, key) {
  const finalKey = 'rate-limit:' + key
  let attempt = 0 // start at 0 so the loop makes exactly MAX_ATTEMPT_COUNT attempts
  while (attempt++ < MAX_ATTEMPT_COUNT ) {
    const [, ttl] = await redisClient.limitRate(finalKey, MAX_RPS, PERIOD)
    if (ttl === 0) {
      return
    } else {
      await delay(ttl + 1)
    }
  }
  throw new Error(
    `[RateLimit] key: "${finalKey}" Maximum number of retries exceeded`
  )
}

// example
async function callThirdPartyAPI(userId) {
  await rateLimiter(redisClient, `request-for-user-${userId}`)
  return await request(...)
}
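
To make the counting logic of the Lua script easier to follow, here is a minimal in-memory sketch of the same fixed-window idea. It is not part of redis-semaphore: the names FixedWindowLimiter and tryAcquire are hypothetical, and the injected clock exists only so the behaviour can be checked without waiting. Like the script, it returns 0 when the call is admitted and otherwise the remaining time of the current window.

```typescript
// In-memory model of the fixed-window counter from the Lua script above.
// FixedWindowLimiter and tryAcquire are hypothetical names used for illustration.
class FixedWindowLimiter {
  private count = 0
  private windowEndsAt = 0

  constructor(
    private readonly maxCount: number,
    private readonly periodMs: number,
    private readonly now: () => number = Date.now // injectable clock (assumption, for testing)
  ) {}

  // Returns 0 if the call is admitted, otherwise the milliseconds left in the window.
  tryAcquire(): number {
    const t = this.now()
    if (t >= this.windowEndsAt) {
      // The window is over (in Redis the key would have expired): reset the counter.
      this.count = 0
    }
    if (this.count < this.maxCount) {
      this.count++
      if (this.count === 1) {
        // The first call in a window starts the period, like "pexpire" in the script.
        this.windowEndsAt = t + this.periodMs
      }
      return 0
    }
    // Limit reached: report how long the caller should wait, like "pttl" in the script.
    return this.windowEndsAt - t
  }
}
```

Note that the window starts at the first admitted call, not when the API call finishes, which is exactly the difference discussed in the next comment.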

The thing is, I want to extend the "lock" time after the request completes, so I can be sure that I won't get the rate limit error (if the count starts before the API call, some time could pass, for whatever reason, before the call is actually sent).
I could just use Semaphore if you think it will perform better, but I'd still prefer your simple API, just with the possibility of extending the lockTimeout of an existing lock.

OK, while I still recommend using the public API, you can use the internal code (with no guarantee that the internal API won't change in the future) like this:

const { acquire } = require('redis-semaphore/lib/fair-semaphore')
// or for TypeScript
import { acquire } from 'redis-semaphore/lib/fair-semaphore'

Cool xD. I just needed to access it via redis-semaphore/lib/....
So this is supposed to work, right?

import {defaultTimeoutOptions} from "redis-semaphore"
import {acquire, refresh} from "redis-semaphore/lib/fair-semaphore";

import {Redis} from "ioredis";

export class RateLimiter {

    public constructor(private client: Redis, private key: string, private max: number, private timeMs: number) {
    }

    public async limit<T>(callback: () => T) {
        const identifier = await acquire(
            this.client,
            this.key,
            this.max,
            this.timeMs,
            defaultTimeoutOptions.acquireTimeout,
            defaultTimeoutOptions.retryInterval,
        );
        try {
            return await callback();
        } finally {
            // extend the lock period
            await refresh(this.client, this.key, identifier, this.timeMs);
        }
    }
}

I make every API call inside the given callback, and refresh will extend the lock period.

Only if the time between the acquire and refresh calls is less than timeMs (timeMs after the acquire call the lock expires, and another RateLimiter will be able to acquire the semaphore).
The public FairSemaphore class calls refresh on an interval (setInterval) until release, so to work properly you need to do the same thing:
1. call acquire
2. call refresh every X ms (X < timeMs) until the request ends (const interval = setInterval(() => refresh(...), X))
3. after the request ends, call clearInterval
4. call refresh one more time to extend the lock period
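
The four steps above can be sketched roughly as follows. To keep the example self-contained it does not import the library: the acquire and refresh parameters are hypothetical wrappers that you would build around the internal functions shown earlier in the thread, and limitWithTrailingHold is an illustrative name, not something the package exports.

```typescript
// Hypothetical wrapper types (assumptions): each would close over the Redis client,
// key, limit and timeMs and delegate to the internal acquire/refresh functions.
type Acquire = () => Promise<string>
type Refresh = (identifier: string) => Promise<unknown>

async function limitWithTrailingHold<T>(
  acquire: Acquire,
  refresh: Refresh,
  refreshIntervalMs: number, // X from step 2, must be smaller than timeMs
  callback: () => Promise<T>
): Promise<T> {
  // 1. acquire the semaphore slot
  const identifier = await acquire()
  // 2. keep the slot alive while the request is running
  const interval = setInterval(() => {
    refresh(identifier).catch(() => {
      // A failed periodic refresh is ignored in this sketch; real code should handle it.
    })
  }, refreshIntervalMs)
  try {
    return await callback()
  } finally {
    // 3. stop the periodic refresh once the request has ended
    clearInterval(interval)
    // 4. refresh one last time, so the slot stays held for timeMs after the request
    await refresh(identifier)
  }
}
```

With the imports from the earlier snippet, the wrappers might look like () => acquire(client, key, max, timeMs, acquireTimeout, retryInterval) and (id) => refresh(client, key, id, timeMs), but since this relies on internal code, the signatures may change between versions.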

Thank you very much for your help!
Much appreciated (: