mercadolibre/toiler

Concurrency/parallelism question/concern

bglusman opened this issue · 9 comments

Hey @sschepens! We got our app running on Toiler and it's chugging along nicely in our test environment, and it looks to have solved a somewhat severe memory leak problem there! So hooray, and thanks for the help!

The processing characteristics look plenty fast, but very different from what we see with Shoryuken, and while that's not necessarily bad, it wasn't obvious to me. I'm wondering whether a) we need better docs, and/or b) we should add some additional config around the differences.

For context, our worker looks approximately like this (we split the worker out from the creator because of the memoize/ivar issue discussed previously, so there's one object per message), and we have 3 queues at present and a concurrency of 25:

class CreationQueueWorker
  Queue = ENV.fetch('SQS_CREATE_QUEUE')
  include Toiler::Worker
  toiler_options queue: Queue,
                 auto_delete: true,
                 parser: :json

  def perform(_body, sqs_hash_message)
    Creator.perform(sqs_hash_message)
  end
end

The surprising thing to me, which I think I understand now but wanted to discuss, is that we only ever see one message in flight from the queue. This makes sense now that I think about it, because it's one thread per queue at a time. But a) am I right that there's no point having higher concurrency than your number of queues? And b) while this may be desirable when you want to guarantee messages are processed in order, in a lot of cases ordering is unimportant, and it would be nice to be able to specify that a queue should have as many threads working on it as are available. I'm not sure how I'd do that now without creating multiple worker classes that all name the same queue, or something similar.

Possibly specifying a pool_size option in the worker config would make sense for this, with a default size of 1 (no parallelism per queue) that uses up to pool_size threads in parallel when they're available?
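
Something like this, as a purely hypothetical sketch (pool_size is not an existing Toiler option, the name is just for illustration):

class CreationQueueWorker
  Queue = ENV.fetch('SQS_CREATE_QUEUE')
  include Toiler::Worker
  toiler_options queue: Queue,
                 auto_delete: true,
                 parser: :json,
                 pool_size: 10 # hypothetical: use up to 10 threads on this queue when messages are waiting

  def perform(_body, sqs_hash_message)
    Creator.perform(sqs_hash_message)
  end
end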

Do I understand correctly, and what do you think? Nice work though, so far I'm impressed with toiler!

Oh, related thought: oftentimes there's a basis on which the thread/ordering only matters for a subset of messages, e.g. those with the same uuid, or whatever... Allowing a pool size with an optional block/proc that determines which thread gets a message could guarantee in-order processing where it matters, without blocking unrelated messages from being processed in whatever order, e.g. as long as the same across/thread processes all messages with a particular uuid we'd be fine.

Er, s/across/actor/
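
A hypothetical sketch of that routing idea (route_by is invented here and is not a Toiler option): messages whose key matches always land on the same actor, so ordering holds within a uuid but not across uuids.

# Inside the worker class shown above:
toiler_options queue: Queue,
               auto_delete: true,
               parser: :json,
               pool_size: 10,                        # hypothetical, as in the previous sketch
               route_by: ->(body) { body['uuid'] }   # hypothetical: all messages with the same uuid go to the same actor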

The surprising thing to me, which I think I understand now but wanted to discuss, is that we only ever see one message in flight from the queue...

You're saying you have plenty of messages to process but are only seeing one in flight at a time? That shouldn't be the case if you specify a concurrency greater than one.

Toiler spawns a single Fetcher per queue, and it's responsible for constantly pulling messages and delivering them to Processors; the number of Processors spawned is the concurrency specified for the queue.
If multiple Processors are available, the Fetcher attempts to pull several messages in a single call to keep them all at work. Amazon SQS limits a single call to 10 messages, I believe, but as soon as a Fetcher has pulled those ten messages and dispatched them to Processors, it's free again to pull more whenever any Processors are idle.
This should be enough for most use cases. I can see it becoming an issue when the concurrency is so high that a single Fetcher cannot keep all workers busy; perhaps the number of Fetchers could be made customizable, but that would require some rework.
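
For reference, the 10-message cap is an SQS API limit on ReceiveMessage rather than something Toiler chooses. A rough aws-sdk-sqs sketch of a single fetch (illustrative only, not Toiler's actual internals):

require 'aws-sdk-sqs'

sqs = Aws::SQS::Client.new
queue_url = sqs.get_queue_url(queue_name: ENV.fetch('SQS_CREATE_QUEUE')).queue_url

# One ReceiveMessage call returns at most 10 messages, so a single Fetcher
# feeding 25 Processors has to loop: pull up to 10, dispatch them, and pull
# again while any Processors remain idle.
response = sqs.receive_message(
  queue_url: queue_url,
  max_number_of_messages: 10, # SQS hard limit per call
  wait_time_seconds: 20       # long polling
)
response.messages.each { |message| puts message.message_id }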

and we have 3 queues at present and concurrency of 25

Where are you specifying the concurrency? Toiler defaults to a concurrency of 1, and I don't see concurrency specified in your toiler_options. That could be why you're seeing only 1 message being processed at a time!
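
Based on the comments above, adding concurrency to the worker's toiler_options should be all that's needed; a sketch reusing the worker from earlier in the thread:

class CreationQueueWorker
  Queue = ENV.fetch('SQS_CREATE_QUEUE')
  include Toiler::Worker
  toiler_options queue: Queue,
                 concurrency: 25, # spawn 25 Processors for this queue
                 auto_delete: true,
                 parser: :json

  def perform(_body, sqs_hash_message)
    Creator.perform(sqs_hash_message)
  end
end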

Oh, related thought: oftentimes there's a basis on which the thread/ordering only matters for a subset of messages, e.g. those with the same uuid, or whatever... Allowing a pool size with an optional block/proc that determines which thread gets a message could guarantee in-order processing where it matters, without blocking unrelated messages from being processed in whatever order, e.g. as long as the same across/thread processes all messages with a particular uuid we'd be fine.

@bglusman Ordering isn't really an option when queueing; it's really hard to guarantee, and I don't think SQS guarantees ordering. Also, what happens if a message fails to be processed? You lose ordering.
I don't really know if this should be included in Toiler; plus, ordering also breaks if you're consuming from multiple machines. Maybe you could make a gem that wraps Toiler :)

Maybe if I understood right above, pool size is a better term for what's currently called concurrency at global settings level, and each worker optionally has a concurrency level of 1? Not sure, but I think what's there now is just lifted from Shoryuken and isn't necessarily the most intuitive given how different things are here.

Oh oh, we specify concurrency in the Toiler config the same way we did in Shoryuken, in an initializer rather than per worker; maybe that's wrong? The docs say Toiler defaults to a concurrency of 25, which is the same default as Shoryuken, so you may want to update that :-) That should fix it! And good points above about multiple machines etc. Thanks!

Sorry hadn't seen reply when writing earlier message, on mobile!

Maybe if I understood right above, pool size is a better term for what's currently called concurrency at global settings level

There is no global concurrency configuration in Toiler; that was very specific to Shoryuken and how it worked. Toiler requires you to specify a concurrency per Worker and defaults it to 1.

Docs say toiler defaults to concurrency of 25

Totally missed that, will update it!

@bglusman Please check my comment at #1.
I will be making that behavior the default in a new release, which should fix your instance variable problems. Stay tuned!

@bglusman I assume this issue is no longer valid; please create a new one if something else pops up!