ClassCastException in pipeline: Kafka Streams Function -> Kafka Function -> Kafka Streams Consumer
Closed this issue · 5 comments
Hi everybody,
I have a spring-cloud-stream application on Spring Boot 2.6.2 and Spring Cloud 2021.0.0, working with both Kafka and Kafka Streams.
The app has the following data pipeline:
@Bean
public Supplier<Message<ClassA>> supplier() {
    return () -> MessageBuilder.createMessage(
            new ClassA(random.ints(0, 10).findFirst().getAsInt()),
            new MessageHeaders(
                    Map.of(KafkaHeaders.MESSAGE_KEY, String.valueOf(random.ints(0, 10).findFirst().getAsInt())))
    );
}
@Bean
public Function<KStream<String, ClassA>, KStream<String, ClassB>> streamFunction() {
    return stream -> stream.mapValues(classA -> new ClassB(String.valueOf(classA.a)));
}
@Bean
public Function<ClassB, Message<ClassC>> function() {
    return classB -> MessageBuilder.createMessage(
            new ClassC(Integer.parseInt(classB.b)),
            new MessageHeaders(Map.of(KafkaHeaders.MESSAGE_KEY, classB.b))
    );
}
There are three consumers reading the resulting ClassC records (consumer2 consumes them as raw String values):
@Bean
public Consumer<KStream<String, ClassC>> consumer1() {
    return stream -> stream.peek((k, v) -> log.info("No class cast: {}, {}", k, v));
}
@Bean
public Consumer<KStream<String, String>> consumer2() {
    return stream -> stream.peek((k, v) -> log.info("No class cast - consumer2: {}, {}", k, v));
}
@Bean
public Consumer<ClassC> consumer3() {
    return classC -> log.info("No class cast - consumer3: {}", classC);
}
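For context, the binding wiring is not shown above; under Spring Cloud Stream's functional binding conventions it would presumably look roughly like the following sketch. The scs-cc-streamFunction and scs-cc-consumer destinations appear in the configuration later in this thread; the other topic names here are only guesses.

```yaml
spring:
  cloud:
    function:
      definition: supplier;streamFunction;function;consumer1;consumer2;consumer3
    stream:
      bindings:
        supplier-out-0:
          destination: scs-cc-supplier          # assumed topic name
        streamFunction-in-0:
          destination: scs-cc-supplier          # assumed topic name
        streamFunction-out-0:
          destination: scs-cc-streamFunction
        function-in-0:
          destination: scs-cc-streamFunction    # matches the config shown later
        function-out-0:
          destination: scs-cc-consumer
        consumer1-in-0:
          destination: scs-cc-consumer
```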
Consumers 2 and 3 work fine:
2022-02-18 11:57:04.782 INFO 19855 --- [-StreamThread-1] s.k.c.ScsKafkaClassCastApplication : No class cast - consumer2: 9, {"c":9}
2022-02-18 11:57:04.783 INFO 19855 --- [container-0-C-1] s.k.c.ScsKafkaClassCastApplication : No class cast - consumer3: ScsKafkaClassCastApplication.ClassC(c=9)
2022-02-18 11:57:05.499 INFO 19855 --- [-StreamThread-1] s.k.c.ScsKafkaClassCastApplication : No class cast - consumer2: 3, {"c":3}
2022-02-18 11:57:05.500 INFO 19855 --- [container-0-C-1] s.k.c.ScsKafkaClassCastApplication : No class cast - consumer3: ScsKafkaClassCastApplication.ClassC(c=3)
whereas consumer1 gets the following ClassCastException:
2022-02-18 12:25:08.257 ERROR 22910 --- [-StreamThread-1] o.a.k.s.processor.internals.TaskManager : stream-thread [consumer1-applicationId-72da01fb-de90-4133-bcc4-d63e09f3f8eb-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:769) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1193) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) ~[kafka-streams-3.0.0.jar:na]
Caused by: java.lang.ClassCastException: class scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB cannot be cast to class scs.kafka.classcast.ScsKafkaClassCastApplication$ClassC (scs.kafka.classcast.ScsKafkaClassCastApplication$ClassB and scs.kafka.classcast.ScsKafkaClassCastApplication$ClassC are in unnamed module of loader 'app')
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41) ~[kafka-streams-3.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) ~[kafka-streams-3.0.0.jar:na]
... 11 common frames omitted
2022-02-18 12:25:08.257 ERROR 22910 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [consumer1-applicationId-72da01fb-de90-4133-bcc4-d63e09f3f8eb] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
(same StreamsException and stack trace as above)
This behavior doesn't seem correct. It is probably related to the fact that there is one regular Kafka function between the two Kafka Streams functions.
You can find sample application here: https://github.com/aliaksandr-pleski/spring-cloud-stream-kafka-class-cast-exception
@aliaksandr-pleski Thanks for this report. After looking at this issue a bit further, I think we can resolve it in a couple of ways.
The reason you are getting this error is that the __typeid__ header is not set properly by your function(). This header is normally set by the JsonSerializer in Spring Kafka. Your streamFunction is a Kafka Streams function, so by default it uses the JsonSerializer and adds the __typeid__ header as ClassB on the outbound. That record is then passed to function's input, but function is based on the regular Kafka binder, which uses message conversion behind the scenes (on both input and output) and does not use the JsonSerializer (or native Kafka serialization). When function publishes its data (as type ClassC), it does not update the type-id header (since no JsonSerializer is involved); it simply passes along the same __typeid__ value it received on its input, which is ClassB. When that record reaches the next function (consumer1), you run into the ClassCastException you are seeing.
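To make the failure mode concrete, here is a toy, plain-Java sketch of a converter that trusts a stale type header. This is not Spring Kafka code; the class names, the header map, and the deserialize method are purely illustrative stand-ins for what the real JsonDeserializer-style type mapping does with the __TypeId__ header.

```java
import java.util.Map;

public class TypeIdSketch {
    // Illustrative stand-ins for the classes in the issue.
    static class ClassB { final String b; ClassB(String b) { this.b = b; } }
    static class ClassC { final int c; ClassC(int c) { this.c = c; } }

    // Simulates a converter that trusts the type header on the record:
    // it materializes whatever class the header names, regardless of what
    // the downstream consumer actually expects.
    static Object deserialize(Map<String, String> headers, String payload) {
        if ("ClassB".equals(headers.get("__TypeId__"))) {
            return new ClassB(payload);
        }
        return new ClassC(Integer.parseInt(payload));
    }

    public static void main(String[] args) {
        // function() forwarded the stale header it received from streamFunction...
        Map<String, String> headers = Map.of("__TypeId__", "ClassB");
        Object value = deserialize(headers, "9");
        try {
            ClassC c = (ClassC) value; // ...so consumer1's cast to ClassC fails
            System.out.println("no cast error: " + c.c);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the report");
        }
    }
}
```

Fixing the header value (or letting a real serializer rewrite it on publish, as in the options below in this thread's sense of option 2) is what makes the cast succeed.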
You can work around this problem in two ways. The first is pretty trivial, while the second needs some configuration changes.
- You can add the __typeid__ header programmatically when sending messages from the function method. Below is an example.
@Bean
public Function<ClassB, Message<ClassC>> function() {
return classB -> {
final Map<String, Object> headers = new HashMap<>();
headers.put(KafkaHeaders.MESSAGE_KEY, classB.b);
headers.put(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, ClassC.class.getCanonicalName());
return MessageBuilder.createMessage(
new ClassC(Integer.parseInt(classB.b)),
new MessageHeaders(headers)
);
};
}
Note the explicit addition of the DEFAULT_CLASSID_FIELD_NAME header (which resolves to __TypeId__).
- The other option is to force the function method to use native encoding on the outbound, so that it uses Kafka's own serialization with the JsonSerializer from Spring Kafka. This way, the proper __typeid__ header is added when the record is published. Here are the relevant configuration changes needed.
spring:
cloud:
function:
definition: supplier;streamFunction;function;consumer1
stream:
kafka:
bindings:
function-out-0:
producer:
configuration:
value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
....
bindings:
...
function-in-0:
binder: kafka
destination: scs-cc-streamFunction
function-out-0:
binder: kafka
destination: scs-cc-consumer
producer:
useNativeEncoding: true
....
....
I verified that both solutions work. Please let us know if they work on your end.
@sobychacko thanks for the quick response!
Both options are working for me. I personally find option 2 preferable.
Is it expected behavior that the regular Kafka function doesn't update the __typeid__ header and simply forwards the existing one?
Since the regular binder uses Spring's message conversion by default, it doesn't go through the native Kafka serializer, and the code that adds that header is specific to the JsonSerializer class. Therefore, yes, this is the expected behavior.
Closing the issue now, feel free to re-open if you find something else related to this.
Got it, thanks!