Read Kafka headers
ahammani opened this issue · 11 comments
Hello,
Use Case
I send Kafka messages with headers like a correlation ID.
Alongside the data, I want to visualize these headers in Kibana.
Current state
After reading https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
I came up with this logstash.conf:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["topic1", "topic2"]
    auto_offset_reset => "earliest"
    group_id => "logstash-local"
    decorate_events => true
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    manage_template => false
    index => "kafka-logstash-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}
filter {
  mutate {
    add_field => {
      topic => "%{[@metadata][kafka][topic]}"
    }
  }
  json {
    skip_on_invalid_json => true
    tag_on_failure => ["_json_parse_failure"]
    source => "message"
    target => "kafka_payload"
  }
}
The Kafka metadata is displayed properly in Kibana, but now I want to display the headers as well.
Can you point me to some documentation I missed, or is this a feature still to come?
Used version
- ELK version: 6.3.0
- Kafka version: 1.0.0
- Operating System: Ubuntu 16.04
Many thanks
Hi,
Are there any plans to support this?
Has this been fixed?
Could you provide an example Logstash configuration for sending data from Kafka to Elasticsearch? Thanks a lot.
I think Kafka headers are not supported!
It would take custom Ruby code changes in a new version of the Kafka integration plugin.
@ahammani Were you able to find a solution for this?
I tried to handle it through the mapping settings, to get the header information into index fields in Elasticsearch, but unfortunately it did not work. Did you find any solution for it?
ELK Version: 7.6.2
I used something like the following in the index template:
{
  "order": 2,
  "index_patterns": [
    "kafka-topic-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "kafka_topic_policy"
      },
      "refresh_interval": "5s",
      "number_of_shards": "1",
      "max_docvalue_fields_search": "200"
    }
  },
  "mappings": {
    "_doc": {
      "_source": {
        "excludes": [],
        "includes": [],
        "enabled": true
      },
      "_meta": {},
      "_routing": {
        "required": false
      },
      "dynamic": true,
      "numeric_detection": false,
      "date_detection": true,
      "dynamic_date_formats": [
        "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
      ],
      "dynamic_templates": [],
      "properties": {
        "@timestamp": {
          "format": "epoch_millis||strict_date_optional_time",
          "index": true,
          "ignore_malformed": false,
          "store": false,
          "type": "date",
          "doc_values": true
        },
        "headers": {
          "type": "object",
          "properties": {
            "error_code": {
              "type": "integer"
            },
            "error_message": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword"
                }
              }
            }
          }
        },
        "message": {
          "type": "text"
        },
        "topic": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}
OK, I understand the solution. I managed to handle Kafka headers with Logstash successfully, but I am going to sleep now. I will post the solution tomorrow.
Just found out that it's here as part of logstash-integration-kafka
Thanks for pointing that out @weizhu-us. Closing this issue, as it has been implemented in recent versions of the logstash-integration-kafka plugin, which is now the home of the Logstash input plugin for Kafka.
Please create a new issue in the Kafka integration repo if there are issues with this feature.
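For anyone arriving here with the original use case, here is a minimal sketch of how reading headers might look with a recent logstash-integration-kafka plugin. It rests on a few assumptions that should be checked against the plugin documentation for your installed version: that `decorate_events` accepts the value "extended", and that header values are then exposed under `[@metadata][kafka][headers]`. The header name `correlation_id` is hypothetical and stands in for whatever header your producer sets. Fields under `@metadata` are not sent to outputs, so the header has to be copied into a regular event field before it can reach Elasticsearch and Kibana.

```
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["topic1", "topic2"]
    group_id => "logstash-local"
    # Assumption: "extended" adds record headers on top of the basic
    # metadata (topic, partition, offset, ...). Verify for your version.
    decorate_events => "extended"
  }
}
filter {
  # "correlation_id" is a hypothetical header name; replace it with your own.
  # The conditional skips events that do not carry this header.
  if [@metadata][kafka][headers][correlation_id] {
    mutate {
      add_field => {
        "correlation_id" => "%{[@metadata][kafka][headers][correlation_id]}"
      }
    }
  }
}
```

With this in place, the existing elasticsearch output from the original configuration should index `correlation_id` like any other event field. An index template mapping alone cannot do this, because a mapping only describes how fields are indexed and does not add fields that the event does not contain.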