spring-projects/spring-integration-aws

Lock was not renewed in time for Kinesis consumer group

Ravibs139 opened this issue · 2 comments

We are seeing "lock was not renewed in time" errors. Our setup has 2 consumer instances reading from a stream with 3 Kinesis shards.

Exception from log file:
The lock for key 'xxxxx:shardId-000000000000' was not renewed in time
The lock for key 'xxxxx:shardId-000000000001' was not renewed in time
The lock for key 'xxxxx:shardId-000000000002' was not renewed in time

"stack_trace":
java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031)
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947)
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

These are the Spring properties set in my application; all others remain at their defaults:
spring.cloud.stream.bindings.input.destination=kinesis-stream
spring.cloud.stream.bindings.input.group=stream-group
spring.cloud.stream.bindings.input.content-type=application/avro

Observations:

  1. The "lock was not renewed in time" error appears 5 to 8 hours after a new deployment.
  2. After this error, message processing stops completely.

Libraries used:
spring-cloud-stream-binder-kinesis.version = 2.2.0
spring-integration-aws.version = 2.5.1
aws-java-sdk-dynamodb = 1.12.85
spring-boot = 2.4.4

Please let me know if any other details are required.

The relevant code in ShardConsumer.renewLockIfAny() looks like this:

boolean lockRenewed = false;
try {
    lockRenewed = renewLockFuture.get(KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout,
            TimeUnit.MILLISECONDS);
}
catch (Exception ex) {
    if (ex instanceof InterruptedException) {
        Thread.currentThread().interrupt();
    }
    logger.info(ex, () -> "The lock for key '" + this.key + "' was not renewed in time");
}
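The timeout-and-fallback pattern in renewLockIfAny() can be reproduced in isolation. This is a minimal sketch, not the library's code: the TimedRenewalDemo class and its awaitRenewal helper are hypothetical, and a plain CompletableFuture stands in for the adapter's renewal future. It shows that when nobody completes the future, get() with a timeout throws TimeoutException and the renewal is simply reported as failed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedRenewalDemo {

    // Hypothetical helper mirroring the snippet above: wait a bounded time for the
    // renewal result and treat a timeout, interruption or failure as "not renewed".
    static boolean awaitRenewal(CompletableFuture<Boolean> renewal, long timeoutMillis, String key) {
        try {
            // Boolean.TRUE.equals(...) also guards against a null result
            return Boolean.TRUE.equals(renewal.get(timeoutMillis, TimeUnit.MILLISECONDS));
        }
        catch (InterruptedException | ExecutionException | TimeoutException ex) {
            if (ex instanceof InterruptedException) {
                // Restore the interrupt flag, as the original code does
                Thread.currentThread().interrupt();
            }
            System.out.println("The lock for key '" + key + "' was not renewed in time: " + ex);
            return false;
        }
    }

    public static void main(String[] args) {
        // Nobody ever completes this future, e.g. because the renewing thread has stopped
        CompletableFuture<Boolean> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitRenewal(neverCompleted, 100, "xxxxx:shardId-000000000000"));

        // A future already completed by the renewing side returns its value at once
        System.out.println(awaitRenewal(CompletableFuture.completedFuture(true), 100, "xxxxx:shardId-000000000001"));
    }
}
```

Running main prints the "not renewed in time" line and false for the first key, then true for the second. The waiting side never fails hard; it only logs, which matches why the message appears at INFO level.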

where this.lockRenewalTimeout is 10 seconds by default.
Your exception indeed tells us that the timeout happens while waiting for the lock renewal.
The renewal itself is performed in ShardConsumerManager.run():

while (!Thread.currentThread().isInterrupted()) {
    ...
    while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
        LockCompletableFuture lockFuture = this.forRenewing.poll();
        if (lockFuture != null) {
            Lock lock = this.locks.get(lockFuture.lockKey);
            if (lock != null) {
                try {
                    if (lock.tryLock()) {
                        try {
                            lockFuture.complete(true);
                        }
                        finally {
                            lock.unlock();
                        }
                    }
                    else {
                        lockFuture.complete(false);
                        this.locks.remove(lockFuture.lockKey);
                    }
                }
                catch (Exception ex) {
                    lockFuture.complete(false);
                    logger.error(ex, () -> "Error during locking: " + lock);
                }
            }
            else {
                lockFuture.complete(false);
            }
        }
        else {
            break;
        }
    }
    ...
}
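The renewal loop above is a single thread draining a queue of renewal requests and completing each one. A simplified sketch of that structure follows; RenewalQueueSketch and RenewalRequest are hypothetical names, and an in-memory ReentrantLock stands in for the DynamoDB-backed lock, so tryLock() here only illustrates the control flow and does not renew any real lease.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RenewalQueueSketch {

    // Hypothetical request type standing in for LockCompletableFuture
    static final class RenewalRequest extends CompletableFuture<Boolean> {
        final String lockKey;

        RenewalRequest(String lockKey) {
            this.lockKey = lockKey;
        }
    }

    final Map<String, Lock> locks = new ConcurrentHashMap<>();
    final Queue<RenewalRequest> forRenewing = new ConcurrentLinkedQueue<>();

    // Drains every pending request, mirroring the inner while loop above
    void drainRenewals() {
        RenewalRequest request;
        while ((request = forRenewing.poll()) != null) {
            Lock lock = locks.get(request.lockKey);
            if (lock == null) {
                request.complete(false); // unknown key: nothing to renew
                continue;
            }
            try {
                if (lock.tryLock()) {
                    try {
                        request.complete(true); // lock (re)acquired: report renewed
                    }
                    finally {
                        lock.unlock();
                    }
                }
                else {
                    request.complete(false); // someone else holds it: give the shard up
                    locks.remove(request.lockKey);
                }
            }
            catch (Exception ex) {
                request.complete(false); // a failing lock store also counts as "not renewed"
            }
        }
    }

    public static void main(String[] args) throws Exception {
        RenewalQueueSketch manager = new RenewalQueueSketch();
        manager.locks.put("xxxxx:shardId-000000000000", new ReentrantLock());

        // One dedicated thread loops until interrupted, like ShardConsumerManager.run()
        ExecutorService single = Executors.newSingleThreadExecutor();
        single.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                manager.drainRenewals();
                try {
                    Thread.sleep(10);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        RenewalRequest request = new RenewalRequest("xxxxx:shardId-000000000000");
        manager.forRenewing.add(request);
        System.out.println(request.get(1, TimeUnit.SECONDS));
        single.shutdownNow();
    }
}
```

The key design point is that only the manager thread ever completes the requests; any consumer waiting in renewLockIfAny() depends entirely on that thread being alive.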

So either that thread has somehow been interrupted, although it is controlled only internally, through a dedicated Executors.newSingleThreadExecutor(),
and this.shardConsumerManagerFuture is cancelled only from the stop() method of the KinesisMessageDrivenChannelAdapter.
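The reasoning above (if nothing runs the manager loop any more, nobody completes the renewal futures, and every waiting ShardConsumer times out) can be checked with a small, self-contained sketch. The class and method names are hypothetical, and cancelling the Future with mayInterruptIfRunning=true is only an assumed stand-in for what stop() does to shardConsumerManagerFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelledManagerDemo {

    // Returns true when a renewal request queued after cancelling the manager times out
    static boolean renewalTimesOutAfterCancel(long timeoutMillis) {
        ConcurrentLinkedQueue<CompletableFuture<Boolean>> forRenewing = new ConcurrentLinkedQueue<>();
        ExecutorService single = Executors.newSingleThreadExecutor();

        // Manager loop: completes queued requests until its thread is interrupted
        Future<?> managerFuture = single.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                CompletableFuture<Boolean> request = forRenewing.poll();
                if (request != null) {
                    request.complete(true);
                }
                try {
                    Thread.sleep(10);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // re-set the flag so the loop exits
                }
            }
        });

        try {
            // Assumption: this mimics stop() cancelling the manager future
            managerFuture.cancel(true);
            single.shutdown();
            single.awaitTermination(1, TimeUnit.SECONDS);

            CompletableFuture<Boolean> request = new CompletableFuture<>();
            forRenewing.add(request);
            request.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // completed: something is still draining the queue
        }
        catch (TimeoutException ex) {
            return true; // no thread is left to complete the request
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        catch (ExecutionException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(renewalTimesOutAfterCancel(200));
    }
}
```

Waiting for the executor to terminate before queueing the request removes the race where the manager thread could still poll one last time after being interrupted.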

KinesisMessageDrivenChannelAdapter.this.lockRegistry cannot be null at runtime, so that is not an option either.
So there must be something else in the logs. Consider turning on DEBUG logging for the com.amazonaws.services.dynamodbv2 category to see what is going on with your locks.
Or perhaps your KinesisMessageDrivenChannelAdapter is simply being stopped by your own logic for some reason.

I enabled DEBUG logging and found that this is not an issue: everything is working as designed.
The confusion arose after a failed instance was replaced with a new one. The message below was logged at INFO level while that replacement happened.
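One way to picture the replacement scenario is to assume that the shard locks behave like leases that expire when their holder stops renewing them (the usual model for DynamoDB-based lock clients). Under that assumption, a replacement instance cannot take over a shard until the failed instance's lease runs out. The Lease type, the canAcquire helper, and the 20-second lease duration below are all hypothetical illustrations, not values or APIs from spring-integration-aws.

```java
import java.time.Duration;
import java.time.Instant;

public class LeaseTakeoverSketch {

    // Hypothetical lease record: who holds the shard lock and when it was last renewed
    static final class Lease {
        final String owner;
        final Instant lastRenewed;

        Lease(String owner, Instant lastRenewed) {
            this.owner = owner;
            this.lastRenewed = lastRenewed;
        }
    }

    // A candidate may take the lock if it is free, already its own, or the lease has expired
    static boolean canAcquire(Lease current, String candidate, Instant now, Duration leaseDuration) {
        if (current == null || current.owner.equals(candidate)) {
            return true;
        }
        return !now.isBefore(current.lastRenewed.plus(leaseDuration));
    }

    public static void main(String[] args) {
        Duration leaseDuration = Duration.ofSeconds(20); // assumed value, for illustration only
        Instant lastHeartbeat = Instant.parse("2022-03-25T19:44:22Z");
        Lease stale = new Lease("failed-instance", lastHeartbeat);

        // Shortly after the failure the stale lease still blocks the replacement
        System.out.println(canAcquire(stale, "replacement", lastHeartbeat.plusSeconds(10), leaseDuration));
        // Once the lease has expired, the replacement can take the shard over
        System.out.println(canAcquire(stale, "replacement", lastHeartbeat.plusSeconds(50), leaseDuration));
    }
}
```

With these assumed numbers, main prints false and then true: the delay before the new consumer starts is bounded by how long the old lease stays valid, not by any error in the consumer itself.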

{
  "timestamp" : "2022-03-25T19:44:22.698",
  "message" : "The lock for key 'xxxxx:shardId-000000000000' was not renewed in time",
  "logger_name" : "org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter",
  "thread_name" : "-kinesis-dispatcher-2",
  "level" : "INFO",
  "stack_trace" : "java.util.concurrent.TimeoutException: null\n\tat java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)\n\tat java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)\n\tat org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031)\n\tat org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947)\n\tat org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)"
}

{
  "timestamp" : "2022-03-25T19:45:12.880",
  "message" : "The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49627933755360199924965458680964812412021567528108032002', timestamp=null, stream='xxxxx', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.",
  "logger_name" : "org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter",
  "thread_name" : "-kinesis-consumer-1",
  "level" : "INFO"
}

After 40-50 seconds, the consumer starts and picks up messages again. Good to close.