Consumers for new shards are not launched if shard count > concurrency in KinesisMessageDrivenChannelAdapter
Closed this issue · 2 comments
amalinovskiy commented
Currently the code in org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter#populateConsumer does the following:
    for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
        if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) {
            consumerInvoker.addConsumer(shardConsumer);
            return;
        }
    }
The return inside the loop prevents this.shardConsumers.put(shardOffset, shardConsumer); from executing at the end of the method, which in turn seems to lead to the shard never being processed by org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.ConsumerDispatcher#run.
Using a "consumerAdded" flag + break instead of the return should solve the issue, I think.
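To illustrate the idea, here is a minimal, self-contained sketch of the difference between the early return and the flag + break approach. It is not the adapter's actual code: ShardRegistrySketch, Invoker, populateWithEarlyReturn and populateWithFlag are hypothetical stand-ins, shards and consumers are plain strings, and what should happen when no invoker has free capacity is left to the caller here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the adapter; not the real Spring Integration AWS classes.
class ShardRegistrySketch {

    // Stand-in for ConsumerInvoker: holds the consumers assigned to one worker.
    static class Invoker {
        final List<String> consumers = new ArrayList<>();
    }

    final List<Invoker> invokers = new ArrayList<>();
    // Stand-in for this.shardConsumers (shard id -> consumer).
    final Map<String, String> shardConsumers = new HashMap<>();
    final int maxCapacity;

    ShardRegistrySketch(int invokerCount, int maxCapacity) {
        this.maxCapacity = maxCapacity;
        for (int i = 0; i < invokerCount; i++) {
            this.invokers.add(new Invoker());
        }
    }

    // Mirrors the reported behaviour: the return leaves the method
    // before the consumer is registered in shardConsumers.
    void populateWithEarlyReturn(String shardId) {
        String consumer = "consumer-" + shardId;
        for (Invoker invoker : this.invokers) {
            if (invoker.consumers.size() < this.maxCapacity) {
                invoker.consumers.add(consumer);
                return; // skips the put(...) below
            }
        }
        this.shardConsumers.put(shardId, consumer);
    }

    // Proposed shape: remember whether an invoker took the consumer,
    // break out of the loop, and always reach the registration.
    boolean populateWithFlag(String shardId) {
        String consumer = "consumer-" + shardId;
        boolean consumerAdded = false;
        for (Invoker invoker : this.invokers) {
            if (invoker.consumers.size() < this.maxCapacity) {
                invoker.consumers.add(consumer);
                consumerAdded = true;
                break; // leaves only the loop, not the method
            }
        }
        this.shardConsumers.put(shardId, consumer);
        // false means every invoker was full; handling that is up to the caller in this sketch.
        return consumerAdded;
    }
}
```

With one invoker that has free capacity, populateWithEarlyReturn leaves shardConsumers empty, while populateWithFlag both assigns the consumer to the invoker and registers it in the map.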
artembilan commented
If you see an issue, can reproduce it, and have some idea how to fix it, please feel free to raise a Pull Request and we will review it together.
Thank you for understanding!
amalinovskiy commented
Sure @artembilan, I've added a pull request.