set kafka message timestamp from @timestamp field
acristu opened this issue · 0 comments
acristu commented
When using filebeat kafka output the timestamp of the kafka message has the same value as @timestamp field in the event, with logstash it does not.
Checking the code it seems there is no option to set the timestamp of the ProducerRecord:
Could we have a new option message_timestamp => "%{[@timestamp]}"
that will enable setting of the timestamp? Something like:
def write_to_kafka(event, serialized_data)
if @message_key.nil?
if @message_timestamp.nil?
record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
else
record = ProducerRecord.new(event.sprintf(@topic_id), nil, event.sprintf(@message_timestamp), nil, serialized_data)
end
else
if @message_timestamp.nil?
record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
else
record = ProducerRecord.new(event.sprintf(@topic_id), nil, event.sprintf(@message_timestamp), event.sprintf(@message_key), serialized_data)
end
end
. . .