mre/kafka-influxdb

Unmatched data kills the process

meetpradeepp opened this issue · 6 comments

Hello @mre, thank you for the brilliant code. I was able to get it up and running for testing with no effort.

I wanted to know your thoughts on input that does not match the encoder settings. Should the process die like it does now, or just log something?

mre commented

It should never die 😄
Could you attach some example input, the kafka-influxdb settings and the terminal output when it dies?
This would make testing so much easier.
If you do so, please make sure to anonymize the data if needed.

meetpradeepp commented

I agree :). Here is the input I sent, which broke the collectd encoder. I removed the [] around values. The first flush worked with a correct value, but this malformed/unmatched JSON crashed the process.

--- KAFKA INPUT ---
[{"values":0,"dstypes":["derive"],"dsnames":["value"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]

--- OUTPUT ---
Reading config file kafka_influxdb_conf.yaml
Flushing output buffer. 0.01 messages/s
Traceback (most recent call last):
  File "/usr/bin/kafka_influxdb", line 9, in <module>
    load_entry_point('kafka-influxdb==0.7.2', 'console_scripts', 'kafka_influxdb')()
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/main.py", line 31, in main
    start_consumer(config)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/main.py", line 47, in start_consumer
    client.consume()
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/worker.py", line 29, in consume
    self.buffer.extend(self.encoder.encode(raw_message))
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py", line 51, in encode
    value = Encoder.format_value(entry)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py", line 95, in format_value
    if len(values) == 1:
TypeError: object of type 'int' has no len()

mre commented

So, a quick guess: it looks like values in your input should be a list. Like so:

[
   {
      "values":[
         0
      ],
      "dstypes":[
         "derive"
      ],
      "dsnames":[
         "value"
      ],
      "time":1436372292.412,
      "interval":10.000,
      "host":"26f2fc918f50",
      "plugin":"cpu",
      "plugin_instance":"1",
      "type":"cpu",
      "type_instance":"interrupt"
   }
]
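
To see why the scalar crashes, here's a minimal sketch of the len() check that fails in format_value (line 95 in your traceback):

    values = 0  # scalar, as in the malformed message
    try:
        len(values)
    except TypeError as e:
        print(e)  # object of type 'int' has no len()

    values = [0]  # list, as collectd normally emits
    print(len(values) == 1)  # True, so the single value can be unpacked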

I guess this format is invalid, so you might want to fix the input.
Apart from that, it should still not crash. 😃

Could you try to edit /usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py like so?

    def encode(self, msg):
        measurements = []

        for line in msg.decode().split("\n"):
            try:
                # parse_line sets a float-precision flag to get the same
                # results for Python 2 and 3.
                json_object = self.parse_line(line)
            except ValueError as e:
                logging.debug("Error in encoder: %s", e)
                continue
            for entry in json_object:
                try:
                    # The measurement name and tags are customizable, e.g. to use
                    # plugin and plugin_instance as the measurement name, pass
                    # ['plugin', 'plugin_instance'].
                    measurement = Encoder.format_measurement_name(entry, ['plugin', 'plugin_instance', 'type'])
                    tags = Encoder.format_tags(entry, ['host', 'type_instance'])
                    value = Encoder.format_value(entry)
                    time = Encoder.format_time(entry)
                    measurements.append(Encoder.compose_data(measurement, tags, value, time))
                except Exception as e:
                    logging.debug("Error in input data: %s. Skipping.", e)
                    continue
        return measurements
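
An untested sanity check for the patch (hypothetical: I'm assuming here that the encoder class is named Encoder and takes no constructor arguments; adjust to the real signature):

    from kafka_influxdb.encoder.collectd_json_encoder import Encoder

    encoder = Encoder()  # hypothetical constructor call; signature not verified
    # The malformed message from above: "values" is a scalar, not a list
    bad = b'[{"values":0,"dstypes":["derive"],"dsnames":["value"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]'
    print(encoder.encode(bad))  # expected: [] (bad entry skipped, no TypeError)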

We could have checked that value is a list here, but in Python it's often more pragmatic to just try the operation and handle the failure with a try/except block (EAFP style, see http://stackoverflow.com/a/7604717/270334).
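
For illustration, here are the two styles side by side, in a minimal sketch using the scalar from your input:

    import logging

    values = 0  # the malformed scalar from the input above

    # LBYL: look before you leap, i.e. check the type up front
    if isinstance(values, list) and len(values) == 1:
        print(values[0])

    # EAFP: easier to ask forgiveness than permission (what the patch does)
    try:
        if len(values) == 1:
            print(values[0])
    except TypeError as e:
        logging.debug("Error in input data: %s. Skipping.", e)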

NOTE: I have not tested this, so be aware of potential bugs.

meetpradeepp commented

I know it's invalid, but the crash is what brought me to this discussion. The try/except placement seems right to me. I will give it a try and let you know. Thank you for the active support @mre.

meetpradeepp commented

It worked. As a side observation, the data buffer count still gets incremented even when we skip a bad event; I assume that is fine.

mre commented

Very sharp observation @meetpradeepp 👍
I guess it's okay for now that the buffer count is increased, yes.
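For context, a hypothetical sketch of why the count still rises (this is not the actual worker.py; I'm assuming the consume loop counts raw messages before encoding):

    # Hypothetical sketch of the consume loop; the real worker.py may differ.
    buffer, counter = [], 0
    messages = [b'{"values": 0}']  # stand-in for a malformed Kafka message

    def encode(msg):
        return []  # a skipped entry yields no measurements

    for raw_message in messages:
        counter += 1                        # counted per raw message...
        buffer.extend(encode(raw_message))  # ...even when nothing is encoded

    print(counter, len(buffer))  # 1 0 -> the count rises, the buffer does not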
Thanks for your feedback.