logstash-plugins/logstash-input-kafka

the `record.value.to_s` call prevents the plain codec's `charset` setting from working

Xztty opened this issue · 2 comments

Xztty commented

codec_instance.decode(record.value.to_s) do |event|

Xztty commented

When a Kafka message's value isn't UTF-8 encoded (for example, a GBK-encoded message), I'd like to set value_deserializer_class => "com.xxx.Latin1StringDeserializer".

The Latin1StringDeserializer implementation:

    import java.io.UnsupportedEncodingException;

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class Latin1StringDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                if (data == null)
                    return null;
                else
                    return new String(data, "ISO-8859-1");
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding");
            }
        }
    }

It returns the value encoded with "ISO-8859-1", and I want to set the input configuration as:

input {
    kafka {
        id => "test_tgk"
        bootstrap_servers => "192.168.1.1:9092"
        topics => ["test_gbk"]
        group_id => "test-gbk"
        value_deserializer_class => "com.xxx.Latin1StringDeserializer"
        codec => plain {
            charset => "GBK"
        }
    }
}

But it won't work, because in `codec_instance.decode(record.value.to_s)` the codec decodes the message value's string representation instead of its raw byte-array representation, so the `charset` option never sees the original bytes.
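The reason ISO-8859-1 is chosen in the deserializer is that it maps every byte value to exactly one character, so decoding through it is lossless and the original bytes can be recovered, whereas the same bytes are generally not valid UTF-8. A minimal Ruby sketch (the GBK bytes for "中" below are an illustrative example, not from this issue):

```ruby
gbk_bytes = "\xD6\xD0".b  # GBK encoding of "中" (illustrative)

# ISO-8859-1 accepts any byte sequence, so this string is always valid...
latin1 = gbk_bytes.dup.force_encoding("ISO-8859-1")
puts latin1.valid_encoding?  # => true

# ...and the original bytes survive, so they can still be re-decoded as GBK:
puts latin1.b.force_encoding("GBK").encode("UTF-8")  # => 中

# The same bytes are NOT valid UTF-8, so a UTF-8 string representation
# of the value cannot round-trip back to the original GBK bytes.
puts gbk_bytes.dup.force_encoding("UTF-8").valid_encoding?  # => false
```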

Xztty commented

It's resolved with the following configuration:

input {
    kafka {
        id => "test_gbk"
        bootstrap_servers => "192.168.1.1:9092"
        topics => ["test_gbk_"]
        group_id => "cg-gbk"
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        codec => plain {
            charset => "GBK"
        }
    }
}
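With ByteArrayDeserializer the codec receives the raw message bytes, and the plain codec's `charset` option can then reinterpret them. Roughly speaking, the conversion is equivalent to the following sketch (this mirrors the effect of `charset => "GBK"`, not the plugin's actual code; the byte values are an illustrative GBK message, "汉字"):

```ruby
# Illustrative raw GBK bytes for "汉字", as they would arrive from
# ByteArrayDeserializer.
raw = "\xBA\xBA\xD7\xD6".b

# Tag the bytes as GBK, then transcode to UTF-8 — the effect of
# codec => plain { charset => "GBK" }.
message = raw.force_encoding("GBK").encode("UTF-8")
puts message  # => 汉字
```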