logstash-plugins/logstash-output-kafka

set kafka message timestamp from @timestamp field

acristu opened this issue · 0 comments

With the Filebeat Kafka output, the timestamp of the Kafka message has the same value as the event's @timestamp field. With the Logstash Kafka output it does not.

Looking at the code, there seems to be no option to set the timestamp of the ProducerRecord:

record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)

Could we have a new option, message_timestamp => "%{[@timestamp]}", that sets the timestamp on the record? Something like:

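  def write_to_kafka(event, serialized_data)
    # ProducerRecord takes the timestamp as epoch milliseconds (a Java Long), not a string,
    # so the sprintf result is converted first. Assumes it renders as an ISO8601 time
    # and that Ruby's 'time' library is loaded.
    timestamp = @message_timestamp.nil? ? nil : (Time.iso8601(event.sprintf(@message_timestamp)).to_r * 1000).round
    if @message_key.nil?
      if timestamp.nil?
        record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
      else
        # Constructor arguments: topic, partition, timestamp, key, value
        record = ProducerRecord.new(event.sprintf(@topic_id), nil, timestamp, nil, serialized_data)
      end
    else
      if timestamp.nil?
        record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
      else
        record = ProducerRecord.new(event.sprintf(@topic_id), nil, timestamp, event.sprintf(@message_key), serialized_data)
      end
    end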
. . .
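
One detail the proposal depends on: Kafka stores the record timestamp as epoch milliseconds, while "%{[@timestamp]}" would, as far as I can tell, render as an ISO8601 string. A minimal sketch of that conversion in plain Ruby follows. The helper name epoch_millis is hypothetical and not part of the plugin, and the ISO8601 input format is an assumption.

```ruby
require 'time'

# Hypothetical helper (not part of the plugin): converts an ISO8601 string,
# the assumed rendering of "%{[@timestamp]}", into epoch milliseconds.
# Returns nil when no timestamp string is given, so the caller can fall
# back to letting the producer assign the timestamp.
def epoch_millis(iso8601_string)
  return nil if iso8601_string.nil?

  # to_r keeps the fractional seconds exact before scaling to milliseconds
  (Time.iso8601(iso8601_string).to_r * 1000).round
end

puts epoch_millis("1970-01-01T00:00:01.500Z")
```

Time.iso8601 raises ArgumentError on input that is not a valid ISO8601 time, so a real implementation would probably want to rescue that and fall back to the default timestamp.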