spring-projects/spring-integration-aws

KinesisMessageDrivenChannelAdapter can't recover after system sleep

RomanAbakumov opened this issue · 36 comments

In what version(s) of Spring Integration AWS are you seeing this issue?
3.0.0-M2

Describe the bug
This issue originally was described in this issue: spring-cloud/spring-cloud-stream-binder-aws-kinesis#190

I have a localstack running locally and 3 dummy app instances that send and receive messages. My system gets into sleep mode, and after woke up from it one of app instances started to report an exception:

2023-05-01T14:54:10.459-04:00 INFO 16112 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: AKSSPO8S0D2O3C4WWIF4QKMEL8VJSL2NANQN52N899PIFJFO7I09) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981598798030864787941543021567332992185863766098', timestamp=null, stream='stream-v0', shard='shardId-000000000005', reset=false}, state=CONSUME}] task invocation.

To Reproduce

  1. Clone the repository https://github.com/RomanAbakumov/kinesis-lock-issues-demo
  2. run localstack using startLocalstack.cmd/startLocalstack.sh
  3. start 3 application instances using mvnw spring-boot:run
  4. wait for some time (5 min) so applications allocate locks and start to consume messages
  5. Put your system to sleep for some time, mine was in sleep mode for more than 30 minutes

Expected behavior

No such errors in the logs, locks should be reallocated if the shard iterator is expired.
After I shut down all app instances and start it again I see old messages have been consumed, so I suppose that iterator was expired, but locks were allocated so messages from this shard were not consumed until the problematic instance dies.

I observe this issue without getting into sleep mode, app just was running overnight with 3 nodes and one of nodes reporting
2023-05-02T09:22:17.256-04:00 INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: 5INQEG2CUH6ICRPEWMN5JNRM27UIDZVIA2HS3SWPA712IHKEW18A) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.

Well, probably you are blocking in the processor somehow.
I don't see a correlation between "300 seconds" and just having the app running overnight.

I'll definitely look into the problem shortly.

No such errors in the logs, locks should be reallocated if the shard iterator is expired.

That's not correct: an iterator is just a pointer in a shard, but lock is really set on a shard.
So, in case of ExpiredIteratorException we just need to obtain a fresh iterator - nothing to do with the lock we hold on a shard.
I may imagine a process taking too long for records handling, longer than those 5 mins.
Therefore the logic we have so far in the getRecords() is correct:

	try {
				return KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest).join();
			}
			catch (ExpiredIteratorException e) {
				// Iterator expired, but this does not mean that shard no longer contains
				// records.
				// Let's acquire iterator again (using checkpointer for iterator start
				// sequence number).
				logger.info(() ->
						"Shard iterator for ["
								+ ShardConsumer.this
								+ "] expired.\n"
								+ "A new one will be started from the check pointed sequence number.");
				this.state = ConsumerState.EXPIRED;
			}

Only the problem that we really got a java.util.concurrent.CompletionException and that ExpiredIteratorException is a cause 🤷

Once application get this error it can't recover, in the same time other 2 application that is running on the same machine run fine without this issue.

2023-05-02T16:28:50.870-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: IYZL5F2TYOR7H0FPNJQZLRNEHXISK677K2TMFODCWWRMY85VAZ9O) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
Received message: Message: 6693
2023-05-02T16:28:57.348-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: 3ZZ2K9JGVQEB0W4IK42L2TJFS4OSXTBYG24AV7BC8W5TC58C7JIV) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
Message sent: Message: 6692
2023-05-02T16:29:02.340-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: LL0KNV3HH8F6Z0BD5CV5RKXX3GM4JEJ34MOQUY5QHWSF8LPS5UPE) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
2023-05-02T16:29:07.028-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: EFDZR9IC1FPX9J42809L6PUT1XQKPFEQCAHQ7UE4GPCX3NC6U4NZ) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
Message sent: Message: 6693
Received message: Message: 6694
Received message: Message: 6695
2023-05-02T16:29:11.678-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: 2O3V67HO8Z8KLLQJW5ABJLPCIURO4ZZHD8MH8IU1H5SM58QXCBII) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
Received message: Message: 6693
Message sent: Message: 6694
2023-05-02T16:29:18.332-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: UL0AQ1CKDRGW6B8NN4WRLNZQ46OJBKFYV603S9SPAXWAF2511ZPM) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
Received message: Message: 6696
2023-05-02T16:29:23.129-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: 5DFVGCT9ZJAL14WHN45QJQ2I5OULP6NX3ZVA5S1BNCGLAYF85LSW) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
Message sent: Message: 6695
2023-05-02T16:29:29.176-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: N6KX44K8EVXVA6NKBTAIS9SD24KO833JIPT9DC83E32FH2N621SP) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.
2023-05-02T16:29:32.451-04:00  INFO 13008 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: RV7ZN6GB3K45L667459FKEDQ4NHJNZROPPUSAUO7G4FT6RHROHF8) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981531895795269196072558463410856052021193080866', timestamp=null, stream='stream-v0', shard='shardId-000000000002', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.

