worker with 2 different concurrency thresholds - stuck on minimum concurrency
Hey, I've encountered a weird scenario and wanted to hear your thoughts on whether my assumption is right.
Say I have the following worker:
class SomeWorker < BaseWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :some_queue

  sidekiq_throttle(
    concurrency: {
      limit:      ->(_, _, model, _, _) { model == 'product' ? 1 : 100 },
      key_suffix: ->(_, _, model, _, _) { model }
    }
  )
end
As you can see, this worker has two different concurrency thresholds: one for "product" models and one for the rest.
From what I've seen, if a SomeWorker with a "product" model is currently running AND the job at the head of the queue is also a SomeWorker with a "product" model, no jobs with other models get started, because the fetcher always checks the head of the queue.
Am I correct? If this assumption is true, then the maximum throughput a queue can have is the minimum of the concurrency thresholds across all scenarios (assuming the job mix is uniform).
Moreover, this means there's no reason to ever build dynamic concurrency, as it is smarter to split the work into multiple workers on different queues.
When you use dynamic concurrency, you need to specify a dynamic :key_suffix:
The lock works in the following way:
- it pulls a job from the queue.
- if the job is supposed to be "throttled", it pushes it back to the (!) end of the queue.
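Conceptually (a simplified sketch, not the gem's actual code; throttled? and process are hypothetical helpers), the fetch-and-requeue loop looks like this:

loop do
  _queue, payload = redis.brpop("queue:some_queue") # FIFO: pop from the right/tail

  if throttled?(payload)
    # Over the limit: push back onto the left side, i.e. the end of the queue,
    # exactly as perform_async would.
    redis.lpush("queue:some_queue", payload)
  else
    process(payload) # hypothetical: runs the worker's perform
  end
end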
Sorry @ixti, I missed the key_suffix in the example above. I've corrected it in the initial message. I still see the same behavior as I mentioned.
How can you push to the end of the queue? If I'm not mistaken, Sidekiq queues are "stacks".
By stack I assume you mean a LIFO queue. That's not how Sidekiq works; it uses a FIFO model. Under the hood it uses Redis, which provides a simple LIST primitive. Sidekiq pushes jobs onto the left side of the list and pops them from the right side.
In the case of throttling, we re-add jobs back to the queue with LPUSH (as if they were added via perform_async).
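To make the FIFO shape concrete, here is a tiny redis-rb demo (the key name is just an example):

require "redis"

redis = Redis.new

# perform_async effectively LPUSHes onto the left side of the list...
redis.lpush("queue:example", "job-1")
redis.lpush("queue:example", "job-2")

# ...while workers pop from the right side, so the oldest job comes out first.
redis.rpop("queue:example") # => "job-1"
redis.rpop("queue:example") # => "job-2"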
I totally agree that dynamic throttling is kinda weird. I was pretty against adding it in the first place, but some users find it useful. I prefer deterministic throttling by describing "buckets" instead (having multiple worker classes with different throttling settings; see the sketch at the end of this comment). But if one wants to use dynamic throttling, this is how it will look:
class FooWorker
  # ...

  CONCURRENCY = {
    "standard" => 10,
    "gold"     => 100,
    "premium"  => 1000
  }

  sidekiq_throttle({
    :concurrency => {
      :key_suffix => ->(plan, *) { CONCURRENCY.key?(plan) ? plan : "unknown" },
      :limit      => ->(plan, *) { CONCURRENCY.fetch(plan) { 1 } }
    }
  })

  def perform(plan, delay)
    sleep delay
  end
end
Now you will be able to run:
10_000.times do
  FooWorker.perform_async("standard", 1234)
  FooWorker.perform_async("gold", 1234)
  FooWorker.perform_async("premium", 1234)
end
That will allow 10 concurrent jobs with the standard plan, 100 with gold, and 1000 with premium: 1110 jobs in total. Due to the implementation, real throughput might be lower - that's by design, as we're trying to guarantee no more than the given limit.
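For comparison, here is a rough sketch of the deterministic "buckets" approach mentioned above (class names, queue names, and limits are illustrative):

class StandardPlanWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :standard_plan

  # Static limit: this bucket never competes with the others.
  sidekiq_throttle(concurrency: { limit: 10 })

  def perform(delay)
    sleep delay
  end
end

class GoldPlanWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :gold_plan

  sidekiq_throttle(concurrency: { limit: 100 })

  def perform(delay)
    sleep delay
  end
end

Since each class has its own queue and throttle key, a backlog of throttled jobs in one bucket cannot delay jobs in another.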