KinesisMessageDrivenChannelAdapter repeating TimeoutException on 2nd instance when shard is closed and exhausted
tonketonky opened this issue · 5 comments
When two application instances that consume a DynamoDB stream using spring-cloud-stream-binder-aws-kinesis
are running at once, the following behavior can be observed:
- The first instance starts 4 shard consumers:
2022-10-12 09:46:26.433 INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665553752342-80c2e5dd', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.567 INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.687 INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665555448236-62c10f13', reset=false}, state=NEW}] has been started.
2022-10-12 09:46:26.808 INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665559594987-5eb20a1f', reset=false}, state=NEW}] has been started.
- The second instance starts no shard consumers.
- After some time, one shard is closed and a new shard is opened.
- The first instance stops the shard consumer for the old shard and starts a shard consumer for the new one:
2022-10-12 10:29:12.316 INFO 1525459 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Stopping the [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=STOP}] on the checkpoint [93694400000000019830554236] because the shard has been CLOSED and exhausted.
2022-10-12 10:29:15.197 INFO 1525459 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665563350807-0315bc8f', reset=false}, state=NEW}] has been started.
- So far so good. However, the second instance starts a shard consumer for the closed shard, complains that the lock was not renewed in time, and throws a TimeoutException:
2022-10-12 10:29:18.288 INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:29:29.249 INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]
2022-10-12 10:30:19.322 INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:30:30.261 INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]
2022-10-12 10:31:20.327 INFO 1525759 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='93694400000000019830554236', timestamp=null, stream='arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355', shard='shardId-00000001665547915463-c0fb26f8', reset=false}, state=NEW}] has been started.
2022-10-12 10:31:31.274 INFO 1525759 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'persist_to_jobs_history_dynamodb:arn:aws:dynamodb:eu-west-1:012345678901:table/staging_audit_log/stream/2022-09-22T11:49:59.355:shardId-00000001665547915463-c0fb26f8' was not renewed in time
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) ~[na:1.8.0_342]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[na:1.8.0_342]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) [spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) [spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) [spring-integration-aws-2.5.1.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [na:1.8.0_342]
- As a consequence, there is an extra record in the SpringIntegrationLockRegistry DynamoDB table for the closed shard, with the second instance as the owner. The second instance reads and updates this record repeatedly, which increases read/write usage. The number of these extra records, and thus the read/write usage, grows over time.
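The stale entries can be inspected with a plain scan of that table. A minimal sketch using the AWS SDK v1; the table name is the adapter's default, everything else is generic SDK usage:

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;

public class LockRegistryDump {

    public static void main(String[] args) {
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // Scan the lock table used by the binder; the stale record for the closed
        // shard shows up with the second instance as its owner.
        dynamoDb.scan(new ScanRequest().withTableName("SpringIntegrationLockRegistry"))
                .getItems()
                .forEach(System.out::println);
    }
}
```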
The applications use spring-cloud-stream version 3.2.4 and spring-cloud-stream-binder-kinesis version 2.2.0.
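For context, the consumer side of each instance is roughly the following. This is a hedged sketch, not the actual application code; the bean name echoes the consumer group visible in the lock keys above, everything else is illustrative:

```java
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

@SpringBootApplication
public class AuditLogConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AuditLogConsumerApplication.class, args);
    }

    // Bound by the Kinesis binder to the DynamoDB stream ARN; the binder's
    // KinesisMessageDrivenChannelAdapter coordinates the two instances through
    // the SpringIntegrationLockRegistry DynamoDB table.
    @Bean
    public Consumer<Message<byte[]>> persistToJobsHistoryDynamodb() {
        return message -> {
            // persist the audit-log record ...
        };
    }
}
```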
Is there anything that can be done to prevent this behavior?
Thanks!
Thank you for the great investigation!
I think we need to double-check the shard state once its lock is in our hands, so we just stop consuming it and drop the lock if the shard is closed.
Will look into this next week.
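Roughly, the idea is something along these lines. This is only a sketch of the check, not the actual adapter internals; the DynamoDB Streams client call and the "exhausted" approximation are illustrative:

```java
import java.math.BigInteger;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.Shard;

public class ClosedShardCheck {

    /**
     * After this instance wins the lock for a shard, re-describe the shard and,
     * if it is already CLOSED and the stored checkpoint has reached its ending
     * sequence number, release the lock and skip starting a consumer instead of
     * retrying forever. (describeStream pagination is omitted for brevity.)
     */
    public static boolean shardClosedAndExhausted(AmazonDynamoDBStreams streamsClient,
            String streamArn, String shardId, String checkpoint) {

        Shard shard = streamsClient
                .describeStream(new DescribeStreamRequest().withStreamArn(streamArn))
                .getStreamDescription()
                .getShards()
                .stream()
                .filter(candidate -> shardId.equals(candidate.getShardId()))
                .findFirst()
                .orElse(null);

        if (shard == null) {
            // Shard is no longer listed (trimmed): nothing left to consume.
            return true;
        }

        // A CLOSED shard carries an ending sequence number; an open shard does not.
        String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
        if (endingSequenceNumber == null || checkpoint == null) {
            return false;
        }

        // "Exhausted" is approximated here as the checkpoint having reached the end of the shard.
        return new BigInteger(checkpoint).compareTo(new BigInteger(endingSequenceNumber)) >= 0;
    }
}
```

If this returns true for a shard whose lock was just acquired, the adapter could unlock and treat the shard as finished instead of repeatedly contending for its lock.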
Hi @artembilan,
is there any progress on this issue? Did you have a chance to look at it?
Thank you!
Sorry, I did not have a chance to look into this: busy preparing the Spring Integration 6.0 release.
Just pushed a fix.
@tonkerotti, would you mind retesting your solution against the latest 2.5.3-SNAPSHOT version?
spring-integration-aws-2.5.3 has been released: https://github.com/spring-projects/spring-integration-aws/releases/tag/v2.5.3