As I can see this issue is applied only to shard 2, the app is still receiving messages from other shards it allocated.

I did app restart and I believe a new snapshot was used, I start getting an errors like:

2023-05-02T17:10:40.963-04:00 ERROR 30752 --- [c-response-1-48] o.s.i.a.outbound.KinesisMessageHandler   : Failed to send async reply: GenericMessage [payload=byte[10], headers={errorChannel=org.springframework.integration.channel.PublishSubscribeChannel@1526f573, aws_shard=shardId-000000000005, aws_sequenceNumber=49640352981598798030864787943384215590612864720741335122, aws_serviceResult=PutRecordResponse(ShardId=shardId-000000000005, SequenceNumber=49640352981598798030864787943384215590612864720741335122, EncryptionType=NONE), id=07c779df-0ff6-1574-f4fa-da8dbb9e909a, contentType=application/json, target-protocol=kafka, timestamp=1683061840963}]

org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:479) ~[spring-integration-core-6.0.3.jar:6.0.3]
	at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:581) ~[spring-integration-core-6.0.3.jar:6.0.3]
	at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:563) ~[spring-integration-core-6.0.3.jar:6.0.3]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56) ~[sdk-core-2.20.30.jar:na]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69) ~[sdk-core-2.20.30.jar:na]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177) ~[sdk-core-2.20.30.jar:na]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105) ~[sdk-core-2.20.30.jar:na]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163) ~[sdk-core-2.20.30.jar:na]
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-05-02T17:10:40.967-04:00 ERROR 30752 --- [c-response-1-48] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:479)
	at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:581)
	at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:563)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

We have discussed this error with you before: a 4.0.0-SNAPSHOT for Kinesis Binder must be used: spring-cloud/spring-cloud-stream-binder-aws-kinesis#190 (comment)

I use 4.0.0-SNAPSHOT version, yesterday I didn't see this error, but today it came back.

Try to use -U for Maven or --refresh-dependencies for Gradle.

I just did it "mvnw -U clean spring-boot:run", and it is still erroring
Yesterday it worked fine with the same dependencies, so I suppose it is caused by the new snapshot version.

No, there is no changes in the Kinesis binder since the latest SNAPSHOT, plus what I have just done here in SI-AWS is no related to the KinesisMessageHandler.
Please, double check if you really use spring-cloud-stream-binder-kinesis-4.0.0-SNAPSHOT.
See mvn dependency:tree report.

I didn't know why yesterday it worked fine and now error back again, here is the dependency tree related to kinesis.

