logstash-plugins/logstash-input-kafka

Kafka Input Polling issue

andrewvc opened this issue · 4 comments

Moved from elastic/logstash#9421


  • Version: 6.2.4
  • Operating System: Ubuntu 16.04
  • Config File (if you have sensitive info, please remove it):
input {
  kafka {
    auto_offset_reset  => "earliest"
    bootstrap_servers  => "SERVERS"
    consumer_threads   => 4
    enable_auto_commit => "false"
    group_id           => "GROUP"
    client_id          => "CLIENT"
    topics_pattern     => "PATTERN"
    codec              => "json"
  }
}
filter {
[...]
}
output {
  elasticsearch {
    index              => "INDEX"
    hosts              => ["SERVER"]
    document_id        => "%{[@metadata][fingerprint]}"
    template           => "/etc/logstash/index_template.json"
    template_name      => "logging"
    template_overwrite => true
  }
}
  • Steps to Reproduce:
  1. Take a config like the one above (filters are not required).
  2. Start Logstash (with some continuous input).
  3. Make ES unreachable somehow.
  4. Make it reachable again after some time.

More Info:
With the Kafka input and the Elasticsearch output, Logstash stops processing if Elasticsearch is unreachable for a while.
The process is still running, but it doesn't process new data.

Logstash is running in Kubernetes, so I have to restart it manually each time ES has been unreachable. IMHO it should either stop the service or wait for the cluster to come back up.

Log output while ES is down (for some hours in my case):

2018-04-23T16:58:39.510378721Z [WARN ] 2018-04-23 16:58:39.510 [Ruby-0-Thread-11@[main]>worker1: /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:385] elasticsearch - UNEXPECTED POOL ERROR {:e=>#}
2018-04-23T16:58:39.510487776Z [ERROR] 2018-04-23 16:58:39.510 [Ruby-0-Thread-11@[main]>worker1: /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:385] elasticsearch - Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down? {:error_message=>"No Available connections", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError", :will_retry_in_seconds=>64}
2018-04-23T16:58:42.951478403Z [WARN ] 2018-04-23 16:58:42.951 [Ruby-0-Thread-8: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:232] elasticsearch - Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"SERVER", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [SERVER][Manticore::SocketException] Connection refused (Connection refused)"}

Log output at the time ES is up again:

2018-04-23T16:59:42.326249582Z Exception in thread "Ruby-0-Thread-23: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:241" Exception in thread "Ruby-0-Thread-22: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:241" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2018-04-23T16:59:42.326318585Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:721)
2018-04-23T16:59:42.326342549Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:599)
2018-04-23T16:59:42.326350452Z 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(org/apache/kafka/clients/consumer/KafkaConsumer.java:1203)
2018-04-23T16:59:42.326358068Z 	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
2018-04-23T16:59:42.326363546Z 	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:438)
2018-04-23T16:59:42.326455383Z 	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:302)
2018-04-23T16:59:42.32646587Z 	at RUBY.block in thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:269)
2018-04-23T16:59:42.326472091Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
2018-04-23T16:59:42.326477217Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
2018-04-23T16:59:42.326483081Z 	at java.lang.Thread.run(java/lang/Thread.java:748)
2018-04-23T16:59:42.326607591Z org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2018-04-23T16:59:42.326619456Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:721)
2018-04-23T16:59:42.326680519Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:599)
2018-04-23T16:59:42.326690954Z 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(org/apache/kafka/clients/consumer/KafkaConsumer.java:1203)
2018-04-23T16:59:42.326696757Z 	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
2018-04-23T16:59:42.326703114Z 	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:438)
2018-04-23T16:59:42.326749471Z 	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:302)
2018-04-23T16:59:42.326760403Z 	at RUBY.block in thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:269)
2018-04-23T16:59:42.326801054Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
2018-04-23T16:59:42.326889432Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
2018-04-23T16:59:42.326901776Z 	at java.lang.Thread.run(java/lang/Thread.java:748)
2018-04-23T16:59:42.33038824Z Exception in thread "Ruby-0-Thread-20: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:241" Exception in thread "Ruby-0-Thread-21: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:241" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2018-04-23T16:59:42.330428219Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:721)
2018-04-23T16:59:42.330513757Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:599)
2018-04-23T16:59:42.33053427Z 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(org/apache/kafka/clients/consumer/KafkaConsumer.java:1203)
2018-04-23T16:59:42.330542261Z 	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
2018-04-23T16:59:42.330548441Z 	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:438)
2018-04-23T16:59:42.330554574Z 	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:302)
2018-04-23T16:59:42.33056064Z 	at RUBY.block in thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:269)
2018-04-23T16:59:42.330566733Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
2018-04-23T16:59:42.330572125Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
2018-04-23T16:59:42.330677833Z 	at java.lang.Thread.run(java/lang/Thread.java:748)
2018-04-23T16:59:42.330688725Z org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2018-04-23T16:59:42.330696963Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:721)
2018-04-23T16:59:42.330702453Z 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:599)
2018-04-23T16:59:42.330713048Z 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(org/apache/kafka/clients/consumer/KafkaConsumer.java:1203)
2018-04-23T16:59:42.330718522Z 	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
2018-04-23T16:59:42.330724478Z 	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:438)
2018-04-23T16:59:42.331021615Z 	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:302)
2018-04-23T16:59:42.331034813Z 	at RUBY.block in thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.6/lib/logstash/inputs/kafka.rb:269)
2018-04-23T16:59:42.331041498Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
2018-04-23T16:59:42.331046719Z 	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
2018-04-23T16:59:42.331051815Z 	at java.lang.Thread.run(java/lang/Thread.java:748)

Same problem here :(

(stealing this response from @original-brownbear)

Kafka tracks the individual consumers in a consumer group (i.e. a number of LS instances) and tries to give each consumer one or more specific partitions of the data in the topic they’re consuming.
In order to achieve this, Kafka has to also track whether or not a consumer (LS Kafka input thread) is making any progress on their assigned partition and reassign partitions that have not seen progress in a set timeframe. This causes a problem when Logstash is requesting more events from the Kafka Broker than it can process within the timeout because it triggers reassignment of partitions. Reassignment of partitions can cause duplicate processing of events and significant throughput problems because of the time the reassignment takes.
Solution:

The problem can be fixed by reducing the number of records that LS polls from the Kafka Broker in one request, reducing the number of Kafka input threads, and/or increasing the relevant timeouts in the Kafka Consumer configuration.

The number of records to pull in one request is set by the option max_poll_records.
If it exceeds the default value of 500, reducing this should be the first thing to try.
The number of input threads is given by the option consumer_threads.
If it exceeds the number of pipeline workers configured in logstash.yml, it should certainly be reduced.
If it is a large value (> 4), it likely makes sense to reduce it to 4 (if the client has the time/resources for it, it would be ideal to start with a value of 1 and then increment from there to find the optimal performance).
The relevant timeout is set via session_timeout_ms. It should be set to a value that ensures that the number of events in max_poll_records can be safely processed within it. Example: if pipeline throughput is 10k events/s and max_poll_records is set to 1k, the value must be at least 100ms when consumer_threads is set to 1. If consumer_threads is set to a higher value n, the minimum session timeout increases proportionally, to n * 100ms. In practice the value must be set much larger than the theoretical value because the behaviour of the outputs and filters in a pipeline follows a distribution. It should also be larger than the maximum time you expect your outputs to stall for.
The default setting is 10s == 10000ms. If a user is experiencing periodic problems with an output such as the Elasticsearch output, which could stall because of load or similar effects, there is little downside to increasing this value significantly, to, say, 60s. Note: Decreasing max_poll_records is preferable to increasing this timeout from the performance perspective. Increasing this timeout is your only option if the client's issues are caused by periodically stalling outputs. Check logs for evidence of stalling outputs (e.g. ES output logging status 429).
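
As a rough sketch of the tuning described above, the kafka input from the original report might be adjusted along these lines. The option names consumer_threads, max_poll_records and session_timeout_ms are the ones discussed in this comment. The concrete values are illustrative assumptions, not tested recommendations. max_poll_interval_ms is included only because the CommitFailedException in the report names max.poll.interval.ms. Whether that option is available, and what it defaults to, depends on the plugin version, so treat it as an assumption and check the plugin documentation for your release.

```
input {
  kafka {
    auto_offset_reset  => "earliest"
    bootstrap_servers  => "SERVERS"
    enable_auto_commit => "false"
    group_id           => "GROUP"
    client_id          => "CLIENT"
    topics_pattern     => "PATTERN"
    codec              => "json"

    # Start with a single input thread and increase only if throughput requires it.
    consumer_threads     => 1

    # Fewer records per poll than the default of 500 (illustrative value).
    max_poll_records     => "250"

    # Worked example from the comment: 1000 records / 10000 events/s = 100ms per thread
    # as a theoretical minimum. Here the timeout is raised from the 10s default to 60s
    # so that it also covers a stalled output.
    session_timeout_ms   => "60000"

    # Assumption: only applies if your plugin version exposes this option. It should be
    # longer than the longest output stall you expect (illustrative value: 10 minutes).
    max_poll_interval_ms => "600000"
  }
}
```

If the Elasticsearch outage lasts longer than these timeouts, as in the multi-hour outage reported above, the consumer can still be removed from the group, so this sketch reduces the likelihood of the rebalance rather than ruling it out.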

Hi @andrewvc, thanks for sharing the detailed tuning steps. What changes would you suggest when the indexing rate is slower than the rate of incoming messages?

After making these changes and running the pipeline for a couple of days, I am encountering heap issues on the Logstash indexer. The errors look something like this:

Auto offset commit failed for group logstash-indexer-onprem: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Error while attempting to check/cancel excessively long grok patterns {:message=>"Java heap space", :class=>"Java::JavaLang::OutOfMemoryError", :backtrace=>[]}

@andrewvc any additional information on this? Also, is there a way to have the plugin restart after it disconnects (some sort of retry logic)?