spring-attic/spring-cloud-stream-binder-kafka

KeyValueSerdeResolver uses streamConfigGlobalProperties and ignores extended consumer/producer properties

Closed this issue · 0 comments

Given an application which uses Kafka Streams Binder, any consumer or producer configuration provided at binding level, is not being used by the KeyValueSerdeResolver to configure the Serde

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/9e4a1075d437f2e6aa7dad0a38d8286de94234a4/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java#L383-L394

Example:

spring.cloud.stream.function.definition=function1

spring.cloud.stream.bindings.function1-in-0.destination=topic1-in
spring.cloud.stream.bindings.function1-out-0.destination=topic1-out
spring.cloud.stream.kafka.streams.bindings.function1-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass.determineType

When setting a breakpoint here:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/739b49996601a97322b986059f20618da0ccaf0a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java#L518

The JsonDeserializer in the Serde doesn't have a typeResolver or any other customization on binding level

Version of the framework
Spring Boot 2.6.1
Spring Cloud 2021.0.0

Related to #1149