mre/kafka-influxdb

Support raw InfluxDB line protocol as an input format

p3r7 opened this issue · 5 comments

p3r7 commented

If replacing collectd with telegraf, we can directly have data in InfluxDB line protocol format.
See: telegraf output formats

It could be quite nice to have an encoder for this use-case, or even for kafka-influxdb to have a "transparent mode", with the marshalling disabled.
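For readers unfamiliar with the format, here is a minimal sketch of what one line protocol record looks like and how its parts break down. The sample record and the helper name `split_line_protocol` are made up for illustration; the parser is deliberately naive and assumes no escaped spaces or commas and no quoted string fields.

```python
def split_line_protocol(line):
    """Naively split one line protocol record into its parts.

    Layout: measurement[,tag=value...] field=value[,field=value...] [timestamp]
    Assumption: no escaped spaces/commas and no quoted string fields.
    """
    parts = line.strip().split(" ")
    head, field_part = parts[0], parts[1]
    # The trailing timestamp is optional.
    timestamp = int(parts[2]) if len(parts) > 2 else None
    measurement, *tag_pairs = head.split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)
    fields = dict(pair.split("=", 1) for pair in field_part.split(","))
    return measurement, tags, fields, timestamp


# Illustrative record, not taken from the issue.
sample = "cpu,host=server01,cpu=cpu0 usage_idle=98.2,usage_user=1.1 1476882000000000000"
print(split_line_protocol(sample))
```

A "transparent mode" would skip this kind of parsing entirely and forward each record as-is.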

mre commented

Oh wow, that's a good idea. 😃
One thing that comes to mind is using the echo encoder for that. Did you try using that?

p3r7 commented

I've just tried that, while running kafka-influxdb in -vvv mode.

The highwater_offset seems to get updated and by reading the logs I guess messages get consumed (see this gist).

But I don't see the data in InfluxDB, and no error logs get generated by the latter.
I'm using InfluxDB version 1.0.2, with the standard config (error logs enabled).

mre commented

So, yeah it looks like it's reading messages from Kafka (offset is increasing in your output).
There are two things you need to check for InfluxDB, though:

  1. Can you actually ping influxdb? Your log output indicates that the host cannot be resolved.
  2. After checking (and maybe changing) the hostname of InfluxDB, check whether the messages are written to InfluxDB. If not, change the echo_encoder like so:
class Encoder(object):
    @staticmethod
    def encode(msg):
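        """
        Don't change the message at all, just wrap it in a list
        :param msg: the raw message as read from Kafka
        :return: a list containing the unchanged message
        """
        return [msg]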

(i.e. add brackets around msg because the return value should actually be a list of messages.)
Also changed the code in master now. See here.
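To illustrate why the brackets matter, here is a minimal sketch. The `collect` function is a hypothetical stand-in for a consumer loop that extends a buffer with whatever the encoder returns; it is an assumption for illustration, not the actual kafka-influxdb worker code. `BareEncoder` is likewise made up to show the failure mode.

```python
class Encoder(object):
    @staticmethod
    def encode(msg):
        # Return a list with one entry, as suggested above.
        return [msg]


class BareEncoder(object):
    @staticmethod
    def encode(msg):
        # Hypothetical broken variant: returns the bare string.
        return msg


def collect(encoder, raw_messages):
    """Hypothetical consumer loop: gather all encoded points in a buffer."""
    buffer = []
    for raw in raw_messages:
        # extend() iterates over the return value, so a bare string
        # would be split into single characters.
        buffer.extend(encoder.encode(raw))
    return buffer


print(collect(Encoder, ["cpu value=1"]))      # one entry per message
print(collect(BareEncoder, ["cpu value=1"]))  # one entry per character
```

Under that assumption, a bare string does not fail loudly; it just produces points that are not valid line protocol.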

p3r7 commented

OK, thanks a lot, I got it to work :)

  1. Indeed, I didn't change the hostname.
    Shouldn't a hostname resolution failure raise an error?
  2. This did the trick.

I also had to change the retention_policy to "autogen" (new name for the default retention_policy, instead of "default").

It would be nice to advertise the fact that raw InfluxDB line protocol is supported as a feature on the README :)
A concrete use-case would be applications pushing already formatted InfluxDB insert queries to Kafka, without having to rely on collectd or telegraf.
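As a rough sketch of that use-case, an application could format its own records before publishing them. The helper name `to_line_protocol` and the sample values are assumptions for illustration, and escaping of special characters in names and tag values is not handled. Actually sending the string to Kafka would need a client library and is left out here.

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns=None):
    """Format one point as an InfluxDB line protocol record.

    Assumption: names and tag values contain no spaces, commas or '='.
    """
    def fmt(value):
        # bool must be checked before int, since bool is a subclass of int.
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, int):
            return "%di" % value  # integers carry an 'i' suffix
        if isinstance(value, float):
            return repr(value)
        # Everything else is written as a quoted string field.
        return '"%s"' % str(value).replace('"', '\\"')

    tag_str = "".join(",%s=%s" % (k, tags[k]) for k in sorted(tags))
    field_str = ",".join("%s=%s" % (k, fmt(fields[k])) for k in sorted(fields))
    line = "%s%s %s" % (measurement, tag_str, field_str)
    if timestamp_ns is not None:
        line += " %d" % timestamp_ns
    return line


record = to_line_protocol(
    "cpu", {"host": "server01"}, {"usage_idle": 98.2, "count": 3},
    1476882000000000000,
)
print(record)
# cpu,host=server01 count=3i,usage_idle=98.2 1476882000000000000
```

With the echo encoder, a string like this would pass through kafka-influxdb unchanged.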

mre commented

About No.1: You're right. That should raise an error.
Changed the default retention policy with b23979d.
Also updated the README to mention telegraf support.
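On the hostname point (No.1), one possible way to fail fast is to try resolving the InfluxDB host at startup. This is only a sketch under assumptions: the function name `check_host_resolves` is hypothetical, and it is not the change that was made in kafka-influxdb.

```python
import socket


def check_host_resolves(host, port):
    """Raise a clear error if the given host cannot be resolved."""
    try:
        socket.getaddrinfo(host, port)
    except socket.gaierror as exc:
        # Turn the low-level resolver error into an explicit failure.
        raise RuntimeError("Cannot resolve InfluxDB host %r: %s" % (host, exc))


# A numeric address always resolves; an unknown hostname would raise.
check_host_resolves("127.0.0.1", 8086)
```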