logstash-plugins/logstash-output-kafka

Logstash is not able to connect to Kafka if the Kafka host name cannot be resolved when Logstash starts

doronbl opened this issue · 9 comments

Description of the problem:

Step 1

Kafka and Logstash are both shut down.
Configure the Logstash output to connect to an unresolvable Kafka host.
Run Logstash.
Result:
Logstash is up and running; "Successfully started Logstash API endpoint" is printed.
Expected behaviour:
Either retry the connection or shut down Logstash.

Step 2

Configure /etc/hosts to resolve the previously unknown host (see the example below).
Run Kafka (with its default configuration).
Result:
Logstash is still unable to connect.
Expected result:
Logstash, if still running, should be able to resolve the host and reconnect on the next retry.
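For reference, on a single-host reproduction this step amounts to something like the following (the host name matches the config further down; the 127.0.0.1 mapping is an assumption for running Kafka locally):

# Map the previously unresolvable Kafka host name to the local machine
echo "127.0.0.1 unresolvedomain" >> /etc/hosts
# Verify that the name now resolves
getent hosts unresolvedomain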

Observation 1

The same does not happen if I configure Logstash to connect to the Kafka IP or to localhost (I run both on the same host). For some reason, host name resolution affects the plugin's behaviour.

Observation 2

When running without the --config.reload.automatic flag, Logstash shuts down when it is not able to resolve the host name. It is not clear why --config.reload.automatic has an effect on this behaviour.

Effect on Docker deployments

The problem was first observed when running Logstash and Kafka in Docker (Swarm). With an overlay network, Logstash is configured to resolve the Kafka domain name via Docker's internal DNS. In some cases, for example when rebooting the host, Logstash starts before Kafka and the Kafka DNS entry does not yet exist, so Logstash starts without being able to resolve the host name. Later, once Kafka is up and the DNS holds a record for it, Logstash is still unable to reconnect to the running Kafka service.

It seems that there is no retry mechanism in this plugin.
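As a workaround sketch for the Docker case (not a fix for the missing retry), the Logstash container can be started through an entrypoint wrapper that waits until the Kafka service name resolves. The service name kafka and the Logstash paths below are assumptions based on the official image; adjust them to your deployment:

#!/bin/sh
# Block until Docker's internal DNS has an entry for the Kafka service,
# then hand over to Logstash.
until getent hosts kafka >/dev/null 2>&1; do
  echo "waiting for the kafka DNS entry..."
  sleep 2
done
exec /usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/logstash.conf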

Issue Information:

OS Version: CentOS 7
uname -a: Linux localhost.localdomain 3.10.0-514.26.2.el7.x86_64 #1 SMP Tue Jul 4 15:04:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
Logstash Version: logstash-5.5.2
Kafka Version: kafka_2.11-0.11.0.0
Kafka Output Plugin Version: logstash-output-kafka (5.1.7)

Logstash Config:
input {
  kafka {
    topics => ["input"]
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}

output {
  kafka {
    topic_id => "output"
    bootstrap_servers => "unresolvedomain:9092"
    codec => rubydebug
  }
}

Running Logstash when Kafka is down:
[root@localhost logstash-5.5.2]# bin/logstash -f ./conf.conf --config.reload.automatic
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /root/LS/logstash-5.5.2/logs which is now configured via log4j2.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.8/vendor/jar-dependencies/runtime-jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/vendor/jar-dependencies/runtime-jars/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2017-09-13T00:34:52,023][ERROR][logstash.outputs.kafka ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[2017-09-13T00:34:52,037][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"#<LogStash::OutputDelegator:0xd5acf1b @namespaced_metric=#<LogStash::Instrument::NamespacedMetric:0x43a2cdff @Metric=#<LogStash::Instrument::Metric:0x1e3c02c0 @collector=#<LogStash::Instrument::Collector:0x53a8927f @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x8d1bd06 @store=#<Concurrent::Map:0x00000000062668 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x5dc954d6, @fast_lookup=#<Concurrent::Map:0x0000000006266c entries=58 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2"]>, @Metric=#<LogStash::Instrument::NamespacedMetric:0x34b132db @Metric=#<LogStash::Instrument::Metric:0x1e3c02c0 @collector=#<LogStash::Instrument::Collector:0x53a8927f @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x8d1bd06 @store=#<Concurrent::Map:0x00000000062668 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x5dc954d6, @fast_lookup=#<Concurrent::Map:0x0000000006266c entries=58 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs]>, @logger=#<LogStash::Logging::Logger:0x26f50dc5 @logger=#Java::OrgApacheLoggingLog4jCore::Logger:0x1944b06d>, @strategy=#<LogStash::OutputDelegatorStrategies::Shared:0x5387765b @output=<LogStash::Outputs::Kafka topic_id=>"output", bootstrap_servers=>"unresolvedomain:9092", codec=><LogStash::Codecs::RubyDebug id=>"rubydebug_4ea344bd-89e9-4484-a229-e79825f5e8e4", enable_metric=>true, metadata=>false>, id=>"fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2", enable_metric=>true, workers=>1, acks=>"1", batch_size=>16384, block_on_buffer_full=>true, buffer_memory=>33554432, compression_type=>"none", key_serializer=>"org.apache.kafka.common.serialization.StringSerializer", linger_ms=>0, max_request_size=>1048576, metadata_fetch_timeout_ms=>60000, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, reconnect_backoff_ms=>10, retries=>0, retry_backoff_ms=>100, send_buffer_bytes=>131072, ssl=>false, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", timeout_ms=>30000, value_serializer=>"org.apache.kafka.common.serialization.StringSerializer">>, @id="fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2", @metric_events=#<LogStash::Instrument::NamespacedMetric:0xe51575d @Metric=#<LogStash::Instrument::Metric:0x1e3c02c0 @collector=#<LogStash::Instrument::Collector:0x53a8927f @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x8d1bd06 @store=#<Concurrent::Map:0x00000000062668 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x5dc954d6, @fast_lookup=#<Concurrent::Map:0x0000000006266c entries=58 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"fa0c0cda67c27f0d75547b7dfe084d7e3ce7d1aa-2", :events]>, @output_class=LogStash::Outputs::Kafka>", :error=>"Failed to construct kafka producer"}
[2017-09-13T00:34:52,049][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:242)", "RUBY.register(/root/LS/logstash-5.5.2/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:178)", "RUBY.register(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/output_delegator.rb:41)", "RUBY.register_plugin(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:281)", "RUBY.register_plugins(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:292)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:292)", "RUBY.start_workers(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.run(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/pipeline.rb:226)", "RUBY.start_pipeline(/root/LS/logstash-5.5.2/logstash-core/lib/logstash/agent.rb:398)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
[2017-09-13T00:34:52,101][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

I have a similar issue.
My Kubernetes platform runs an ELK cluster:
Logstash -> svc(kubernetes) -> Kafka (6 instances) <- svc(kubernetes) <- Logstash -> svc(kubernetes) -> Elasticsearch.
When one of the Kafka instances is rescheduled, it gets a new IP, but Logstash keeps using the old instance IP and never updates it.
How can I set a DNS TTL so that the Logstash Kafka output plugin picks up the new backend IP?
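One thing that can be tried is shortening the JVM-level DNS caches, although it may not be enough on its own because the Kafka client keeps its own copy of the resolved broker addresses (see the KIP-302 note below). The sun.net.inetaddr.* properties are implementation-specific JVM settings and LS_JAVA_OPTS assumes the standard Logstash startup scripts; treat this as a sketch, not a confirmed fix:

# Shorten the JVM DNS cache for positive and negative lookups, then start Logstash
export LS_JAVA_OPTS="${LS_JAVA_OPTS} -Dsun.net.inetaddr.ttl=30 -Dsun.net.inetaddr.negative.ttl=10"
bin/logstash -f /path/to/your-pipeline.conf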

We've got a similar issue: Logstash initialises fine and the Kafka plugin behaves properly until the Kafka brokers' DNS record changes; after that Logstash constantly fails, insisting on the previously resolved IP address.

We would expect the plugin to refresh the IP address for those hostnames, but it didn't.

EDIT: we're trialling version 8.1.0, where the Kafka client was bumped from 2.1.0 to 2.3.0 and which should therefore include https://cwiki.apache.org/confluence/display/KAFKA/KIP-302+-+Enable+Kafka+clients+to+use+all+DNS+resolved+IP+addresses
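To confirm which Kafka client the installed plugins actually bundle (the Logstash home below is an assumption; on the official Docker image it is /usr/share/logstash):

# Locate the kafka-clients jar vendored by the kafka plugins;
# the comments in this thread point to client 2.3.0+ for the DNS re-resolution fixes.
find /usr/share/logstash/vendor -name 'kafka-clients-*.jar'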

I am facing exactly the same issue on our k8s setup. After kafka brokers restart, they get new IPs and logstash keeps failing because it has cached the old IP address.

Is there any known solution for this?

As an interim fix (more of a hack) we ended up writing a custom workaround: we tee the logs that go to STDOUT into a file as well, and a shell script greps that log file for "Broker may not be available"; the same shell script is used as a livenessProbe.

This way, whenever there are more than n lines of logs saying the broker may not be available, the livenessProbe fails and k8s restarts the container. Ugly, but it works!
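A minimal sketch of that probe script, assuming the tee'd log lives at /tmp/log (path and error string should be adjusted to your setup):

#!/bin/sh
# livenessProbe command: fail the probe once the Kafka output starts
# logging that the broker is unreachable, so k8s restarts the container.
if grep -q "Broker may not be available" /tmp/log 2>/dev/null; then
  echo "kafka is out of reach, need to restart"
  exit 1
fi
exit 0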

As a permanent fix, the Kafka client version used by this plugin needs to be bumped, since the issue lies in the underlying Kafka client. It has been addressed upstream (there are a few tickets reporting similar issues, including the one mentioned by JoseAlban above): https://issues.apache.org/jira/browse/KAFKA-7755

It looks like the call to @producer = create_producer in https://github.com/logstash-plugins/logstash-output-kafka/blob/master/lib/logstash/outputs/kafka.rb needs to be redone when the "Broker may not be available" error keeps appearing. Can anyone offer some assistance as to where exactly this should go? Somewhere in the retrying_send function?

Correct me if I'm wrong.

TJM commented

FYI: we worked around this issue by updating the kafka plugins in the container...

FROM docker.elastic.co/logstash/logstash:6.8.7

RUN logstash-plugin install logstash-filter-prune && \
    logstash-plugin update logstash-output-kafka && \
    logstash-plugin update logstash-input-kafka
$ k exec -it shipper-logstash-4 -- bash
bash-4.2$ logstash-plugin list --verbose | grep kafka
logstash-input-kafka (9.1.0)
logstash-output-kafka (8.1.0)

Stumbled upon this problem today. Our configs use a static IP for Kafka; however, we noticed the host was no longer able to reach its configured DNS server (in /etc/resolv.conf). Simply removing the bad entry fixed our problems with this plugin.
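A quick way to spot that failure mode before blaming the plugin (the host name below is only a placeholder):

# Show the configured resolvers, then time a lookup the way the resolver library would do it;
# a multi-second hang (rather than a fast NXDOMAIN) usually points at a dead nameserver entry.
cat /etc/resolv.conf
time getent hosts kafka.example.com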

lhzw commented

Tried to update the logstash kafka plugins; it did not work:
logstash-input-kafka (9.1.0)
logstash-output-kafka (8.1.0)

Also tried the logstash image 7.14.1, which ships with the following, but it still did not work, oh my:
bash-4.2$ grep kafka x
logstash-integration-kafka (10.8.1)
├── logstash-input-kafka
└── logstash-output-kafka

lhzw commented

Mine, referring to @buch11's approach:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: log2es
  labels:
    app: log2es
spec:
  replicas: 1
  selector:
    matchLabels:
      app: log2es
  template:
    metadata:
      labels:
        app: log2es
    spec:
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
      initContainers:
      - name: create-liveness-script
        image: busybox
        imagePullPolicy: IfNotPresent
        securityContext:
          privileged: true
          runAsUser: 1000
        command: ["/bin/sh"]
        args: ["-c", 'echo "size=\$(stat -c %s /tmp/log); if [ \$size -gt 10485760 ]; then > /tmp/log; fi; if ! grep \"Broker may not be available\" /tmp/log >/dev/null 2>&1; then exit 0; else echo \"kafka is out of reach, need to restart.\"; exit 1; fi" > /livenessdir/live.sh; chmod +x /livenessdir/live.sh ']
        volumeMounts:
          - mountPath: /livenessdir
            name: livenessdir
      containers:
      - name: log2es
        image: .....
        command: ["/bin/sh"]
        args: ["-c", "logstash -f /etc/logstash/conf.d/logserver.conf | tee /tmp/log"]
        livenessProbe:
          exec:
            command:
            - /bin/sh
            - /livenessdir/live.sh
          initialDelaySeconds: 60
          periodSeconds: 5
          failureThreshold: 3
        volumeMounts:
          - mountPath: /livenessdir
            name: livenessdir

      volumes:
        - name: livenessdir
          emptyDir: {}
---