logstash-plugins/logstash-input-kafka

Kafka input threads die in some cases and logstash does not notice it

yfoelling opened this issue · 2 comments

  • Version: tested 8.0.4 and 8.1.1 on Logstash 6.3.0 and 6.2.x
  • Operating System: Ubuntu Linux
  • Config File (if you have sensitive info, please remove it):
input {
  kafka {
    auto_offset_reset  => "earliest"
    bootstrap_servers  => "KAFKA ENDPOINTS"
    consumer_threads   => 4
    enable_auto_commit => "false"
    group_id           => "MYGROUP"
    client_id          => "MYCONSUMER"
    topics_pattern     => "MYPATTERN"
    codec              => "json"
  }
}
[....]
  • Sample Data:
    Any data
  • Steps to Reproduce:
    Trigger any Kafka client error.

In this case: delete a topic that is matched by the topics pattern.
You will get the following error:

[ERROR] 2018-07-12 12:43:00.388 [Ruby-0-Thread-24: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:242] ConsumerCoordinator - [Consumer clientId=CLIENTID, groupId=GROUPID] Offset commit failed on partition PARTITION at offset 0: This server does not host this topic-partition.
Exception in thread "Ruby-0-Thread-24: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:242" org.apache.kafka.common.KafkaException: Topic or Partition PARTITION does not exist
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:778)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:726)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:822)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:802)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(org/apache/kafka/clients/consumer/internals/RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(org/apache/kafka/clients/consumer/internals/RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(org/apache/kafka/clients/consumer/internals/RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:209)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(org/apache/kafka/clients/consumer/KafkaConsumer.java:1218)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:438)
at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:302)
at RUBY.block in thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:271)
at org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
at org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
at java.lang.Thread.run(java/lang/Thread.java:748)

In this case the error occurs because enable_auto_commit => "false" makes the plugin commit offsets manually, and there is no error handling for a failed commit.
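A minimal sketch of the missing error handling, in plain Ruby with a hypothetical safe_commit helper (in the plugin, the rescue would go around the commitSync call in kafka.rb and would target org.apache.kafka.common.KafkaException):

```ruby
# Hypothetical sketch: wrap the manual commit so a failed commit (e.g. after
# a topic matched by topics_pattern is deleted) is logged and skipped
# instead of escaping and killing the consumer thread.
def safe_commit(consumer)
  consumer.commitSync            # manual commit, as with enable_auto_commit => "false"
  true
rescue StandardError => e
  # In the plugin (JRuby) this would rescue org.apache.kafka.common.KafkaException.
  warn "offset commit failed, skipping: #{e.message}"
  false
end
```

Whether to skip, retry, or re-subscribe after a failed commit is a design decision; skipping merely keeps the thread alive.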

This is only one of a few cases I have run into. There are similar problems when Elasticsearch is down for some time.
In all cases, the Kafka input threads "die" but Logstash keeps on running, so there is no way for me to notice that Logstash isn't working properly any more.
It seems that Logstash does not notice that the plugin is no longer working, so it just waits for further input. I have seen some other plugins being restarted by Logstash on errors, but that is not the case for this plugin.

I have some possible solutions, but sadly I am not good enough with Ruby to implement them myself...
1:
Build a real rebalance listener for topics_pattern changes, instead of the current no-op listener:
https://github.com/logstash-plugins/logstash-input-kafka/blob/master/lib/logstash/inputs/kafka.rb#L244

          nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new
          pattern = java.util.regex.Pattern.compile(@topics_pattern)
          consumer.subscribe(pattern, nooplistener)
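A hypothetical sketch of such a listener, in plain Ruby (in the plugin this class would include the Java interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener via JRuby, hence the Java-style method names):

```ruby
# Hypothetical non-no-op rebalance listener: track and log assignment
# changes so the plugin can at least see when partitions disappear.
class LoggingRebalanceListener
  attr_reader :assigned

  def initialize
    @assigned = []
  end

  # Invoked before partitions are taken away, e.g. when a topic matched by
  # topics_pattern is deleted or the consumer group rebalances.
  def onPartitionsRevoked(partitions)
    warn "partitions revoked: #{partitions.to_a.inspect}"
    @assigned -= partitions.to_a
  end

  # Invoked after a (re)assignment completes.
  def onPartitionsAssigned(partitions)
    @assigned.concat(partitions.to_a)
  end
end
```

consumer.subscribe(pattern, listener) would then take an instance of this instead of the NoOpConsumerRebalanceListener, so the plugin can log, and potentially reset its commit state, when its assignment changes.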

AND
retry on any error, restarting the plugin after some tries.

2:
Just retry on any error and restart the plugin after some tries.
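The retry idea could be sketched like this in plain Ruby (names are hypothetical; the real fix would wrap the plugin's per-thread consumer loop):

```ruby
# Hypothetical retry-then-give-up wrapper: restart the consumer loop after
# an error, and after too many attempts let the exception escape so the
# failure is visible instead of leaving a silently dead thread.
def with_retries(max_retries: 5, base_sleep: 1)
  attempts = 0
  begin
    yield
  rescue StandardError => e
    attempts += 1
    raise if attempts > max_retries       # fail loudly, don't hang silently
    warn "consumer loop died (#{e.class}), restarting (attempt #{attempts})"
    sleep(base_sleep * 2**(attempts - 1)) # simple exponential backoff
    retry
  end
end
```

Usage would be something like with_retries { run_consumer_loop }, where run_consumer_loop stands in for the plugin's poll/commit loop.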

A little workaround for the people out there with similar issues:
Set enable_auto_commit => "false" back to true, so the Kafka client commits in the background; then you don't have issues with changed topics anymore. Problems with a restarted Elasticsearch (logstash-output) will still occur.
You could also run jstack regularly, search for the Kafka threads, and restart Logstash if they are not running anymore.
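The jstack workaround could also be done in-process; a hypothetical supervisor sketch in plain Ruby (names are mine, not the plugin's):

```ruby
# Hypothetical in-process version of the jstack workaround: check the
# consumer threads periodically and exit non-zero when any have died, so
# an external supervisor (systemd, Docker, ...) restarts Logstash.
def dead_threads(threads)
  threads.reject(&:alive?)
end

def supervise(threads, interval: 30)
  loop do
    dead = dead_threads(threads)
    unless dead.empty?
      warn "#{dead.size} Kafka consumer thread(s) died, exiting for restart"
      exit 1
    end
    sleep interval
  end
end
```

Exiting the whole process is deliberate here: a visible crash that an external supervisor can act on beats a running Logstash that no longer consumes.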

Hi, we hit the same issues. Have you solved them yet?

Yes, the Kafka input (sub)thread management needs reworking: their lifecycle is not supervised, and the exception handling is also incorrect (related to elastic/logstash#11603).