spring-projects/spring-integration-aws

Consumers for new shards are not launched if shard count > concurrency in KinesisMessageDrivenChannelAdapter

Closed this issue · 2 comments

Currently the code in org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter#populateConsumer does

  for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
    if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) {
      consumerInvoker.addConsumer(shardConsumer);
      return;
    }
  }

which prevents this.shardConsumers.put(shardOffset, shardConsumer); execution in the end of the method, which in turn seems lead to shard never processed org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.ConsumerDispatcher#run.

using a "consumerAdded" flag + break should solve the issue, i think.

If see an issue, can reproduce and have some idea how to fix, feel free, please, to raise a Pull Request and we will revise it together.

Thank you for understanding!

Sure @artembilan i've added a pull request