spring-cloud/spring-cloud-stream-binder-aws-kinesis

Exception in thread "-kinesis-consumer-1" java.lang.NoSuchFieldError: logger

tjtaill opened this issue · 6 comments

environment:
java 11
spring boot starter parent 2.4.4
spring cloud stream 2020.0.2
spring cloud stream kinesis binder 2.0.2.RELEASE

The Spring Boot tests pass, and I am using the same profile when I run the app.

I run the application with

mvn spring-boot:run -Dspring-boot.run.profiles=local

and it fails with this exception:

Exception in thread "-kinesis-consumer-1" java.lang.NoSuchFieldError: logger
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access$5300(KinesisMessageDrivenChannelAdapter.java:98)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$execute$0(KinesisMessageDrivenChannelAdapter.java:919)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1283)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

I have tried going back to Spring 2.3.x, but I can't because too many things are incompatible. Any insights would be appreciated.

That is indeed a version incompatibility issue. Try the latest Kinesis Binder, Spring Cloud and Spring Boot: https://spring.io/blog/2021/06/04/spring-integration-aws-2-5-1-and-spring-cloud-stream-kinesis-binder-2-2-0-available

I upgraded as you suggested, and that eliminated the above exception. Now I am getting a ClassCastException; the stack trace is below.

It seems that it cannot deserialize/unmarshal the Kinesis data that I send. I am not sure whether it is a Base64 encoding issue: the data is correct when I call get-records on the same stream shard and Base64-decode it as UTF-8, so I doubt that is the cause. The few samples I can find on the web suggest that Consumer<Event> should work, with the conversion done under the hood. Another post suggested that it should maybe be Consumer<Message<Event>>; I tried that and it did not seem to help.

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@7b4117e4]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class *.Event ([B is in module java.base of loader 'bootstrap'; *.Event is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @430c9f0b)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.7.jar:5.3.7]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.7.jar:5.3.7]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.7.jar:5.3.7]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.7.jar:5.3.7]
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.0.jar:5.5.0]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access$4200(KinesisMessageDrivenChannelAdapter.java:103) ~[spring-integration-aws-2.5.1.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.performSend(KinesisMessageDrivenChannelAdapter.java:1320) ~[spring-integration-aws-2.5.1.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processSingleRecord(KinesisMessageDrivenChannelAdapter.java:1228) ~[spring-integration-aws-2.5.1.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:1215) ~[spring-integration-aws-2.5.1.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$processTask$7(KinesisMessageDrivenChannelAdapter.java:1070) ~[spring-integration-aws-2.5.1.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1412) ~[spring-integration-aws-2.5.1.jar:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class *.Event ([B is in module java.base of loader 'bootstrap'; *.Event is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @430c9f0b)
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
        at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:560) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.0.jar:5.5.0]
        ... 20 common frames omitted
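
One way to read the "class [B cannot be cast" message: generic types are erased at runtime, so a Consumer<Event> will accept any object reference and only fails when the lambda's adapter casts the argument to Event. If no converter has turned the byte[] payload into an Event before the function is invoked, the cast fails exactly as in the trace. The following is a minimal stdlib-only sketch of that mechanism, not the binder's actual code; the `Event` class and the `throwsClassCast` helper are hypothetical stand-ins.

```java
import java.util.function.Consumer;

public class ErasureDemo {

    // Hypothetical stand-in for the Event POJO in this thread.
    static final class Event {
        private final String id;

        Event(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Event(id=" + id + ")";
        }
    }

    // Same shape as the processEvent bean, with System.out instead of a logger.
    static Consumer<Event> processEvent() {
        return event -> System.out.println(event.toString());
    }

    // Passes an arbitrary payload through an erased (raw) reference, roughly
    // what happens when a framework invokes a user function without knowing
    // its type argument. Returns true if the call ends in a ClassCastException.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean throwsClassCast(Object payload) {
        Consumer erased = processEvent();
        try {
            erased.accept(payload);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A raw byte[] payload, as delivered when no conversion took place.
        System.out.println(throwsClassCast(new byte[] {1, 2, 3}));
        // A payload that was already converted to Event.
        System.out.println(throwsClassCast(new Event("42")));
    }
}
```

Under this reading, the exception points at a conversion step that did not happen (or failed quietly) rather than at the consumer itself.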

Good to hear that the version mismatch error went away!

The deserialization issue is a completely different story: I can't answer immediately without knowing what your data is and what its contentType is.
Is there any chance you could share a simple project with us to play with and reproduce?

Well, I will try to create a small project to reproduce the issue. The event is a cloud event POJO. I was using the content type application/json, as I saw in the sample https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kinesis-samples. Then I tried application/x-java-object, as I saw here: https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/contenttypemanagement.html#mime-types

That didn't work.

When I use Consumer<byte[]>, it doesn't cause a cast exception, but it doesn't give me anything useful either.
I also used Consumer<Message<byte[]>>, which returned the same bytes. I tried Base64-decoding those bytes, but they contain invalid characters.

This is how I send the Kinesis record. It works: I can pull it off the shard with an iterator and a get-records API call, and when I decode it, it clearly shows the correct JSON string.
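
A note on the Base64 observation: the get-records output shows the Data field Base64-encoded, but the bytes put on the stream are the raw UTF-8 JSON produced by writeValueAsBytes. Assuming the consumer receives those record bytes unchanged, Base64-decoding them is expected to fail, because characters such as `{` and `"` are not in the Base64 alphabet; decoding them as UTF-8 should give the JSON back. The sketch below uses only the JDK; the class name, helper names and sample JSON are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PayloadDecodingDemo {

    // Interprets the payload as UTF-8 text, matching how the JSON was written.
    static String asUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Returns true only if the payload is well-formed Base64 input.
    static boolean isBase64(byte[] payload) {
        try {
            Base64.getDecoder().decode(payload);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample resembling a serialized event.
        String json = "{\"id\":\"42\",\"type\":\"foo\"}";
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);

        System.out.println("valid Base64: " + isBase64(payload));
        System.out.println("as UTF-8:     " + asUtf8(payload));
    }
}
```

If `new String(payload, StandardCharsets.UTF_8)` inside a Consumer<byte[]> shows the expected JSON, the transport is fine and the problem lies in the JSON-to-POJO conversion.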

@Test
	void sendKinesisRecord() throws Exception {
		var kinesisClient = AmazonKinesisClientBuilder
				.standard()
				.withCredentials(awsCredentialsProviderv1)
				.build();
		var putRecordRequest = new PutRecordRequest();
		putRecordRequest.setStreamName(streamName);
		var id = UUID.randomUUID().toString();
		var event =
				new Event(
						"1.0",
						"foo",
						id,
						"/source",
						"application/json",
						"2021-01-25T21:46:27.150384Z",
						Collections.singletonMap("key", "value"));
		putRecordRequest.setData(ByteBuffer.wrap(objectMapper.writeValueAsBytes(event)));
		putRecordRequest.setPartitionKey(id);
		kinesisClient.putRecord(putRecordRequest);
	}

My consumer simply tries to log the event:

@Bean
  public Consumer<Event> processEvent() {
    return event -> {
      log.info(event.toString());
    };
  }

I will try to make a small project to reproduce it. Thanks again for all your help.

OK, it is working now. It was a weird interaction between Lombok and Jackson, described here:

https://stackoverflow.com/questions/39381474/cant-make-jackson-and-lombok-work-together

and

projectlombok/lombok#1563

Cool! Thanks for letting us know.