ixti/sidekiq-throttled

worker with 2 different concurrency thresholds - stuck on minimum concurrency

Opened this issue · 4 comments

Hey, I've encountered a weird scenario and wanted to hear your thoughts and check whether my assumption is right.

Say I have the following worker:

class SomeWorker < BaseWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :some_queue

  sidekiq_throttle(
    concurrency: {
      limit: ->(_, _, model, _, _) { model == 'product' ? 1 : 100 },
      key_suffix: ->(_, _, model, _, _) { model }
    }
  )
end
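
For reference, both lambdas are called with the job's arguments, so with this setup the enqueues might look like the following (the argument values are hypothetical; only the third argument, the model, drives the throttling):

# Hypothetical arguments; only the third one feeds the lambdas above.
SomeWorker.perform_async(1, 2, 'product', {}, nil) # key suffix "product", limit 1
SomeWorker.perform_async(1, 2, 'order', {}, nil)   # key suffix "order", limit 100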

As you can see, this worker has two different concurrency thresholds: one for "product" models and one for the rest.

From what I've seen, if there's a SomeWorker with the "product" model running at the moment AND the job at the head of the queue is also a SomeWorker with a "product" model, no jobs with other models get picked up, because the fetcher always checks the head of the queue.

Am I correct? If this assumption is true, then the maximum throughput a queue can have is the minimum of the concurrency thresholds across all scenarios (assuming the job mix is evenly distributed).

Moreover, this would mean there's no reason to ever build dynamic concurrency, as it is smarter to split the work into multiple workers on different queues.

ixti commented

When you use dynamic concurrency, you need to specify a dynamic :key_suffix as well.


ixti commented

The lock works in the following way:

  1. It pulls a job from the queue.
  2. If the job is supposed to be "throttled", it pushes it back to the (!) end of the queue.
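
In other words, throttled jobs don't block the head of the queue; they get requeued and the fetcher moves on. A minimal sketch of that pull-then-requeue behavior, assuming a hypothetical throttled? helper that performs the limit check (this is not the gem's actual code):

require "json"
require "redis"

# Sketch only. throttled?(job) is a hypothetical stand-in for the
# gem's concurrency/threshold check.
def fetch_next_job(redis, queue)
  payload = redis.rpop("queue:#{queue}") # Sidekiq pops from the right side
  return nil unless payload

  job = JSON.parse(payload)
  return job unless throttled?(job)      # under the limit: hand it to a worker

  # Over the limit: push the job back onto the left side (the "end" from
  # the fetcher's point of view, the same side perform_async writes to).
  redis.lpush("queue:#{queue}", payload)
  nil
end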

Sorry @ixti, I missed the key_suffix in the example above. I've corrected it in the initial message. I still see the same behavior as I mentioned.

How can you push to the end of the queue? If I'm not mistaken, Sidekiq queues are "stacks".

ixti commented

By "stack" I assume you mean a LIFO queue. That's not how Sidekiq works; it uses a FIFO model. Under the hood it uses Redis, which provides a simple LIST primitive: Sidekiq pushes jobs onto the left side of the list and pops them from the right side.

In the case of throttling, we re-add jobs back to the queue with LPUSH (as if they were enqueued via perform_async).
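
To make the LIST semantics concrete, here is a small sketch against a local Redis using the redis-rb gem (the queue name is made up):

require "redis"

redis = Redis.new
redis.del("queue:demo")

# perform_async and the throttler's requeue both write to the left side...
redis.lpush("queue:demo", "job-1")
redis.lpush("queue:demo", "job-2")

# ...while the fetcher pops from the right side, so ordering stays FIFO:
redis.rpop("queue:demo") # => "job-1"
redis.rpop("queue:demo") # => "job-2"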

I totally agree that dynamic throttling is kinda weird. I was pretty against adding it in the first place, but some users find it useful. I prefer deterministic throttling that describes "buckets" instead, i.e. multiple worker classes with different throttling settings (see the sketch at the end of this comment). But if one wants to use dynamic throttling, this is how it would look:

class FooWorker
  # ...

  # Concurrency limit per plan bucket.
  CONCURRENCY = {
    "standard" => 10,
    "gold" => 100,
    "premium" => 1000
  }

  sidekiq_throttle({
    :concurrency => {
      # Jobs with an unrecognized plan all share a single "unknown" bucket...
      :key_suffix => ->(plan, *) { CONCURRENCY.key?(plan) ? plan : "unknown" },
      # ...which is limited to 1 concurrent job.
      :limit => ->(plan, *) { CONCURRENCY.fetch(plan) { 1 } }
    }
  })

  def perform(plan, delay)
    sleep delay
  end
end

Now you will be able to run:

10_000.times do
  FooWorker.perform_async("standard", 1234)
  FooWorker.perform_async("gold", 1234)
  FooWorker.perform_async("premium", 1234)
end

That will allow 10 concurrent jobs with the standard plan, 100 with gold, and 1000 with premium: 1110 jobs in total. Due to the implementation, real throughput might be lower; that's by design, as we are trying to guarantee no more than the given limit.
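
For contrast, the deterministic "buckets" approach mentioned above might look like the following sketch (class and queue names are made up); the caller picks a class up front instead of passing the plan as an argument:

# One worker class per plan bucket, each with a static limit.
class StandardPlanWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :standard_plan
  sidekiq_throttle(concurrency: { limit: 10 })

  def perform(delay)
    sleep delay
  end
end

class GoldPlanWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :gold_plan
  sidekiq_throttle(concurrency: { limit: 100 })

  def perform(delay)
    sleep delay
  end
end

# Enqueueing picks the bucket explicitly:
(plan == "gold" ? GoldPlanWorker : StandardPlanWorker).perform_async(1234)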