logstash-plugins/logstash-integration-kafka

Update Kafka client and Confluent schema registry client

andsel opened this issue · 1 comments

The Kafka client used in plugin version 10.12.0 is 2.8.1 and should be updated to 3.latest.
Confluent schema registry client should be updated from 6.2.2 to 7.3.latest.

The #130 was created to upgrade the clients, but hit a problem of class not found.
The class not found is com.fasterxml.jackson.databind.json.JsonMapper which is present in jackson-databind starting from v2.10.
This originate from io.confluent:kafka-schema-registry-client library with the following execption:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:468)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:291)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:318)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:303)
  at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:62)
  at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:490)
  at org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:279)
  ...
  at org.jruby.Main.main(org/jruby/Main.java:206)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/json/JsonMapper
  at io.confluent.kafka.schemaregistry.utils.JacksonMapper.<clinit>(JacksonMapper.java:27)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.<clinit>(RestService.java:156)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:158)
  at io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory.newClient(SchemaRegistryClientFactory.java:36)
  at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:72)
  at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:66)
  at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:396)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
  at org.jruby.dist/org.jruby.javasupport.JavaConstructor.newInstanceDirect(JavaConstructor.java:279)
  ...
  at org.jruby.dist/org.jruby.Main.main(Main.java:206)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.json.JsonMapper
  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
  ... 381 more

However, despite both kafka-client:2.8.1 and kafka-schema-registry-client:6.2.2 depended on jackson-databind:2.10+ the problem never manifested because with version 7.0.4 the schema registry switched the implementation of an internal class, that trigger the problem. Described with more technical details in this comment.
The problem manifest with Logstash up to 8.2.3 and 7.17.x. The latest are immune because updated the jackson-databind dependency from 2.9.10 (which doesn't ship that class) to 2.13.3 (which ships the required class).
The solutions to the problem are:

  • bind the version of the logstash-integration-kafka to be installed only on Logstash >=8.3
  • instead of using client kafka-schema-registry-client 7.3.x pin the last version that works 7.0.3

I am in favor of binding logstash-integration-kafka with Logstash >=8.3 because it is a major update of kafka client and I assume it brings new features, so the version of the plugin can make +1 on major and restrict to >=8.3. Pinning the last working version 7.0.3 means we will face the same issue in the future.