[INFO] \- org.springframework.cloud:spring-cloud-stream-binder-kinesis:jar:4.0.0-SNAPSHOT:compile
[INFO]    +- software.amazon.kinesis:amazon-kinesis-client:jar:2.4.8:compile
[INFO]    |  +- software.amazon.awssdk:kinesis:jar:2.20.8:compile
[INFO]    |  |  +- software.amazon.awssdk:aws-cbor-protocol:jar:2.20.8:compile
[INFO]    |  |  |  \- software.amazon.awssdk:third-party-jackson-dataformat-cbor:jar:2.20.8:compile
[INFO]    |  |  +- software.amazon.awssdk:aws-json-protocol:jar:2.20.8:compile
[INFO]    |  |  +- software.amazon.awssdk:protocol-core:jar:2.20.8:compile
[INFO]    |  |  +- software.amazon.awssdk:aws-core:jar:2.20.8:compile
[INFO]    |  |  +- software.amazon.awssdk:metrics-spi:jar:2.20.8:compile
[INFO]    |  |  +- software.amazon.awssdk:endpoints-spi:jar:2.20.8:compile
[INFO]    |  |  \- software.amazon.awssdk:apache-client:jar:2.20.8:runtime
[INFO]    |  |     \- org.apache.httpcomponents:httpcore:jar:4.4.16:compile
[INFO]    |  +- software.amazon.awssdk:dynamodb:jar:2.20.8:compile
[INFO]    |  +- software.amazon.awssdk:cloudwatch:jar:2.20.8:compile
[INFO]    |  |  \- software.amazon.awssdk:aws-query-protocol:jar:2.20.8:compile
[INFO]    |  +- software.amazon.awssdk:netty-nio-client:jar:2.20.8:compile
[INFO]    |  |  +- io.netty:netty-codec-http:jar:4.1.89.Final:compile
[INFO]    |  |  +- io.netty:netty-codec-http2:jar:4.1.89.Final:compile
[INFO]    |  |  +- io.netty:netty-codec:jar:4.1.89.Final:compile
[INFO]    |  |  +- io.netty:netty-transport:jar:4.1.89.Final:compile
[INFO]    |  |  |  \- io.netty:netty-resolver:jar:4.1.89.Final:compile
[INFO]    |  |  +- io.netty:netty-common:jar:4.1.89.Final:compile
[INFO]    |  |  +- io.netty:netty-buffer:jar:4.1.89.Final:compile
[INFO]    |  |  +- io.netty:netty-handler:jar:4.1.89.Final:compile
[INFO]    |  |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.89.Final:compile
[INFO]    |  |  \- io.netty:netty-transport-classes-epoll:jar:4.1.89.Final:compile
[INFO]    |  +- software.amazon.glue:schema-registry-serde:jar:1.1.14:compile
[INFO]    |  |  +- com.amazonaws:aws-java-sdk-sts:jar:1.12.151:compile
[INFO]    |  |  |  \- com.amazonaws:jmespath-java:jar:1.12.151:compile
[INFO]    |  |  +- software.amazon.awssdk:sts:jar:2.17.122:compile
[INFO]    |  |  +- software.amazon.awssdk:arns:jar:2.17.122:compile
[INFO]    |  |  +- org.apache.kafka:kafka-clients:jar:3.3.2:compile
[INFO]    |  |  |  +- com.github.luben:zstd-jni:jar:1.5.2-1:runtime
[INFO]    |  |  |  +- org.lz4:lz4-java:jar:1.8.0:runtime
[INFO]    |  |  |  \- org.xerial.snappy:snappy-java:jar:1.1.8.4:runtime
[INFO]    |  |  +- com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile
[INFO]    |  |  |  +- org.scala-lang:scala-library:jar:2.12.10:compile
[INFO]    |  |  |  \- javax.validation:validation-api:jar:2.0.1.Final:compile
[INFO]    |  |  +- io.github.classgraph:classgraph:jar:4.8.120:compile
[INFO]    |  |  +- com.github.erosb:everit-json-schema:jar:1.14.1:compile
[INFO]    |  |  |  +- org.json:json:jar:20220320:compile
[INFO]    |  |  |  +- commons-validator:commons-validator:jar:1.7:compile
[INFO]    |  |  |  |  +- commons-digester:commons-digester:jar:2.1:compile
[INFO]    |  |  |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]    |  |  |  +- com.damnhandy:handy-uri-templates:jar:2.1.8:compile
[INFO]    |  |  |  \- com.google.re2j:re2j:jar:1.6:compile
[INFO]    |  |  +- org.jetbrains.kotlin:kotlin-reflect:jar:1.7.22:compile
[INFO]    |  |  +- org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:jar:1.7.10:compile
[INFO]    |  |  |  +- org.jetbrains.kotlin:kotlin-scripting-common:jar:1.7.22:runtime
[INFO]    |  |  |  \- org.jetbrains.kotlin:kotlin-scripting-jvm:jar:1.7.22:runtime
[INFO]    |  |  |     \- org.jetbrains.kotlin:kotlin-script-runtime:jar:1.7.22:runtime
[INFO]    |  |  +- org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:jar:1.7.10:compile
[INFO]    |  |  +- org.jetbrains.kotlinx:kotlinx-serialization-core-jvm:jar:1.4.0:compile
[INFO]    |  |  +- com.squareup.wire:wire-schema:jar:3.7.1:compile
[INFO]    |  |  |  +- com.squareup.wire:wire-runtime:jar:3.7.1:compile
[INFO]    |  |  |  \- com.squareup.okio:okio:jar:2.8.0:compile
[INFO]    |  |  +- com.squareup.wire:wire-compiler:jar:3.7.1:compile
[INFO]    |  |  |  +- com.squareup.wire:wire-kotlin-generator:jar:3.7.1:compile
[INFO]    |  |  |  |  +- com.squareup:kotlinpoet:jar:1.7.2:compile
[INFO]    |  |  |  |  \- com.squareup.wire:wire-grpc-server-generator:jar:3.7.1:runtime
[INFO]    |  |  |  +- com.squareup.wire:wire-java-generator:jar:3.7.1:runtime
[INFO]    |  |  |  |  \- com.squareup:javapoet:jar:1.13.0:runtime
[INFO]    |  |  |  +- com.squareup.wire:wire-swift-generator:jar:3.7.1:runtime
[INFO]    |  |  |  |  \- io.outfoxx:swiftpoet:jar:1.0.0:runtime
[INFO]    |  |  |  +- com.squareup.wire:wire-profiles:jar:3.7.1:runtime
[INFO]    |  |  |  \- com.charleskorn.kaml:kaml:jar:0.20.0:runtime
[INFO]    |  |  |     \- org.snakeyaml:snakeyaml-engine:jar:2.1:runtime
[INFO]    |  |  +- com.google.api.grpc:proto-google-common-protos:jar:2.7.4:compile
[INFO]    |  |  \- com.google.jimfs:jimfs:jar:1.1:compile
[INFO]    |  +- software.amazon.glue:schema-registry-common:jar:1.1.14:compile
[INFO]    |  |  +- software.amazon.awssdk:glue:jar:2.17.122:compile
[INFO]    |  |  +- software.amazon.glue:schema-registry-build-tools:jar:1.1.14:compile
[INFO]    |  |  +- software.amazon.awssdk:url-connection-client:jar:2.17.122:compile
[INFO]    |  |  +- org.apache.avro:avro:jar:1.11.0:compile
[INFO]    |  |  +- org.apache.commons:commons-compress:jar:1.21:compile
[INFO]    |  |  +- org.projectlombok:lombok:jar:1.18.26:compile
[INFO]    |  |  \- org.projectlombok:lombok-utils:jar:1.18.12:compile
[INFO]    |  +- com.google.guava:guava:jar:31.1-jre:compile
[INFO]    |  |  +- com.google.guava:failureaccess:jar:1.0.1:compile
[INFO]    |  |  +- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
[INFO]    |  |  +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
[INFO]    |  |  +- org.checkerframework:checker-qual:jar:3.12.0:compile
[INFO]    |  |  +- com.google.errorprone:error_prone_annotations:jar:2.11.0:compile
[INFO]    |  |  \- com.google.j2objc:j2objc-annotations:jar:1.3:compile
[INFO]    |  +- com.google.protobuf:protobuf-java:jar:3.21.12:compile
[INFO]    |  +- org.apache.commons:commons-lang3:jar:3.12.0:compile
[INFO]    |  \- io.reactivex.rxjava3:rxjava:jar:3.1.6:compile
[INFO]    +- com.amazonaws:amazon-kinesis-producer:jar:0.15.5:compile
[INFO]    |  +- commons-io:commons-io:jar:2.11.0:compile
[INFO]    |  +- commons-lang:commons-lang:jar:2.6:compile
[INFO]    |  +- com.amazonaws:aws-java-sdk-core:jar:1.12.382:compile
[INFO]    |  |  +- commons-logging:commons-logging:jar:1.1.3:compile
[INFO]    |  |  +- commons-codec:commons-codec:jar:1.15:compile
[INFO]    |  |  +- org.apache.httpcomponents:httpclient:jar:4.5.14:compile
[INFO]    |  |  +- software.amazon.ion:ion-java:jar:1.0.2:compile
[INFO]    |  |  +- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.14.2:compile
[INFO]    |  |  \- joda-time:joda-time:jar:2.8.1:compile
[INFO]    |  +- javax.xml.bind:jaxb-api:jar:2.3.1:compile
[INFO]    |  +- com.sun.xml.bind:jaxb-core:jar:4.0.2:compile
[INFO]    |  |  +- jakarta.xml.bind:jakarta.xml.bind-api:jar:4.0.0:compile
[INFO]    |  |  |  \- jakarta.activation:jakarta.activation-api:jar:2.1.1:compile
[INFO]    |  |  \- org.eclipse.angus:angus-activation:jar:2.0.0:runtime
[INFO]    |  \- com.sun.xml.bind:jaxb-impl:jar:4.0.2:compile
[INFO]    \- io.awspring.cloud:spring-cloud-aws-starter:jar:3.0.0-RC2:compile
[INFO]       \- io.awspring.cloud:spring-cloud-aws-autoconfigure:jar:3.0.0-RC2:compile
[INFO] ------------------------------------------------------------------------

