ixti/sidekiq-throttled

Multiple sets of jobs with a key_suffix are not processed simultaneously.

Closed this issue · 5 comments

Given a scenario with users A and B, and sidekiq worker configured as below

sidekiq_throttle(
    threshold: {
      period: 1.second,
      limit: ->(user_id) { 5 },
      key_suffix: ->(user_id) { user_id }
    }
)

If I enqueue 1,000 jobs with key_suffix: A.id followed by 1,000 jobs with key_suffix: B.id, the two sets of jobs are not processed simultaneously. Instead, the first 1,000 jobs for A are processed/throttled until they are all requeued, at which point B's jobs start processing.

This can make the processing of large sets of jobs significantly slower than Sidekiq normally would.

ixti commented

Can you please explain in a bit more detail, with a more expanded example?

Here's a working example.

class MyJob
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  sidekiq_throttle(
    threshold: {
      period: 5.seconds,
      limit: ->(_) { 5 },
      key_suffix: ->(user_id) { user_id }
    }
  )

  def perform(user_id)
    puts "performing job for #{user_id}"
  end
end

Enqueuing two sets of jobs, one per user, as follows and then starting Sidekiq:
user_id1 = SecureRandom.uuid
user_id2 = SecureRandom.uuid

1000.times { MyJob.perform_async(user_id1) }
1000.times { MyJob.perform_async(user_id2) }

Ideally, 10 jobs should be processed every 5 seconds (5 for each user).
Instead, the worker processes/throttles user_id1's jobs before moving on to user_id2's and then repeats, reducing the actual number of jobs run per period to 5.

ixti commented

Hm. I guess I understand what you mean. The thing is, the throttler is just a layer on top of the Sidekiq worker. So when you do:

1000.times { MyJob.perform_async(user_id1) }
1000.times { MyJob.perform_async(user_id2) }

Sidekiq pushes those jobs in the order you fired them, and starts processing them in the same order. The throttler in that case simply makes sure that once the throttle level has been reached, the job is pushed back to the end of the queue. So if you change the perform method to something like:

def perform(user_id)
  puts "performing job for #{user_id}"
  sleep 60 # sleep for a minute
end

And then start Sidekiq with a concurrency of 10, for example, you will see that 5 jobs for the first user get started, then after some time 5 jobs for the second user get started... then, as soon as one of the first user's jobs completes, another of their jobs gets started, and so on...
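The requeue-to-tail behavior described above can be modeled in a few lines of plain Ruby. This is only a toy simulation (the real gem tracks counters in Redis, and windows reset over time); it just shows how throttled jobs cycle to the back of the queue while jobs for other keys still get a turn:

```ruby
# Toy model: a FIFO queue of jobs for two users, with a per-user
# limit of 5 in the current window. Jobs under the limit execute;
# jobs over the limit are pushed back to the tail of the queue.
queue  = Array.new(8, :a) + Array.new(8, :b)
ran    = []
counts = Hash.new(0)

16.times do
  job = queue.shift
  if counts[job] < 5
    counts[job] += 1
    ran << job          # under the limit: job executes
  else
    queue.push(job)     # over the limit: requeued at the tail
  end
end

# ran   => [:a, :a, :a, :a, :a, :b, :b, :b, :b, :b]
# queue => [:a, :a, :a, :b, :b, :b]  (throttled leftovers)
```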

Exactly, throttling isn't really a use case Sidekiq caters to, so I doubt this is easily solvable.
I guess the "solution" is to schedule large sets of jobs based on the throttle limit so that they're likely to get processed at the right time.
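One way to sketch that scheduling idea: compute a delay for each job so that every throttle window receives at most `limit` jobs per key, then enqueue with Sidekiq's `perform_in`. The helper name and the numbers here are illustrative, not part of the gem:

```ruby
# Delay (in seconds) for the i-th job of a given key, so that each
# `period`-second window gets at most `limit` jobs for that key.
def throttle_delay(index, limit: 5, period: 5)
  (index / limit) * period
end

# First 5 jobs run immediately, the next 5 run 5 seconds later, etc.
# In the example above you could enqueue with:
#   1000.times { |i| MyJob.perform_in(throttle_delay(i), user_id1) }
(0...12).map { |i| throttle_delay(i) }
# => [0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 10, 10]
```

This pre-spaces the work instead of relying on requeue churn, at the cost of having to know the throttle parameters at enqueue time.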

ixti commented

Well, I think the only real solution here is to add your own client middleware that "distributes" jobs evenly over multiple queues. In any case, I tend to think it's not the throttler's issue after all.
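A hedged sketch of such client middleware, assuming jobs can be sharded by their first argument (e.g. the user_id); the class name, queue names, and MD5-based sharding are all illustrative, not part of sidekiq-throttled:

```ruby
require "digest"

# Client middleware that reroutes each job to one of several queues
# based on a stable hash of its first argument, so one user's large
# backlog doesn't sit in front of another user's jobs.
class ShardByFirstArg
  QUEUES = %w[shard_0 shard_1 shard_2 shard_3].freeze

  # Sidekiq client middleware interface: mutate the job hash,
  # then yield to continue the middleware chain.
  def call(_worker_class, job, _queue, _redis_pool)
    unless job["args"].to_a.empty?
      shard = Digest::MD5.hexdigest(job["args"].first.to_s).to_i(16) % QUEUES.size
      job["queue"] = QUEUES[shard]
    end
    yield
  end
end

# Registration (and Sidekiq must then be started listening on all
# the shard queues):
#
# Sidekiq.configure_client do |config|
#   config.client_middleware { |chain| chain.add ShardByFirstArg }
# end
```

Because the hash is stable, every job for the same user lands on the same queue, while different users spread across queues and can make progress in parallel.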