spring-attic/spring-cloud-stream-binder-kafka

ClassCastException in pipeline: Kafka Streams Function -> Kafka Function -> Kafka Streams Consumer

Closed this issue · 5 comments

Hi everybody,

I have a Spring Cloud Stream application. Spring Boot is 2.6.2 and Spring Cloud is 2021.0.0.
It uses both Kafka and Kafka Streams.

The app has the following data pipeline:

@Bean
public Supplier<Message<ClassA>> supplier() {
    return () -> MessageBuilder.createMessage(
        new ClassA(random.ints(0, 10).findFirst().getAsInt()),
        new MessageHeaders(
            Map.of(KafkaHeaders.MESSAGE_KEY, String.valueOf(random.ints(0, 10).findFirst().getAsInt())))
    );
}

@Bean
public Function<KStream<String, ClassA>, KStream<String, ClassB>> streamFunction() {
    return stream -> stream.mapValues(classA -> new ClassB(String.valueOf(classA.a)));
}

@Bean
public Function<ClassB, Message<ClassC>> function() {
    return classB -> MessageBuilder.createMessage(
        new ClassC(Integer.parseInt(classB.b)),
        new MessageHeaders(Map.of(KafkaHeaders.MESSAGE_KEY, classB.b))
    );
}

There are 3 consumers that expect ClassC objects:

@Bean
public Consumer<KStream<String, ClassC>> consumer1() {
    return stream -> stream.peek((k, v) -> log.info("No class cast: {}, {}", k, v));
}

@Bean
public Consumer<KStream<String, String>> consumer2() {
    return stream -> stream.peek((k, v) -> log.info("No class cast - consumer2: {}, {}", k, v));
}

@Bean
public Consumer<ClassC> consumer3() {
    return classC -> log.info("No class cast - consumer3: {}", classC);
}

Consumers 2 and 3 work fine:

2022-02-18 11:57:04.782  INFO 19855 --- [-StreamThread-1] s.k.c.ScsKafkaClassCastApplication       : No class cast - consumer2: 9, {"c":9}
2022-02-18 11:57:04.783  INFO 19855 --- [container-0-C-1] s.k.c.ScsKafkaClassCastApplication       : No class cast - consumer3: ScsKafkaClassCastApplication.ClassC(c=9)
2022-02-18 11:57:05.499  INFO 19855 --- [-StreamThread-1] s.k.c.ScsKafkaClassCastApplication       : No class cast - consumer2: 3, {"c":3}
2022-02-18 11:57:05.500  INFO 19855 --- [container-0-C-1] s.k.c.ScsKafkaClassCastApplication       : No class cast - consumer3: ScsKafkaClassCastApplication.ClassC(c=3)

Consumer 1, however, fails with the following ClassCastException:

2022-02-18 12:25:08.257 ERROR 22910 --- [-StreamThread-1] o.a.k.s.processor.internals.TaskManager  : stream-thread [consumer1-applicationId-72da01fb-de90-4133-bcc4-d63e09f3f8eb-StreamThread-1] Failed to process stream task 0_0 due to the following error:

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:769) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1193) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) ~[kafka-streams-3.0.0.jar:na]
Caused by: java.lang.ClassCastException: class scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB cannot be cast to class scs.kafka.classcast.ScsKafkaClassCastApplication$ClassC (scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB and scs.kafka.classcast.ScsKafkaClassCastApplication$ClassC are in unnamed module of loader 'app')
	at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) ~[kafka-streams-3.0.0.jar:na]
	... 11 common frames omitted

2022-02-18 12:25:08.257 ERROR 22910 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [consumer1-applicationId-72da01fb-de90-4133-bcc4-d63e09f3f8eb] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:769) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1193) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) ~[kafka-streams-3.0.0.jar:na]
Caused by: java.lang.ClassCastException: class scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB cannot be cast to class scs.kafka.classcast.ScsKafkaClassCastApplication$ClassC (scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB and scs.kafka.classcast.ScsKafkaClassCastApplication$ClassC are in unnamed module of loader 'app')
	at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41) ~[kafka-streams-3.0.0.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) ~[kafka-streams-3.0.0.jar:na]
	... 11 common frames omitted

This behavior doesn't seem correct.
It may be related to the fact that there is a regular Kafka function between two Kafka Streams functions.

You can find a sample application here: https://github.com/aliaksandr-pleski/spring-cloud-stream-kafka-class-cast-exception

@aliaksandr-pleski Thanks for this report. After looking at this issue a bit further, I think we can resolve it in a couple of ways.

The reason you are getting this error is that the __typeid__ header is not set properly by your function(). This header is supposed to be set by the JsonSerializer in Spring Kafka. Your streamFunction is a Kafka Streams function, so by default it uses the JsonSerializer and adds the __typeid__ header with the value ClassB on the outbound. This is then passed down to function's input, but function is based on the regular Kafka binder, which uses message conversion behind the scenes (on both input and output) and does not use JsonSerializer (or native Kafka serialization). When function publishes the data (as type ClassC), it doesn't update the type id header, since no JsonSerializer is involved. It simply passes down the same __typeid__ value it received on the input (ClassB), and when the record reaches the next function (consumer1), you run into the ClassCastException.
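To make the mismatch concrete, here is a minimal plain-Java simulation of the header flow described above. It does not use Spring or Kafka classes at all: `Msg`, `publishWithTypeHeader`, and `convertKeepingHeaders` are hypothetical names invented for the illustration, and the header name is written the way it appears in this thread. It only models the idea that the payload type changes while the inbound headers are copied unchanged.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation only: none of these types are Spring or Kafka classes.
class StaleTypeIdSketch {

    // Header name as it is written in this thread.
    static final String TYPE_ID = "__typeid__";

    static final class ClassB {
        final String b;
        ClassB(String b) { this.b = b; }
    }

    static final class ClassC {
        final int c;
        ClassC(int c) { this.c = c; }
    }

    // Hypothetical stand-in for a record travelling between topics.
    static final class Msg {
        final Object payload;
        final Map<String, String> headers;
        Msg(Object payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Models streamFunction's outbound: the serializer stamps the payload's class name.
    static Msg publishWithTypeHeader(Object payload) {
        Map<String, String> headers = new HashMap<>();
        headers.put(TYPE_ID, payload.getClass().getName());
        return new Msg(payload, headers);
    }

    // Models function(): the payload becomes ClassC, the inbound headers are copied as-is.
    static Msg convertKeepingHeaders(Msg in) {
        ClassB classB = (ClassB) in.payload;
        return new Msg(new ClassC(Integer.parseInt(classB.b)), new HashMap<>(in.headers));
    }

    public static void main(String[] args) {
        Msg out = convertKeepingHeaders(publishWithTypeHeader(new ClassB("9")));
        System.out.println("payload class:  " + out.payload.getClass().getName());
        System.out.println("type id header: " + out.headers.get(TYPE_ID));
    }
}
```

The two printed lines name different classes: the payload is a ClassC, while the header still points at ClassB. A downstream reader that trusts the header would therefore produce a ClassB where consumer1 expects a ClassC.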

You can work around this problem in two ways. The first one is pretty trivial, while the second one needs some configuration changes.

  1. You can add the __typeid__ header programmatically when sending the messages from the function method. Below is an example.
@Bean
public Function<ClassB, Message<ClassC>> function() {
    return classB -> {
        final Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.MESSAGE_KEY, classB.b);
        // Set the type id explicitly so that it matches the outgoing payload type.
        headers.put(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, ClassC.class.getCanonicalName());
        return MessageBuilder.createMessage(
            new ClassC(Integer.parseInt(classB.b)),
            new MessageHeaders(headers)
        );
    };
}

Note the explicit addition of the DEFAULT_CLASSID_FIELD_NAME header.

  2. The other option is to force the function method to use native encoding on the outbound, so that it goes through Kafka's serialization with the JsonSerializer from Spring Kafka. This way, the proper type id header is added when the record is published. Here are the relevant configuration changes.
spring:
  cloud:
    function:
      definition: supplier;streamFunction;function;consumer1
    stream:
      kafka:
        bindings:
          function-out-0:
            producer:
              configuration:
                value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
        ....
      bindings:
        ...
        function-in-0:
          binder: kafka
          destination: scs-cc-streamFunction
        function-out-0:
          binder: kafka
          destination: scs-cc-consumer
          producer:
            useNativeEncoding: true
        ....
        ....

I verified that both solutions work. Please let us know whether they work on your end.

@sobychacko thanks for the quick response!

Both options work for me. I personally prefer option 2.

The fact that the regular Kafka function doesn't update the __typeid__ header and just proxies the existing one - is that the expected behavior?

The regular binder uses Spring's message conversion by default, so it doesn't go through the native Kafka serializer, and the code that adds that header is part of the JsonSerializer class. Therefore, this is the expected behavior.

Closing the issue now. Feel free to re-open it if you find something else related to this.
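As a rough illustration of the difference between the two outbound paths, the following plain-Java sketch contrasts them. It is a simulation under stated assumptions, not the binder's or Spring Kafka's actual code: `viaMessageConversion`, `viaNativeSerializer`, and `headerNames` are hypothetical helpers, and the native path is simplified to overwriting a map entry with the class name of the payload being sent.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation only: none of these types are Spring or Kafka classes.
class OutboundPathSketch {

    // Header name as it is written in this thread.
    static final String TYPE_ID = "__typeid__";

    static final class ClassB { }
    static final class ClassC { }

    // Message-conversion path (the regular binder's default in this thread):
    // inbound headers are forwarded untouched, whatever the payload is.
    static Map<String, String> viaMessageConversion(Object payload, Map<String, String> inbound) {
        return new HashMap<>(inbound);
    }

    // Native-encoding path: the serializer derives the type id from the payload
    // that is actually sent. Simplification: the sketch overwrites the map entry.
    static Map<String, String> viaNativeSerializer(Object payload, Map<String, String> inbound) {
        Map<String, String> outbound = new HashMap<>(inbound);
        outbound.put(TYPE_ID, payload.getClass().getName());
        return outbound;
    }

    // True when the header names the class the downstream consumer expects.
    static boolean headerNames(Map<String, String> headers, Class<?> expected) {
        return expected.getName().equals(headers.get(TYPE_ID));
    }

    public static void main(String[] args) {
        Map<String, String> inbound = new HashMap<>();
        inbound.put(TYPE_ID, ClassB.class.getName()); // what function() received
        ClassC payload = new ClassC();                // what function() sends

        System.out.println(headerNames(viaMessageConversion(payload, inbound), ClassC.class)); // false
        System.out.println(headerNames(viaNativeSerializer(payload, inbound), ClassC.class));  // true
    }
}
```

In this model, only the native path yields a header that agrees with the ClassC payload, which matches why enabling useNativeEncoding with the JsonSerializer resolved the exception.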

Got it, thanks!