Better SQS concurrent message handling
Type: Feature
SimpleMessageListenerContainer's concurrency model is suboptimal. Current behavior:
- Request N messages from SQS (N <= 10 due to AWS restriction).
- Create a MessageExecutor for each message and execute concurrently.
- Wait for ALL N messages to be handled (including possible SQS deletion invocations).
- Repeat
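For reference, a minimal sketch of the loop as described above (not the actual container source; `Message`, `receiveBatch`, `handle`, and `isRunning` are placeholders):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// Sketch of the current behavior; Message, receiveBatch, handle and isRunning are placeholders.
void pollLoop(Executor taskExecutor) throws InterruptedException {
    while (isRunning()) {
        List<Message> batch = receiveBatch(10);        // no handling happens while this call is in flight
        CountDownLatch latch = new CountDownLatch(batch.size());
        for (Message message : batch) {
            taskExecutor.execute(() -> {
                try {
                    handle(message);                    // process and delete from SQS
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();                                  // the next receive waits for the slowest message in the batch
    }
}
```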
Problems:
- No message handling occurs while fetching messages from SQS. This could be a significant fraction of time for messages that can be handled quickly.
- Request cycle time is constrained by the most slowly handled message in the batch of N. For example, if one message in a batch of N takes 10 seconds to handle and the rest only take one second, throughput will be only N/10 per second (presuming default executor thread pool sizing).
- No ability to process more than 10 messages concurrently due to AWS restriction on max batch size.
Proposed solution:
Continue to use a spinning thread to retrieve messages from SQS. However, instead of waiting for all messages in a batch to be processed before requesting another one, rely on the executor to enforce the desired level of parallelism and to bound the number of locally queued messages (via its own work queue). The spinning thread should block whenever the executor's work queue cannot accept additional work. This would allow efficient parallel processing wider than 10 messages at a time.
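As a rough sketch of what this could look like, assuming a bounded executor queue that the polling thread blocks on (all names here are illustrative, not existing container API):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only; Message, receiveBatch, handle and isRunning are placeholders.
void pollLoop() throws InterruptedException {
    int workers = 30;                                          // processing parallelism, can be wider than 10
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(50);
    ThreadPoolExecutor executor =
            new ThreadPoolExecutor(workers, workers, 0L, TimeUnit.MILLISECONDS, workQueue);
    executor.prestartAllCoreThreads();                         // worker threads pull directly from the queue

    while (isRunning()) {
        List<Message> batch = receiveBatch(10);
        for (Message message : batch) {
            // Block until the work queue has capacity, instead of waiting for the
            // whole previous batch to finish before the next receive.
            workQueue.put(() -> handle(message));
        }
    }
}
```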
Work-around:
I did essentially the above by implementing @SqsListener methods that attempt to hand off messages to another executor pool and retry until the pool accepts the work. This works, but it (a) adds complexity and (b) requires manual acknowledgment handling.
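Roughly, the work-around looks like this, assuming the 2.x spring-cloud-aws-messaging annotations; the queue name, pool sizes, and retry interval are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;

public class HandOffListener {

    // Bounded worker pool; execute() throws RejectedExecutionException when it is saturated.
    private final ThreadPoolExecutor workers = new ThreadPoolExecutor(
            30, 30, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(25));

    @SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void receive(String payload, Acknowledgment acknowledgment) throws InterruptedException {
        Runnable task = () -> {
            process(payload);
            acknowledgment.acknowledge();   // manual ack only once processing has succeeded
        };
        // Retry until the pool accepts the work, so the listener thread applies back-pressure.
        while (true) {
            try {
                workers.execute(task);
                return;
            } catch (RejectedExecutionException poolFull) {
                Thread.sleep(50);
            }
        }
    }

    private void process(String payload) {
        // actual message handling goes here
    }
}
```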
It would be a better developer experience if one could simply specify the number of messages to fetch at once, the max number to process at once, and how many messages should be retrieved ahead of processing availability.
Note:
Even with my work-around, I've been unable to keep a large number of worker threads busy when servicing a queue with messages that are quick to process. The single thread fetching messages from the queue is the limiting factor. An additional enhancement is to allow N message retrieval threads.
If it helps, I can share my solution and code; however, I wasn't able to code it up in a particularly elegant way.
My implementation (see the sketch after this list):
- Created a configurable number of poller threads.
- Moved message processing to its own thread, which in turn hands off to the MessageExecutor.
- Created a configurable fair BlockingQueue of at least size 10 * poller_threads.
- The N poller threads and the single processor thread all share that blocking queue.
- Pollers poll for up to 10 messages, then try to add them to the queue, blocking if it is full.
- The processing thread should almost always have a full queue to work from.
- Replaced the latch with a semaphore.
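In outline, the plumbing looks something like this (a simplified sketch with placeholder names — `Message`, `receiveBatch`, `handle`, `isRunning`; the actual changes are in the commit linked below):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

// Simplified sketch of the poller/processor split; placeholder names throughout.
void start(int pollerThreads, Executor messageExecutor) {
    // Fair queue sized at 10 * pollerThreads so the processor rarely runs dry.
    BlockingQueue<Message> buffer = new ArrayBlockingQueue<>(10 * pollerThreads, true);

    for (int i = 0; i < pollerThreads; i++) {
        new Thread(() -> {
            while (isRunning()) {
                for (Message message : receiveBatch(10)) {
                    try {
                        buffer.put(message);          // blocks while the buffer is full
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "sqs-poller-" + i).start();
    }

    new Thread(() -> {
        while (isRunning() || !buffer.isEmpty()) {
            try {
                Message message = buffer.take();      // almost always has work waiting
                messageExecutor.execute(() -> handle(message));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }, "sqs-processor").start();
}
```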
Our message processing method normally takes around 1-2 seconds, and with the previous single-poller implementation it maxed out at around 5-9 messages per second.
With the changes above (3 pollers, a queue size of 25, and a custom threadExecutor with 30 threads), it hovers around 30-45 messages per second. Note that this means a fairly high number of messages are 'ready' at any time: up to 25 in the queue, plus each of the 3 pollers could be holding on to a maximum of 10 messages while trying to add to the queue.
The drawbacks are:
- It uses at least one or two additional threads, and many more depending on how many pollers and executor threads you configure.
- It definitely could use some refactoring; I didn't quite know what I was doing at the time.
- This implementation can't support the per-queue configuration people have been requesting in other issue threads without getting significantly more complex.
- I may have made the shutdown logic way too complex (see the sketch below):
- Previously it only had to wait some time for the one thread to shut down. Now I have to wait for the multiple pollers and the processor, so there are multiple wait periods as they shut down.
- Previously, with a single poller, it could shut down at the end of the loop once the latch reached 0. Now, with both sides being asynchronous, the processor could try to shut down before the listeners. I had to take extra care to make sure the processor waited some time for the listeners to shut down and then finished any remaining messages.
- This means that where there was previously a single configured 20-second shutdown wait, in my code it could be roughly 20 x 3 seconds.
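Roughly, the shutdown ordering ends up looking like this (illustrative only; the `running` flag and the two `await*` helpers are assumptions standing in for however poller/processor completion is actually signalled):

```java
import java.util.concurrent.TimeUnit;

// Illustrative only. Assumes a volatile 'running' flag checked by the poller and processor
// loops, plus some completion signal per side (hypothetical helpers):
//   awaitPollersStopped(timeout, unit)   - returns once every poller has exited its loop
//   awaitProcessorStopped(timeout, unit) - returns once the processor has drained the buffer
void stop(long waitSeconds) throws InterruptedException {
    running = false;

    // 1. Wait for the pollers to finish their in-flight receives and stop feeding the buffer.
    awaitPollersStopped(waitSeconds, TimeUnit.SECONDS);

    // 2. Only then wait for the processor to drain whatever is still buffered.
    awaitProcessorStopped(waitSeconds, TimeUnit.SECONDS);

    // Worst case these waits stack, which is the "20 x 3 or so" effect described above.
}
```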
You can see the sample changes here: abracadv8/spring-cloud-aws@d1c83b2
We're seeing the same issue. Is there any news on when this will be picked up? It's killing the throughput.
We are aware of it, and we want to address it in 3.0, as this requires a major rewrite.
Fixed in #344.