logstash-plugins/logstash-input-kafka

Read Kafka headers

ahammani opened this issue · 11 comments

Hello,

Use Case

I send Kafka messages with headers, such as a correlation ID.
Alongside the data, I want to visualize these headers in Kibana.

Current state

After reading https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
I came up with this logstash.conf:

input {
        kafka {
                bootstrap_servers => "localhost:9092"
                topics => ["topic1", "topic2"]
                auto_offset_reset => "earliest"
                group_id => "logstash-local"
                decorate_events => true
        }
}

filter {
        mutate {
                add_field => {
                        "topic" => "%{[@metadata][kafka][topic]}"
                }
        }
        json {
                skip_on_invalid_json => true
                tag_on_failure => ["_json_parse_failure"]
                source => "message"
                target => "kafka_payload"
        }
}

output {
        elasticsearch {
                hosts => "localhost:9200"
                manage_template => false
                index => "kafka-logstash-%{+YYYY.MM.dd}"
                document_type => "%{[@metadata][type]}"
        }
}
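For reference, the same mutate pattern works for the other decorated fields as well. A small sketch, assuming the usual [@metadata][kafka] sub-fields added by decorate_events => true:

filter {
        mutate {
                add_field => {
                        # Other metadata added by decorate_events => true;
                        # headers are not among them in this plugin version.
                        "partition" => "%{[@metadata][kafka][partition]}"
                        "offset"    => "%{[@metadata][kafka][offset]}"
                        "kafka_key" => "%{[@metadata][kafka][key]}"
                }
        }
}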

The Kafka metadata is properly displayed in Kibana. But now I want to display the headers as well.

Can you point me to some documentation I missed, or is this a feature still to come?

Used version

  • ELK version: 6.3.0
  • Kafka version: 1.0.0
  • Operating System: Ubuntu 16.04

Many thanks

Hi,

Are there any plans to support this?

Has this been fixed?
Could you share an example Logstash configuration for sending data from Kafka to Elasticsearch? Thanks a lot.

I think Kafka headers are not supported.

You have to change the Ruby code in the new version of the Kafka integration plugin with custom code.

@ahammani Were you able to find a solution for this?

I was trying to handle it through mapping settings to get the header info into index fields in ELK, but unfortunately that did not work. Did you find any solution for it?

ELK Version: 7.6.2

something like below in the index template:

{
  "order": 2,
  "index_patterns": [
    "kafka-topic-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "kafka_topic_policy"
      },
      "refresh_interval": "5s",
      "number_of_shards": "1",
      "max_docvalue_fields_search": "200"
    }
  },
  "mappings": {
    "_doc": {
      "_source": {
        "excludes": [],
        "includes": [],
        "enabled": true
      },
      "_meta": {},
      "_routing": {
        "required": false
      },
      "dynamic": true,
      "numeric_detection": false,
      "date_detection": true,
      "dynamic_date_formats": [
        "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
      ],
      "dynamic_templates": [],
      "properties": {
        "@timestamp": {
          "format": "epoch_millis||strict_date_optional_time",
          "index": true,
          "ignore_malformed": false,
          "store": false,
          "type": "date",
          "doc_values": true
        },
        "headers": {
          "type": "object",
          "properties": {
            "error_code": {
              "type": "integer"
            },
            "error_message": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword"
                }
              }
            }
          }
        },
        "message": {
          "type": "text"
        },
        "topic": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}
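Note that the template only defines how a headers object would be mapped; something still has to put those values on the event. A minimal sketch of the filter side, assuming the input ever exposes record headers under [@metadata][kafka][headers] (which it does not in the plugin version discussed here), and using hypothetical header names matching the mapping above:

filter {
        mutate {
                add_field => {
                        # Hypothetical: copy header values into the "headers" object
                        # so they line up with headers.error_code / headers.error_message.
                        "[headers][error_code]"    => "%{[@metadata][kafka][headers][error_code]}"
                        "[headers][error_message]" => "%{[@metadata][kafka][headers][error_message]}"
                }
        }
        mutate {
                # add_field always produces strings; convert to match the integer mapping.
                convert => { "[headers][error_code]" => "integer" }
        }
}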

OK, I understand the solution now. I managed to read Kafka headers with Logstash successfully, but I'm going to sleep. Tomorrow I will post the solution here.

Sorry, I forgot to send the code.

(screenshots: 20210127_122237.jpg, 20210127_122225.jpg)

That is the code I used to solve it. But is there a way to do this with Logstash configuration only, rather than custom code? Thanks.

Just found out that it's here as part of logstash-integration-kafka

logstash-plugins/logstash-integration-kafka#78
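For anyone landing here later, a rough sketch of how this looks with a recent version of the integration plugin. As I read the linked issue, decorate_events => "extended" adds the record headers under [@metadata][kafka][headers], from where a filter can copy them onto the document; treat the exact field paths and the "correlation_id" header name as assumptions and check the current plugin docs.

input {
        kafka {
                bootstrap_servers => "localhost:9092"
                topics => ["topic1", "topic2"]
                group_id => "logstash-local"
                # "extended" also decorates the event with the record headers,
                # in addition to topic/partition/offset/key/timestamp.
                decorate_events => "extended"
        }
}

filter {
        mutate {
                # @metadata is not shipped to Elasticsearch, so copy the headers
                # you want to keep onto the event. "correlation_id" is just an
                # example header name.
                add_field => {
                        "[headers][correlation_id]" => "%{[@metadata][kafka][headers][correlation_id]}"
                }
        }
}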

Thanks for pointing that out @weizhu-us. Closing this issue, as it has been implemented in recent versions of the logstash-integration-kafka plugin, which is now the home of the Logstash input plugin for Kafka.

Please create a new issue in the Kafka integration repo if there are issues with this feature.