And I see a new version of binder in my local maven repository, looks like it was downloaded today
spring-cloud-stream-binder-kinesis-4.0.0-20230427.170221-506.jar - old
spring-cloud-stream-binder-kinesis-4.0.0-20230502.142417-508.jar - new

Well, there is nothing new in this project committed for a while. The SNAPSHOT probably is rebuilt by our Jenkins nightly plan.
Be sure to use the latest Spring Boot and latest Spring Cloud.
And only snapshots for Kinesis Binder and Spring Integration AWS: 4.0.0-SNAPSHOT and 3.0.0-SNAPSHOT, respectively.

if I use version 4.0.0-20230427.170221-506 it works fine, with the latest snapshot it doesn't, I didn't know why.
You may try to reproduce using the referenced project.

And I compared the dependency tree, it is the same.

Very suspicious… will debug that tomorrow . Probably really some fresh Spring Cloud from yesterday’s release did something inferring us… 🤔

I've just build a fresh SNAPSHOT for Kinesis Binder.
Please, see if that fixes your problem.

Yes, it is not appearing anymore, thanks.

@artembilan
The issue with the expired iterator was reproduced again using the latest build. I've left 4 demo apps running for some time and then put my system to sleep overnight. 3 apps were able to recover and continue sending and receiving messages, but the one is sending, but not able to receive messages from one shard, and has an error in the log, see below.

