Kafka River Plugin for ElasticSearch
The Kafka River plugin allows you to read messages from Kafka and index bulked messages into elasticsearch. The bulk size (the number of messages to be indexed in one request) and concurrent request number is configurable. The Kafka River also supports consuming messages from multiple Kafka brokers and multiple partitions.
The plugin uses the latest Kafka and Elasticsearch version.
- Kafka version 0.8.1.1
- Elasticsearch version 1.4.0
The plugin is periodically updated, if there are newer versions of any dependencies.
Setup
-
Install Kafka if you are working on local environment (See Apacke Kafka Quick Start Guide for instructions on how to Download and Build.)
-
Install the plugin
cd $ELASTICSEARCH_HOME
.bin/plugin -install <plugin-name> -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.1.0/elasticsearch-river-kafka-1.1.0-plugin.zip
Example:
cd $ELASTICSEARCH_HOME
.bin/plugin -install kafka-river -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.1.0/elasticsearch-river-kafka-1.1.0-plugin.zip
If it doesn't work, clone git repository and build plugin manually.
- Build the plugin - it will create a zip file here: $PROJECT-PATH/target/elasticsearch-river-kafka-1.1.0-SNAPSHOT-plugin.zip
- Install the plugin from target into elasticsearch
cd $ELASTICSEARCH_HOME
.bin/plugin --install <plugin-name> --url file:////$PLUGIN-PATH/elasticsearch-river-kafka-1.1.0-SNAPSHOT-plugin.zip
Update installed plugin
cd $ELASTICSEARCH_HOME
./bin/plugin -remove <plugin-name>
./bin/plugin -url file:/$PLUGIN_PATH -install <plugin-name>
Configuration
To deploy Kafka river into Elasticsearch as a plugin, execute:
curl -XPUT 'localhost:9200/_river/<river-name>/_meta' -d '
{
"type" : "kafka",
"kafka" : {
"zookeeper.connect" : <zookeeper.connect>,
"zookeeper.connection.timeout.ms" : <zookeeper.connection.timeout.ms>,
"topic" : <topic-name>
},
"index" : {
"index" : <index-name>,
"type" : <mapping-type-name>,
"bulk.size" : <bulk.size>,
"concurrent.requests" : <concurrent.requests>
}
}'
- NOTE: Type "kafka" is required and must not be changed. It corresponds the type, given in the source code, by which elasticsearch is able to associate created river with the installed plugin.
Example:
curl -XPUT 'localhost:9200/_river/kafka-river/_meta' -d '
{
"type" : "kafka",
"kafka" : {
"zookeeper.connect" : "localhost",
"zookeeper.connection.timeout.ms" : 10000,
"topic" : "river"
},
"index" : {
"index" : "kafka-index",
"type" : "status",
"bulk.size" : 100,
"concurrent.requests" : 1
}
}'
The detailed description of each parameter:
river-name
(required) - The river name to be created in elasticsearch.zookeeper.connect
(optional) - Zookeeper server host. Default is:localhost
zookeeper.connection.timeout.ms
(optional) - Zookeeper server connection timeout in milliseconds. Default is:10000
topic
(optional) - The name of the topic where you want to send Kafka message. Default is:elasticsearch-river-kafka
index
(optional) - The name of elasticsearch index. Default is:kafka-index
type
(optional) - The mapping type of elasticsearch index. Default is:status
bulk.size
(optional) - The number of messages to be bulk indexed into elasticsearch. Default is:100
concurrent.requests
(optional) - The number of concurrent requests of indexing that will be alowed. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests. Default is:1
Flush interval is set to 12 hours by default, so any remaining messages get flushed to elasticsearch even if the number of messages has not reached.
To delete the existing river, execute:
curl -XDELETE 'localhost:9200/_river/<river-name>/'
Example:
curl -XDELETE 'localhost:9200/_river/kafka-river/'
To see the indexed data:
curl -XGET 'localhost:9200/kafka-index/_search?pretty=1'
To delete the index:
curl -XDELETE 'localhost:9200/kafka-index'
Kafka Consumer details
Currently Consumer Group Java API (high level api) is used to create the Kafka Consumer, which keeps track of offset automatically. This enables the River to read kafka messages from multiple brokers and multiple partitions.
License
Copyright (C) 2014 Mariam Hakobyan. See LICENSE
Contributing
- Fork the repository on Github
- Create a named feature branch
- Develop your changes in a branch
- Write tests for your change (if applicable)
- Ensure all the tests are passing
- Submit a Pull Request using Github