logstash-plugins/logstash-codec-collectd

plugin mangles collectd metrics under some conditions

I found out that the plugin mangles collectd metrics under some conditions.
The bug is triggered when, for example, several percent metrics are sent in rapid succession.

These metrics:

  • cpu.percent-active.value
  • swap.percent-used.value
  • swap.percent-cached.value
  • memory.percent-slab_recl.value
  • memory.percent-slab_unrecl.value
  • memory.percent-cached.value
  • memory.percent-buffered.value
  • memory.percent-used.value

Become like this:

  • cpu.active.value
  • swap.used.value
  • swap.cached.value
  • memory.slab_recl.value
  • memory.slab_unrecl.value
  • memory.cached.value
  • memory.buffered.value
  • memory.used.value

The type is lost. A new PLUGIN_TYPE part should not cause the 'collectd_type' field to be removed from the event.
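
What seems to happen is this: when the codec reaches a PLUGIN_TYPE part it deletes every field that is not whitelisted in PLUGIN_TYPE_FIELDS, and 'collectd_type' is not on that whitelist. Because collectd does not resend the type part when it has not changed (all of the metrics above share the type "percent"), the field is never restored for the values that follow. Below is a minimal standalone sketch of that cleanup step (not the plugin code itself; the field values are made up):

# Whitelist as currently defined in collectd.rb (note: no 'collectd_type')
PLUGIN_TYPE_FIELDS = {
  'host' => true,
  '@timestamp' => true,
  'type_instance' => true,
  'severity' => true,
}

# Fields accumulated from earlier parts of the same packet
collectd = {
  'host'          => 'example.local',   # made-up value
  '@timestamp'    => Time.now,
  'plugin'        => 'swap',
  'collectd_type' => 'percent',         # set by an earlier TYPE part
  'type_instance' => 'cached',
}

# Cleanup performed when the next PLUGIN_TYPE part (e.g. "memory") arrives
collectd.each_key do |k|
  collectd.delete(k) unless PLUGIN_TYPE_FIELDS.has_key?(k)
end

p collectd.keys
# 'collectd_type' is gone and collectd will not send the type part again,
# so every following memory.percent-* metric is emitted without its type.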

References to somewhat similar issues:

  • Wrong assumptions decoding protocol: #13
  • collectd plugin's type_instance missing problem: elastic/logstash#2014
  • plugin_type persistence: #6

Testing was done using collectd-5.5.0-2.el7.x86_64 and logstash-1.5.2-1.noarch.
The bug can be easily reproduced with the configuration files below:

# /tmp/collectd_test.conf
LoadPlugin cpu
LoadPlugin swap
LoadPlugin memory
LoadPlugin network
LoadPlugin match_regex
LoadPlugin target_replace

<Plugin cpu>
  ReportByCpu false
  ReportByState false
  ValuesPercentage true
</Plugin>

<Plugin swap>
  ReportBytes true
  ValuesAbsolute false
  ValuesPercentage true
</Plugin>

<Plugin memory>
  ValuesPercentage true
</Plugin>

<Plugin network>
  <Server "localhost">
  </Server>
</Plugin>

# /tmp/logstash_test.conf
input {
  udp {
    port => 25826
    buffer_size => 1452
    codec => collectd { }   # decode the collectd binary protocol
  }
}

output {
  if ![collectd_type] {
    stdout { codec => "rubydebug" }
  }
}

# cmds
/usr/sbin/collectd -C /tmp/collectd_test.conf -f
sudo -u logstash /opt/logstash/bin/logstash -f /tmp/logstash_test.conf
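
To verify a fix, the same commands can be re-run: with a patched codec the ![collectd_type] conditional above should stay silent. For comparison, an output section with the condition inverted prints only the events that still carry the type:

output {
  if [collectd_type] {
    stdout { codec => "rubydebug" }
  }
}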

I crafted two patches:

  • patch 1: just make it work
  • patch 2: remove the field cleanup code entirely (in my opinion it is not required)

The patches include some other improvements as well.

PATCH 1 (just make it work):

--- a/collectd.rb  2015-08-05 19:22:03.486795136 +0200
+++ b/collectd.rb    2015-08-06 08:47:46.864839543 +0200
@@ -49,6 +49,7 @@

   PLUGIN_TYPE = 2
   COLLECTD_TYPE = 4
+  COLLECTD_VALUES = 6
   SIGNATURE_TYPE = 512
   ENCRYPTION_TYPE = 528

@@ -59,7 +60,7 @@
     3               => "plugin_instance",
     COLLECTD_TYPE   => "collectd_type",
     5               => "type_instance",
-    6               => "values",
+    COLLECTD_VALUES => "values",
     7               => "interval",
     8               => "@timestamp",
     9               => "interval",
@@ -72,6 +73,7 @@
   PLUGIN_TYPE_FIELDS = {
     'host' => true,
     '@timestamp' => true,
+    'collectd_type' => true,
     'type_instance' => true,
     'severity' => true,
   }
@@ -93,10 +95,10 @@

   INTERVAL_BASE_FIELDS = {
     'host' => true,
-    'collectd_type' => true,
+    '@timestamp' => true,
     'plugin' => true,
     'plugin_instance' => true,
-    '@timestamp' => true,
+    'collectd_type' => true,
     'type_instance' => true,
   }

@@ -207,7 +209,7 @@
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
     end
-    # Hi resolution intervals
+    # Hi-Resolution intervals
     hiresinterval_decoder = lambda do |body|
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
@@ -297,7 +299,7 @@
   def get_values(id, body)
     drop = false
     add_tag = false
-    if id == 6
+    if id == COLLECTD_VALUES
       retval, drop, add_nan_tag = @id_decoder[id].call(body)
     # Use hash + closure/lambda to speed operations
     else
@@ -421,15 +423,14 @@
         was_encrypted = true
         next
       when PLUGIN_TYPE
-        # We've reached a new plugin, delete everything except for the the host
-        # field, because there's only one per packet and the timestamp field,
-        # because that one goes in front of the plugin
+        # Reached a new plugin, delete all fields that might
+        # be related to the previous plugin (if any).
         collectd.each_key do |k|
           collectd.delete(k) unless PLUGIN_TYPE_FIELDS.has_key?(k)
         end
       when COLLECTD_TYPE
-        # We've reached a new type within the plugin section, delete all fields
-        # that could have something to do with the previous type (if any)
+        # Reached a new type, delete all fields that might
+        # be related to the previous type (if any).
         collectd.each_key do |k|
           collectd.delete(k) unless COLLECTD_TYPE_FIELDS.has_key?(k)
         end

PATCH 2 (remove the field cleanup code entirely):

--- a/collectd.rb  2015-08-05 19:22:03.486795136 +0200
+++ b/collectd.rb    2015-08-06 09:03:00.393084304 +0200
@@ -49,6 +49,7 @@

   PLUGIN_TYPE = 2
   COLLECTD_TYPE = 4
+  COLLECTD_VALUES = 6
   SIGNATURE_TYPE = 512
   ENCRYPTION_TYPE = 528

@@ -59,7 +60,7 @@
     3               => "plugin_instance",
     COLLECTD_TYPE   => "collectd_type",
     5               => "type_instance",
-    6               => "values",
+    COLLECTD_VALUES => "values",
     7               => "interval",
     8               => "@timestamp",
     9               => "interval",
@@ -69,22 +70,6 @@
     ENCRYPTION_TYPE => "encryption"
   }

-  PLUGIN_TYPE_FIELDS = {
-    'host' => true,
-    '@timestamp' => true,
-    'type_instance' => true,
-    'severity' => true,
-  }
-
-  COLLECTD_TYPE_FIELDS = {
-    'host' => true,
-    '@timestamp' => true,
-    'plugin' => true,
-    'plugin_instance' => true,
-    'type_instance' => true,
-    'severity' => true,
-  }
-
   INTERVAL_VALUES_FIELDS = {
     "interval" => true,
     "values" => true,
@@ -93,10 +78,10 @@

   INTERVAL_BASE_FIELDS = {
     'host' => true,
-    'collectd_type' => true,
+    '@timestamp' => true,
     'plugin' => true,
     'plugin_instance' => true,
-    '@timestamp' => true,
+    'collectd_type' => true,
     'type_instance' => true,
   }

@@ -207,7 +192,7 @@
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
     end
-    # Hi resolution intervals
+    # Hi-Resolution intervals
     hiresinterval_decoder = lambda do |body|
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
@@ -297,7 +282,7 @@
   def get_values(id, body)
     drop = false
     add_tag = false
-    if id == 6
+    if id == COLLECTD_VALUES
       retval, drop, add_nan_tag = @id_decoder[id].call(body)
     # Use hash + closure/lambda to speed operations
     else
@@ -420,19 +405,6 @@
         raise(EncryptionError) if payload.empty?
         was_encrypted = true
         next
-      when PLUGIN_TYPE
-        # We've reached a new plugin, delete everything except for the the host
-        # field, because there's only one per packet and the timestamp field,
-        # because that one goes in front of the plugin
-        collectd.each_key do |k|
-          collectd.delete(k) unless PLUGIN_TYPE_FIELDS.has_key?(k)
-        end
-      when COLLECTD_TYPE
-        # We've reached a new type within the plugin section, delete all fields
-        # that could have something to do with the previous type (if any)
-        collectd.each_key do |k|
-          collectd.delete(k) unless COLLECTD_TYPE_FIELDS.has_key?(k)
-        end
       end

       raise(EncryptionError) if !was_encrypted and @security_level == SECURITY_ENCR

Fixed by #16