The demo app that is referenced uses a stream with 16 shards, and maybe the issue is related to the way how it renews/updates shards, I saw some places use a single loop to iterate over shards, and if renewal takes too long some of the objects may expire.
Please note that it is easy to reproduce the issue using referenced demo project, all that is needed is running localstack.

Steps to reproduce:

  1. start a single app instance from the repo referenced in the first message using the command mvnw spring-boot:run
  2. let it run for 5 minutes to be sure it allocates all the shard locks
  3. start 4 more app instances
  4. stop/kill the app started on the first step, kill -TERM is preferred
  5. wait for 4 apps that is started on step 3 to start consuming messages
  6. put your system to sleep for some time (I suggest 30 minutes, but I'm not sure)
    In some cases, one of the nodes is starting to report errors about ExpiredIteratorException every 10 seconds.

2023-05-04T08:43:35.059-04:00 INFO 30560 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: 4GXFER4GY2XW3WLWHRR6Z1IRKVSJAP8P8JGE5TJOHZ0K0T7L586P) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981688001011658910436253798844431519424828145810', timestamp=null, stream='stream-v0', shard='shardId-000000000009', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.

Any chances that you can turn on DEBUG logging level for your app and show more info around that error, please?
What you show so far is not possible with the current code.
That state=CONSUME confirms that we are in the processTask() where we perform an AWS call only from the result = getRecords(getRecordsRequest); which has this logic:

	try {
				return KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest).join();
			}
			catch (CompletionException ex) {
				Throwable cause = ex.getCause();
				if (cause instanceof ExpiredIteratorException) {

So, this sounds more like your back to old SNAPSHOTs for some reason.

I cannot run Localstack locally, but I do use it via Docker and Testcontainers.

See my solution here: https://github.com/spring-projects/spring-integration-aws/blob/main/src/test/java/org/springframework/integration/aws/LocalstackContainerTest.java

I will try to reproduce it with debug logging.
BTW, all that you need to run the localstack using the command "docker run --rm -it -p 4566:4566 -p 4510-4559:4510-4559 localstack/localstack" It is in startLocalstack.cmd/startLocalstack.sh
The application I referenced specifically made to reproduce the problem I'm facing and doesn't have any dependencies and business logic.

@artembilan Is there any specific package you want debug logging to be enabled?
Enabling it on global level produces too much output, netty log a lot of info about every request it sends.

This one should be enough for us:

logging:
  level:
    org.springframework.integration.aws: trace

I see that you still don't use there the latest SI-AWS SNAPSHOT:

		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-aws</artifactId>
			<version>3.0.0-M2</version>
		</dependency>

Thanks for the command! I was able to start that container in my Docker.
Now I need to be sure that the code you share with me in that repo is really identical to what you have locally.
I mean that with spring-integration-aws-3.0.0-M2 the problem will be there since the fix for this issue is in SNAPSHOT only yet.

That's good, what should be the correct version then?
3.0.0-SNAPSHOT ?

Yes. That is exactly what we have fixed here with you.
Please, use:

		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-aws</artifactId>
			<version>3.0.0-SNAPSHOT</version>
		</dependency>

Yeah, that makes sense, let me try if it is reproduced with it.

Looks like it works as it should I'll continue watching it, but works fine so far.

I have the demo application running for the weekend, and today discovered the another error:
2023-05-08T10:10:25.672-04:00 INFO 10392 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because "highestSequence" is null during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640452625431682182218618986916024639767881501162602706', timestamp=null, stream='stream-v0', shard='shardId-000000000013', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.

Just to clarify, I'm running 4 applications from the referenced repository, it was running fine since Friday, and I discovered the mentioned error in the logs for all 4 running apps for a different shards.

It fails here:

			String highestSequence = this.checkpointer.getHighestSequence();
			if (highestSequence.equals(lastCheckpoint)) {
				logger.info(ex, "Record processor has thrown exception. " +
						"Ignore since the highest sequence in batch was check-pointed.");

Where we go there like this:

				try {
					result = getRecords(getRecordsRequest);
					if (result != null) {
						List<Record> records = result.records();

						if (!records.isEmpty()) {
							processRecords(records);
						}
						this.shardIterator = result.nextShardIterator();
					}
				}
				catch (Exception ex) {
					rewindIteratorOnError(ex, result);
				}

So, when we got an exception in the getRecords(), there might not be a highestSequence set yet: it is done in that processRecords() and as you see only if (!records.isEmpty()) {.

Will fix soon...

Fixed like this: d7a2dd5.

@RomanAbakumov ,

thank you for testing this and your feedback!

@artembilan thanks for addressing it quickly!
Please let me know when you will make a snapshot version so I may test it.

It is there already: https://build.spring.io/browse/INTEXT-AWS-410/.

We build a fresh SNAPSHOT on each commit to main branch.

I did rerun the application, and I see the error below again, looks like a new kinesis binder snapshot is published.
Nothing was changed on my side just another run of same app.

2023-05-10T14:19:24.479-04:00 ERROR 13924 --- [c-response-1-10] o.s.integration.handler.LoggingHandler : org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

OK. Thank you for reporting this, @RomanAbakumov !

I think something is wrong with our CI.
Probably there is some conflicting plan which builds from a wrong branch.
Will fix it soon...

Have just build a fresh SNAPSHOT from correct main branch: https://jenkins.spring.io/job/spring-cloud-stream-binder-aws-kinesis-main-ci/285/console.
And removed 4.0-WIP CI plans 🤷