spring-projects/spring-integration-aws

KinesisMessageDrivenChannelAdapter repeating TimeoutException on 2nd instance when shard is closed and exhausted

tonketonky opened this issue · 5 comments

When 2 application instances that consume a DynamoDB stream using spring-cloud-stream-binder-aws-kinesis are running at once, the following behavior can be observed:

  1. The first instance starts 4 shard consumers:
2022-10-12 09:46:26.433  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665553752342-80c2e5dd', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.567  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.687  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665555448236-62c10f13', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.808  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665559594987-5eb20a1f', reset=false}, state=NEW}] has been started.
  2. The second instance has no shard consumers started.
  3. After some time, one shard gets closed and a new shard is opened.
  4. The first instance stops the shard consumer for the old shard and starts a shard consumer for the new shard:
2022-10-12 10:29:12.316  INFO 1525459 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Stopping the [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=STOP}] on the checkpoint [93694400000000019830554236] because the shard has been CLOSED and exhausted.
2022-10-12 10:29:15.197  INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665563350807-0315bc8f', reset=false}, state=NEW}] has been started.
  5. So far so good. However, the second instance starts a shard consumer for the closed shard, complains about the lock not being renewed in time, and throws a TimeoutException:
2022-10-12 10:29:18.288  INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:29:29.249  INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
	at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]

2022-10-12 10:30:19.322  INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:30:30.261  INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
	at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]

2022-10-12 10:31:20.327  INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:31:31.274  INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
	at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]
  6. As a consequence, there is an extra record in the SpringIntegrationLockRegistry DynamoDB table for the closed shard, with the second instance as the owner. This record is read and updated by the second instance repeatedly, which increases read/write usage. The number of these extra records, and thus the read/write usage, grows over time (one way to spot such stale entries is sketched below).
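For reference, a minimal diagnostic sketch of how such stale lock entries can be spotted. This is not part of the binder; the stream ARN, the SpringIntegrationLockRegistry table name, and the lock-key format (which embeds the shard id, as seen in the log above) are assumptions taken from this report, and the AWS SDK for Java v1 clients are used because spring-integration-aws 2.5.x is built on that SDK:

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.Shard;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class StaleShardLockAudit {

    public static void main(String[] args) {
        // Values taken from the logs above; adjust for your environment.
        String streamArn = "arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355";
        String lockTable = "SpringIntegrationLockRegistry";

        AmazonDynamoDBStreams streams = AmazonDynamoDBStreamsClientBuilder.defaultClient();
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // A shard is closed once its sequence number range carries an ending sequence number.
        // (DescribeStream is paginated; a single call is enough for a quick check on a small stream.)
        Set<String> closedShards = streams.describeStream(new DescribeStreamRequest().withStreamArn(streamArn))
                .getStreamDescription()
                .getShards()
                .stream()
                .filter(shard -> shard.getSequenceNumberRange().getEndingSequenceNumber() != null)
                .map(Shard::getShardId)
                .collect(Collectors.toSet());

        // Scan the lock registry table and report every item that still references a closed shard.
        for (Map<String, AttributeValue> item : dynamoDb.scan(new ScanRequest().withTableName(lockTable)).getItems()) {
            boolean stale = item.values().stream()
                    .map(AttributeValue::getS)
                    .filter(value -> value != null)
                    .anyMatch(value -> closedShards.stream().anyMatch(value::contains));
            if (stale) {
                System.out.println("Lock entry for a closed shard: " + item);
            }
        }
    }
}
```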

The applications use spring-cloud-stream 3.2.4 and spring-cloud-stream-binder-kinesis 2.2.0.

Is there anything that can be done to prevent this behavior?
Thanks!

Thank you for the great investigation!

I think we need to double-check the shard state once its lock is in our hands, so we just stop consuming it and drop the lock if the shard is closed.

Will look into this next week.
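For readers following along, a hypothetical sketch of the idea described in the comment above. The class and parameter names are invented for illustration and this is not the actual fix committed to spring-integration-aws: once the shard lock is acquired, the consumer re-checks the shard state and, if the shard is already closed and exhausted, stops itself and releases the lock instead of keeping it renewed.

```java
import java.util.concurrent.locks.Lock;

// Illustrative only: a guard a shard consumer could run right after acquiring its lock.
final class ClosedShardGuard {

    /**
     * Returns true when consumption should proceed; returns false after stopping the
     * consumer and dropping the lock because the shard is already closed and exhausted.
     */
    static boolean proceedOrRelease(boolean shardClosedAndExhausted, Lock shardLock, Runnable stopConsumer) {
        if (!shardClosedAndExhausted) {
            return true;              // shard is still open: keep the lock and consume
        }
        try {
            stopConsumer.run();       // stop the shard consumer for the closed shard
        }
        finally {
            shardLock.unlock();       // drop the lock so no instance keeps renewing it
        }
        return false;
    }
}
```

Releasing the lock in a finally block ensures the stale registry record stops being renewed even if stopping the consumer fails.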

Hi @artembilan,

is there any progress on this issue? Did you have a chance to look at it?

Thank you!

Sorry, I did not have a chance to look into this: busy preparing the Spring Integration 6.0 release.

Just pushed a fix.
@tonkerotti,
would you mind retesting your solution against the latest 2.5.3-SNAPSHOT version?