logstash-plugins/logstash-codec-collectd

Exceptions are thrown under high load

Closed this issue · 2 comments

The company I work for is using the udp input together with the collectd codec.
Under high load conditions the collectd codec starts to throw exceptions like:

  • java.lang.ArrayIndexOutOfBoundsException: 4
  • java.lang.ArrayIndexOutOfBoundsException: 80
  • OpenSSL::Cipher::CipherError: Cipher not initialized
  • OpenSSL::Cipher::CipherError: No message available
  • OpenSSL::Cipher::CipherError: output buffer too short
  • Unable to decrypt packet, checksum mismatch

I investigated the problem. I found out that that at least OpenSSL::Digest and OpenSSL::Cipher are not thread safe.
Unfortunately the collectd codec assumes they are. No more exceptions are raised if workers => 1 (in logstash.conf) is set.

I have created a patch that solves all of these problems:

  • digests (sha256, sha1) are generated using the digest package
  • synchronized access to OpenSSL::Cipher functions

Reference:

PATCH:

--- collectd.rb 2015-08-02 16:10:40.625549383 +0200
+++ /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-collectd-1.0.1/lib/logstash/codecs/collectd.rb    2015-08-02 19:21:50.926950615 +0200
@@ -44,6 +44,8 @@
 #
 class LogStash::Codecs::Collectd < LogStash::Codecs::Base
   config_name "collectd"
+
+  @@openssl_mutex = Mutex.new

   AUTHFILEREGEX = /([^:]+): (.+)/

@@ -160,11 +162,15 @@
       if @authfile.nil?
         raise "Security level is set to #{@security_level}, but no authfile was configured"
       else
-        # Load OpenSSL and instantiate Digest and Crypto functions
+        # Load Digest and instantiate functions
+        require 'digest'
+        @sha256 = Digest::SHA256.new
+        @sha1 = Digest::SHA1.new
+
+        # Load OpenSSL and instantiate functions
         require 'openssl'
-        @sha256 = OpenSSL::Digest::Digest.new('sha256')
-        @sha1 = OpenSSL::Digest::Digest.new('sha1')
         @cipher = OpenSSL::Cipher.new('AES-256-OFB')
+
         @auth = {}
         parse_authfile
       end
@@ -364,16 +370,20 @@
       return []
     end

-    # Set the correct state of the cipher instance
-    @cipher.decrypt
-    @cipher.padding = 0
-    @cipher.iv = iv
-    @cipher.key = @sha256.digest(key);
-    # Decrypt the content
-    plaintext = @cipher.update(content) + @cipher.final
-    # Reset the state, as adding a new key to an already instantiated state
-    # results in an exception
-    @cipher.reset
+    # Coordinate access to OpenSSL::Cipher as it is not thread safe
+    plaintext = nil
+    @@openssl_mutex.synchronize do
+      # Set the correct state of the cipher instance
+      @cipher.decrypt
+      @cipher.padding = 0
+      @cipher.iv = iv
+      @cipher.key = @sha256.digest(key)
+      # Decrypt the content
+      plaintext = @cipher.update(content) + @cipher.final
+      # Reset the state, as adding a new key to an already instantiated state
+      # results in an exception
+      @cipher.reset
+    end

     # The plaintext contains a SHA1 hash as checksum in the first 160 bits
     # (20 octets) of the rest of the data

Fixed by #16