logstash-plugins/logstash-integration-kafka

Kafka connection error forces Logstash process to shut down

andsel opened this issue · 3 comments

  • Version: 10.5.3
  • logstash-input-kafka

If an authentication error is raised at the Kafka client level, it does not appear to be translated into a Ruby StandardError, and this causes the Logstash process to shut down.

Given the exception:
[2021-02-02T10:48:03,276][INFO ][org.apache.kafka.common.security.authenticator.AbstractLogin][main][kafka-input] Successfully logged in.
[2021-02-02T10:48:03,279][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][kafka-input] Kafka version: 2.4.1
[2021-02-02T10:48:03,279][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][kafka-input] Kafka commitId: c57222ae8cd7866b
[2021-02-02T10:48:03,279][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][kafka-input] Kafka startTimeMs: 1612262883279
[2021-02-02T10:48:03,280][WARN ][org.apache.kafka.common.utils.AppInfoParser][main][kafka-input] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-host.local-0
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) [kafka-clients-2.4.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) [kafka-clients-2.4.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) [kafka-clients-2.4.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) [kafka-clients-2.4.1.jar:?]
	at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) [?:?]
	at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [?:?]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:490) [?:?]
	at org.jruby.javasupport.JavaConstructor.newInstanceDirect(JavaConstructor.java:285) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.java.invokers.ConstructorInvoker.call(ConstructorInvoker.java:86) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.java.invokers.ConstructorInvoker.call(ConstructorInvoker.java:175) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:386) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:184) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.java.proxies.ConcreteJavaProxy$InitializeMethod.call(ConcreteJavaProxy.java:56) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:182) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyClass.newInstance(RubyClass.java:918) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroOrOneOrNBlock.call(JavaMethod.java:349) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.java.proxies.ConcreteJavaProxy$NewMethod.call(ConcreteJavaProxy.java:158) [jruby-complete-9.2.13.0.jar:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$method$create_consumer$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:350) [jruby-complete-9.2.13.0.jar:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$method$create_consumer$0$__VARARGS__(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207) [jruby-complete-9.2.13.0.jar:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$run$1(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:247) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:148) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.BlockBody.yield(BlockBody.java:106) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.Block.yield(Block.java:184) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerable$22.call(RubyEnumerable.java:902) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.CallBlock19.doYield(CallBlock19.java:111) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.BlockBody.yield(BlockBody.java:117) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.Block.yieldValues(Block.java:200) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerator$2.call(RubyEnumerator.java:404) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.BlockCallback.call(BlockCallback.java:40) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.CallBlock.doYield(CallBlock.java:96) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.BlockBody.yield(BlockBody.java:108) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.Block.yield(Block.java:184) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyFixnum.times(RubyFixnum.java:291) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyInteger$INVOKER$i$0$0$times.call(RubyInteger$INVOKER$i$0$0$times.gen) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:555) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyClass.finvokeWithRefinements(RubyClass.java:514) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyClass.finvoke(RubyClass.java:502) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.Helpers.invoke(Helpers.java:448) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyBasicObject.callMethod(RubyBasicObject.java:393) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerator.__each__(RubyEnumerator.java:400) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerator.each(RubyEnumerator.java:396) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerator$INVOKER$i$each.call(RubyEnumerator$INVOKER$i$each.gen) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyClass.finvokeWithRefinements(RubyClass.java:497) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyClass.finvoke(RubyClass.java:487) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.Helpers.invoke(Helpers.java:436) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerable.callEach19(RubyEnumerable.java:119) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerable.collectCommon(RubyEnumerable.java:894) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyEnumerable.map(RubyEnumerable.java:886) [jruby-complete-9.2.13.0.jar:?]
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$method$run$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:247) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:106) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:140) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:278) [jruby-complete-9.2.13.0.jar:?]
	at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$inputworker$0(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:106) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:140) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:278) [jruby-complete-9.2.13.0.jar:?]
	at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_input$1(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:396) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.runtime.Block.call(Block.java:139) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.RubyProc.call(RubyProc.java:318) [jruby-complete-9.2.13.0.jar:?]
	at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105) [jruby-complete-9.2.13.0.jar:?]
	at java.lang.Thread.run(Thread.java:834) [?:?]

As a result, the pipeline crashes, and java_pipeline.rb exits in such a way that the Logstash process terminates with a system error. Looking at the stack trace above, the Ruby frames are:

logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:350
logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:247
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$inputworker$0(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_input$1(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:396)

In the Java pipeline execution (https://github.com/elastic/logstash/blob/v7.10.1/logstash-core/lib/logstash/java_pipeline.rb#L400-L434), any exception that is not a subclass of Ruby's StandardError forces the input (and the pipeline) to close instead of retrying the input.
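To illustrate the rescue behaviour described above, here is a minimal sketch in plain Ruby. It is not the Logstash or plugin code: `FakeKafkaAuthError`, `run_with_bare_rescue`, `run_with_explicit_rescue` and `outcome_of` are hypothetical names, and the fake error class only stands in for an exception that sits outside the StandardError hierarchy, as the Kafka client exception appears to do here.

```ruby
# Hypothetical stand-in for an exception outside the StandardError hierarchy
# (an assumption used to mimic the Kafka client error from this issue).
class FakeKafkaAuthError < Exception; end

# Mimics a worker that handles only StandardError, which is what a bare
# `rescue` does, and treats a handled error as "retry".
def run_with_bare_rescue
  yield
  :completed
rescue
  :retried
end

# Variant that also names the non-StandardError class explicitly.
def run_with_explicit_rescue
  yield
  :completed
rescue StandardError, FakeKafkaAuthError
  :retried
end

# Runs one of the workers above and reports whether the error escaped it.
def outcome_of(runner)
  send(runner) { raise FakeKafkaAuthError, "Not authorized to access topics: [foo-topic]" }
rescue Exception
  :escaped
end

bare_result = outcome_of(:run_with_bare_rescue)
explicit_result = outcome_of(:run_with_explicit_rescue)

puts "bare rescue:     #{bare_result}"
puts "explicit rescue: #{explicit_result}"
```

With the bare `rescue`, the error escapes the worker, which corresponds to the non-retry path described above. Naming the exception class explicitly is one possible way to keep it on the retry path; whether that is the right fix for the plugin is a separate question.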

kares commented

Here's a more concerning error that seems to propagate all the way up (from Kafka client):

[ERROR][logstash.javapipeline    ][placement-abc][pipeline-placement] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:pipeline-placement
  Plugin: <LogStash::Inputs::Kafka topics=>["foo-topic"], consumer_threads=>1, sasl_jaas_config=>"org.apache.kafka.common.security.plain.PlainLoginModule required username=$ConnectionString password='Endpoint=sb://sample.servicebus.local.net/;SharedAccessKeyName=XXX;SharedAccessKey=YYY=';", bootstrap_servers=>"local.net:9090", client_id=>"local", decorate_events=>true, codec=><LogStash::Codecs::JSON charset=>"Windows-1252", id=>"879dc661-94cf-4a3b-b29c-50d338acd9e9", enable_metric=>true>, auto_offset_reset=>"latest", group_id=>"logstash-im", sasl_mechanism=>"PLAIN", security_protocol=>"SASL_SSL", id=>"pipeline-placement", enable_metric=>true, auto_commit_interval_ms=>5000, check_crcs=>true, client_dns_lookup=>"default", connections_max_idle_ms=>540000, enable_auto_commit=>true, fetch_max_bytes=>52428800, fetch_max_wait_ms=>500, heartbeat_interval_ms=>3000, isolation_level=>"read_uncommitted", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", max_poll_interval_ms=>300000, max_partition_fetch_bytes=>1048576, max_poll_records=>500, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, reconnect_backoff_ms=>50, request_timeout_ms=>40000, retry_backoff_ms=>100, send_buffer_bytes=>131072, session_timeout_ms=>10000, value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https">
  Error: Not authorized to access topics: [foo-topic]
  Exception: Java::OrgApacheKafkaCommonErrors::TopicAuthorizationException
  Stack: 
[FATAL][logstash.runner          ] An unexpected error occurred! {:error=>org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [foo-topic], :backtrace=>[]}
[ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

PR in #87

Closed by #87