Exceptions are thrown under high load
Closed this issue · 2 comments
frapex commented
The company I work for is using the udp input together with the collectd codec.
Under high load conditions the collectd codec starts to throw exceptions like:
- java.lang.ArrayIndexOutOfBoundsException: 4
- java.lang.ArrayIndexOutOfBoundsException: 80
- OpenSSL::Cipher::CipherError: Cipher not initialized
- OpenSSL::Cipher::CipherError: No message available
- OpenSSL::Cipher::CipherError: output buffer too short
- Unable to decrypt packet, checksum mismatch
I investigated the problem and found that at least OpenSSL::Digest and OpenSSL::Cipher are not thread safe.
Unfortunately, the collectd codec assumes they are. The exceptions no longer occur when workers => 1 is set (in logstash.conf).
I have created a patch that solves all of these problems:
- digests (sha256, sha1) are generated using the digest package instead of OpenSSL::Digest
- access to the OpenSSL::Cipher functions is synchronized with a mutex
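The two points above can be sketched in isolation roughly as follows. This is a minimal illustration, not the codec's code: `GuardedDecryptor`, `encrypt_sample`, the passphrase, and the fixed IV are hypothetical names and values made up for the example. It assumes that serializing every use of one shared `OpenSSL::Cipher` instance through a `Mutex` is enough to keep its internal state consistent across threads.

```ruby
require 'digest'
require 'openssl'

# Hypothetical helper (not part of logstash-codec-collectd): one shared
# cipher object whose state is only ever touched while holding a mutex.
class GuardedDecryptor
  def initialize
    @mutex  = Mutex.new
    @cipher = OpenSSL::Cipher.new('AES-256-OFB')
  end

  def decrypt(passphrase, iv, content)
    # Class-level digest call: no digest object is shared between threads.
    key = Digest::SHA256.digest(passphrase)
    @mutex.synchronize do
      @cipher.decrypt
      @cipher.padding = 0
      @cipher.key = key
      @cipher.iv = iv
      plaintext = @cipher.update(content) + @cipher.final
      # Clear the cipher state before the next caller reuses the instance.
      @cipher.reset
      plaintext
    end
  end
end

# Hypothetical helper that produces test input with a private cipher object.
def encrypt_sample(passphrase, iv, plaintext)
  cipher = OpenSSL::Cipher.new('AES-256-OFB')
  cipher.encrypt
  cipher.padding = 0
  cipher.key = Digest::SHA256.digest(passphrase)
  cipher.iv = iv
  cipher.update(plaintext) + cipher.final
end

iv         = '0123456789abcdef'      # 16 bytes, fixed only for the example
message    = 'collectd payload'.b    # binary string, like the decrypted output
ciphertext = encrypt_sample('secret', iv, message)
decryptor  = GuardedDecryptor.new

# Eight threads decrypt the same packet 50 times each through the shared cipher.
threads = Array.new(8) do
  Thread.new { Array.new(50) { decryptor.decrypt('secret', iv, ciphertext) } }
end
results = threads.flat_map(&:value)

puts results.all? { |r| r == message }
```

The sketch keeps the mutex per instance because the cipher is per instance; the patch below uses a class-level mutex instead, which also serializes decryption across all codec instances.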
Reference:
frapex commented
PATCH:
--- collectd.rb 2015-08-02 16:10:40.625549383 +0200
+++ /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-collectd-1.0.1/lib/logstash/codecs/collectd.rb 2015-08-02 19:21:50.926950615 +0200
@@ -44,6 +44,8 @@
#
class LogStash::Codecs::Collectd < LogStash::Codecs::Base
config_name "collectd"
+
+ @@openssl_mutex = Mutex.new
AUTHFILEREGEX = /([^:]+): (.+)/
@@ -160,11 +162,15 @@
if @authfile.nil?
raise "Security level is set to #{@security_level}, but no authfile was configured"
else
- # Load OpenSSL and instantiate Digest and Crypto functions
+ # Load Digest and instantiate functions
+ require 'digest'
+ @sha256 = Digest::SHA256.new
+ @sha1 = Digest::SHA1.new
+
+ # Load OpenSSL and instantiate functions
require 'openssl'
- @sha256 = OpenSSL::Digest::Digest.new('sha256')
- @sha1 = OpenSSL::Digest::Digest.new('sha1')
@cipher = OpenSSL::Cipher.new('AES-256-OFB')
+
@auth = {}
parse_authfile
end
@@ -364,16 +370,20 @@
return []
end
- # Set the correct state of the cipher instance
- @cipher.decrypt
- @cipher.padding = 0
- @cipher.iv = iv
- @cipher.key = @sha256.digest(key);
- # Decrypt the content
- plaintext = @cipher.update(content) + @cipher.final
- # Reset the state, as adding a new key to an already instantiated state
- # results in an exception
- @cipher.reset
+ # Coordinate access to OpenSSL::Cipher as it is not thread safe
+ plaintext = nil
+ @@openssl_mutex.synchronize do
+ # Set the correct state of the cipher instance
+ @cipher.decrypt
+ @cipher.padding = 0
+ @cipher.iv = iv
+ @cipher.key = @sha256.digest(key)
+ # Decrypt the content
+ plaintext = @cipher.update(content) + @cipher.final
+ # Reset the state, as adding a new key to an already instantiated state
+ # results in an exception
+ @cipher.reset
+ end
# The plaintext contains a SHA1 hash as checksum in the first 160 bits
# (20 octets) of the rest of the data