Unmatched data kills the process
meetpradeepp opened this issue · 6 comments
Hello @mre, thank you for the brilliant code. I was able to get it running for testing with no effort.
I wanted to know your thoughts on input that doesn't match the encoder settings. Should it die like it does now, or just print something?
It should never die 😄
Could you attach some example input, the kafka-influxdb settings and the terminal output when it dies?
This would make testing so much easier.
If you do so, please make sure to anonymize the data if needed.
I agree :). Here is the input I sent which broke the collectd encoder. I removed the [] around values. The first flush worked with a correct value, but the malformed/unmatched JSON crashed it.
-- KAFKA INPUT --
[{"values":0,"dstypes":["derive"],"dsnames":["value"],"time":1436372292.412,"interval":10.000,"host":"26f2fc918f50","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]
--- OUTPUT -----
Reading config file kafka_influxdb_conf.yaml
Flushing output buffer. 0.01 messages/s
Traceback (most recent call last):
  File "/usr/bin/kafka_influxdb", line 9, in <module>
    load_entry_point('kafka-influxdb==0.7.2', 'console_scripts', 'kafka_influxdb')()
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/main.py", line 31, in main
    start_consumer(config)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/main.py", line 47, in start_consumer
    client.consume()
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/worker.py", line 29, in consume
    self.buffer.extend(self.encoder.encode(raw_message))
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py", line 51, in encode
    value = Encoder.format_value(entry)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py", line 95, in format_value
    if len(values) == 1:
TypeError: object of type 'int' has no len()
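In short, the crash is len() being called on an int instead of a list; a minimal snippet reproducing the same TypeError (simplified from the format_value line in the traceback):

    # "values" is a scalar here, as in the input above, instead of a list like [0].
    entry = {"values": 0}
    values = entry["values"]
    if len(values) == 1:  # TypeError: object of type 'int' has no len()
        pass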
So, quick guess: it looks like "values" in your input should be a list. Like so:
[
  {
    "values": [
      0
    ],
    "dstypes": [
      "derive"
    ],
    "dsnames": [
      "value"
    ],
    "time": 1436372292.412,
    "interval": 10.000,
    "host": "26f2fc918f50",
    "plugin": "cpu",
    "plugin_instance": "1",
    "type": "cpu",
    "type_instance": "interrupt"
  }
]
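A quick sanity check that this corrected format parses with "values" as a list (plain Python, nothing kafka-influxdb specific):

    import json

    line = '[{"values": [0], "dstypes": ["derive"], "dsnames": ["value"], "time": 1436372292.412, "interval": 10.0, "host": "26f2fc918f50", "plugin": "cpu", "plugin_instance": "1", "type": "cpu", "type_instance": "interrupt"}]'
    entry = json.loads(line)[0]
    assert isinstance(entry["values"], list)  # len(entry["values"]) is now safe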
I guess your original format is invalid, so you might want to fix the input. Apart from that, it should still not crash. 😄
Could you try to edit /usr/lib/python2.7/site-packages/kafka_influxdb-0.7.2-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py like so?
def encode(self, msg):
    measurements = []
    for line in msg.decode().split("\n"):
        try:
            # Set flag for float precision to get the same
            # results for Python 2 and 3.
            json_object = self.parse_line(line)
        except ValueError as e:
            logging.debug("Error in encoder: %s", e)
            continue
        for entry in json_object:
            try:
                # The measurement name and tags are customizable:
                # e.g. pass ['plugin', 'plugin_instance'] to use plugin
                # and plugin_instance as the measurement name.
                measurement = Encoder.format_measurement_name(entry, ['plugin', 'plugin_instance', 'type'])
                tags = Encoder.format_tags(entry, ['host', 'type_instance'])
                value = Encoder.format_value(entry)
                time = Encoder.format_time(entry)
                measurements.append(Encoder.compose_data(measurement, tags, value, time))
            except Exception as e:
                logging.debug("Error in input data: %s. Skipping.", e)
                continue
    return measurements
We could have checked that value is a list here, but in Python it's often more pragmatic to just try the operation and catch the exception with a try/except block. (See http://stackoverflow.com/a/7604717/270334)
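For comparison, a look-before-you-leap variant of format_value could normalize a scalar into a list (a hypothetical sketch, not the actual encoder code):

    def format_value(entry):
        values = entry["values"]
        # Look-before-you-leap variant: normalize a scalar into a list
        # instead of letting len() fail on an int or float.
        if not isinstance(values, list):
            values = [values]
        if len(values) == 1:
            return values[0]
        return values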
NOTE: I have not tested this, so be aware of potential bugs.
I know it's invalid, but the crash is what got me into this discussion. The try/except placement seems right to me. I will give it a try and let you know. Thank you for the active support @mre.
It worked. One side observation: the data buffer count still gets incremented even when we don't process a bad event, but that should be fine.
Very sharp observation @meetpradeepp 😄
I guess it's okay for now that the buffer count is increased, yes.
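If the numbers ever need to be exact, the except branch in encode() above could count skipped messages explicitly (a hypothetical sketch; the skipped counter does not exist in the current code and would need to be initialized in __init__):

            except Exception as e:
                logging.debug("Error in input data: %s. Skipping.", e)
                self.skipped += 1  # hypothetical counter, initialized in __init__
                continue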
Thanks for your feedback.