spring-projects/spring-integration-aws

KclMessageDrivenChannelAdapter graceful shutdown

amolskh opened this issue · 1 comments

I am using manual checkpointing and batch mode listener

When I try to turn of the spring application I have shutdown hook from where I invoke
KclMessageDrivenChannelAdapter.stop()

But I get below exception in logs

2020-09-02 12:10:33.205 ERROR 24721 --- [TaskExecutor-44] s.i.a.i.k.KclMessageDrivenChannelAdapter : Exception while checkpointing at requested shutdown. Giving up

com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:198) ~[amazon-kinesis-client-1.13.3.jar:na]
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:307) ~[amazon-kinesis-client-1.13.3.jar:na]
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:85) ~[amazon-kinesis-client-1.13.3.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.shutdown(KclMessageDrivenChannelAdapter.java:584) ~[spring-integration-aws-2.3.3.RELEASE.jar:na]
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.shutdown(V1ToV2RecordProcessorAdapter.java:48) [amazon-kinesis-client-1.13.3.jar:na]
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:137) [amazon-kinesis-client-1.13.3.jar:na]
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [amazon-kinesis-client-1.13.3.jar:na]
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [amazon-kinesis-client-1.13.3.jar:na]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_201]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201]

What is appropriate way of gracefully shutting down the consumer ?

First of all you don't need to do anything in the shutdown hook for Spring beans: the whole application context is tied to the shutdown with the hook to close itself and stop all the beans respectively. See AbstractApplicationContext.registerShutdownHook().

Plus looking to you stack trace, it fully doesn't give any clues that your hook is involved in the cause somehow.

What I see in that stack trace that there is some race condition in the KCL per se, where they have released a lease in DB before that IRecordProcessor.shutdown() is called.

In other words: everything you show here is out of this project control. You need to consult with AWS about KCL behavior and possible configuration to make it grace.

Thanks for understanding!