logstash-plugins/logstash-integration-kafka

kafka input throws TypeError when a record returns empty headers

kaisecheng opened this issue · 1 comment

Logstash information:

  1. Logstash version (bin/logstash --version): Logstash 8.6.2
  2. Plugin version: logstash-integration-kafka 10.9.0 (per the stack trace below)

JVM (java -version):

  1. JVM version: OpenJDK Runtime Environment Temurin-11.0.15+10 (build 11.0.15+10)

Description of the problem including expected versus actual behavior:

Expected: the kafka input tolerates records whose headers carry no value. Actual: when a consumed record has a header with a nil value, decoding the header crashes with an uncaught TypeError and the consumer worker thread dies.

Steps to reproduce:

  1. The Kafka broker intermittently returns the error "The committing offset data size is not valid.", and the kafka input consumer then receives records whose headers have empty (nil) values. A minimal pipeline that exercises this path is sketched below.
  2. Logstash throws an uncaught TypeError (see logs).
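The following pipeline is a sketch, not taken from the original report; the broker address and topic are placeholders, and it assumes header metadata is only populated when decorate_events is set to "extended":

input {
  kafka {
    bootstrap_servers => "localhost:9092"   # placeholder broker
    topics => ["example-topic"]             # placeholder topic
    decorate_events => "extended"           # assumption: needed to populate [@metadata][kafka][headers]
  }
}
output {
  stdout { codec => rubydebug { metadata => true } }
}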

Logs:

[2023-02-13T08:34:38,430][ERROR][org.logstash.Logstash    ] uncaught exception (in thread kafka-input-worker-logstash-0)
org.jruby.exceptions.TypeError: (TypeError) wrong argument type NilClass (expected byte[])
	at org.jruby.javasupport.JavaArrayUtilities.bytes_to_ruby_string(org/jruby/javasupport/JavaArrayUtilities.java:77) ~[jruby-complete-9.2.20.1.jar:?]
	at org.jruby.javasupport.JavaArrayUtilities.bytes_to_ruby_string(org/jruby/javasupport/JavaArrayUtilities.java:58) ~[jruby-complete-9.2.20.1.jar:?]
	at org.jruby.java.addons.StringJavaAddons.from_java_bytes(org/jruby/java/addons/StringJavaAddons.java:16) ~[jruby-complete-9.2.20.1.jar:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.maybe_set_metadata(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:374) ~[?:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.maybe_set_metadata(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:373) ~[?:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.handle_record(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:358) ~[?:?]
	at usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.decode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:64) ~[?:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_plain_minus_3_dot_1_dot_0.lib.logstash.codecs.plain.decode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-plain-3.1.0/lib/logstash/codecs/plain.rb:54) ~[?:?]
	at usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.decode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:62) ~[?:?]
	at org.logstash.instrument.metrics.AbstractSimpleMetricExt.time(org/logstash/instrument/metrics/AbstractSimpleMetricExt.java:65) ~[logstash-core.jar:?]
	at org.logstash.instrument.metrics.AbstractNamespacedMetricExt.time(org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java:64) ~[logstash-core.jar:?]
	at usr.share.logstash.logstash_minus_core.lib.logstash.codecs.delegator.decode(/usr/share/logstash/logstash-core/lib/logstash/codecs/delegator.rb:61) ~[?:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.handle_record(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:356) ~[?:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_9_dot_0_minus_java.lib.logstash.inputs.kafka.thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.9.0-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
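The trace points at maybe_set_metadata (kafka.rb:374 in plugin 10.9.0), where each record header value is handed to String.from_java_bytes. That conversion expects a Java byte[], so a header whose value is nil fails. A minimal JRuby snippet, with a stand-in variable for the header value, reproduces the same error:

# Run under JRuby. String.from_java_bytes raises TypeError when given nil
# instead of a Java byte[], exactly as in the trace above.
require 'java'

header_value = nil                     # stands in for a record header with no value
String.from_java_bytes(header_value)   # TypeError: wrong argument type NilClass (expected byte[])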

The issue is fixed in logstash-integration-kafka 11.2.1.
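For anyone pinned to an older plugin release, a defensive nil check around the header conversion avoids the crash. The following is a sketch of such a guard inside the headers loop of maybe_set_metadata, assuming the plugin decodes each header value to a UTF-8 string before writing it into event metadata; it is not necessarily the exact patch that shipped in 11.2.1:

# Hypothetical guard: skip headers whose value is nil instead of passing
# nil to from_java_bytes.
record.headers.each do |header|
  next if header.value.nil?                 # empty header value: nothing to decode
  s = String.from_java_bytes(header.value)
  s.force_encoding(Encoding::UTF_8)
  event.set("[@metadata][kafka][headers][#{header.key}]", s) if s.valid_encoding?
end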