The Kafka module receives events published by other Vert.x verticles and forwards them to a Kafka broker.
This module requires a Kafka server to be available, which means both the ZooKeeper and Kafka servers must be running. See http://kafka.apache.org/documentation.html#quickstart for Kafka setup. Once the module is integrated into your application and a message has been sent to Kafka through it, you can check the result by starting a Kafka console consumer that listens to the topic you specified in the module configuration. The same quickstart guide explains how to create a console consumer.
The module name is kafka.
When deploying this module, you need to provide the following configuration:
{
"address": <address>,
"broker.list": <broker.list>,
"kafka-topic": <kafka-topic>,
"kafka-partition": <kafka-partition>,
"request.required.acks": <request.required.acks>,
"serializer.class": <serializer.class>
}
For example:
{
"address": "test-address",
"broker.list": "localhost:9092",
"kafka-topic": "test-topic",
"kafka-partition": "test-partition",
"request.required.acks": 1,
"serializer.class": "kafka.serializer.DefaultEncoder"
}
The detailed description of each parameter:
address
(mandatory) - The Vert.x EventBus address to which your application sends events, and from which this module consumes them.
broker.list
(optional) - A comma-separated list of Kafka brokers in the format "host1:port1,host2:port2". Default is: localhost:9092
kafka-topic
(optional) - The name of the topic to which Kafka messages are sent. Default is: test-topic
kafka-partition
(optional) - The name of the specific partition to which Kafka messages are sent. Default is: test-partition
request.required.acks
(optional) - Controls whether the Kafka producer waits for an acknowledgement that the broker has received the message. Possible values are: 0, which means that the producer never waits for an acknowledgement from the broker; 1, which means that the producer gets an acknowledgement after the leader replica has received the data; -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. Default is: 1
serializer.class
(optional) - The serializer class for messages. Options are kafka.serializer.DefaultEncoder and kafka.serializer.StringEncoder. Default is: kafka.serializer.DefaultEncoder
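The defaults and constraints described above can be sketched in plain Java. This is only an illustration of the documented behaviour, not the module's own code: the class KafkaModConfigSketch and the method withDefaults are hypothetical names, and a plain Map stands in for Vert.x's JsonObject so that the sketch runs without any dependencies.

```java
import java.util.HashMap;
import java.util.Map;

class KafkaModConfigSketch {

    /** Returns a copy of the user config with the documented defaults filled in. */
    static Map<String, Object> withDefaults(Map<String, Object> user) {
        // "address" is the only mandatory parameter.
        Object address = user.get("address");
        if (!(address instanceof String) || ((String) address).isEmpty()) {
            throw new IllegalArgumentException("address is mandatory");
        }

        Map<String, Object> cfg = new HashMap<>();
        cfg.put("broker.list", "localhost:9092");
        cfg.put("kafka-topic", "test-topic");
        cfg.put("kafka-partition", "test-partition");
        cfg.put("request.required.acks", 1);
        cfg.put("serializer.class", "kafka.serializer.DefaultEncoder");
        cfg.putAll(user); // user-supplied values override the defaults

        // Only 0, 1 and -1 are documented as valid acknowledgement levels.
        int acks = Integer.parseInt(String.valueOf(cfg.get("request.required.acks")));
        if (acks != 0 && acks != 1 && acks != -1) {
            throw new IllegalArgumentException("request.required.acks must be 0, 1 or -1");
        }
        return cfg;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("address", "test-address");
        user.put("serializer.class", "kafka.serializer.StringEncoder");
        // Prints the merged configuration (map ordering is not guaranteed).
        System.out.println(withDefaults(user));
    }
}
```

Validating the configuration before deployment in this way surfaces a missing address or an unsupported acknowledgement level early, rather than at the first send.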
If you would like to capture timing information using StatsD, you can enable the optional StatsD integration, which uses the non-blocking java-statsd-client. It is configured with the following parameters:
{
"statsd.enabled": <statsd.enabled default: false>,
"statsd.host": <statsd.host default: "localhost">,
"statsd.port": <statsd.port default: 8125>,
"statsd.prefix": <statsd.prefix default: "vertx.kafka">
}
For example:
{
"statsd.enabled": true,
"statsd.host": "localhost",
"statsd.port": 8125,
"statsd.prefix": "myapp.prefix"
}
The detailed description of each parameter:
statsd.enabled
(optional) - Boolean flag indicating whether StatsD logging is enabled. Default is: false
statsd.host
(optional) - Hostname of the StatsD server. Default is: localhost
statsd.port
(optional) - Port of the StatsD server. Default is: 8125
statsd.prefix
(optional) - Prefix for StatsD log messages. Default is: vertx.kafka
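To make the role of statsd.prefix, statsd.host and statsd.port more concrete, the sketch below builds a StatsD timing sample in the standard "<name>:<value>|ms" wire format and optionally sends it over UDP. It is not the module's implementation (the module delegates to java-statsd-client); the class StatsdTimingSketch and the metric name "send.time" are hypothetical and chosen only for illustration.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

class StatsdTimingSketch {

    /** Formats a StatsD timing sample as "<prefix>.<metric>:<millis>|ms". */
    static String timingPacket(String prefix, String metric, long millis) {
        return prefix + "." + metric + ":" + millis + "|ms";
    }

    /** Sends one packet over UDP; fire-and-forget, so no reply is expected. */
    static void send(String host, int port, String packet) throws Exception {
        byte[] data = packet.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(data, data.length, InetAddress.getByName(host), port));
        }
    }

    public static void main(String[] args) throws Exception {
        // "send.time" is a hypothetical metric name, not one defined by the module.
        String packet = timingPacket("vertx.kafka", "send.time", 12);
        System.out.println(packet);
        // Pass a host name as the first argument to actually send the sample
        // to a StatsD server listening on the default port 8125.
        if (args.length > 0) {
            send(args[0], 8125, packet);
        }
    }
}
```

Because the samples travel over UDP, a missing or unreachable StatsD server does not block the sender.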
To test this module locally, deploy it in your application with the necessary configuration. Make sure a Kafka server is running locally on port 9092 (see http://kafka.apache.org/documentation.html#quickstart):
- cd kafka-[version]
- bin/zookeeper-server-start.sh config/zookeeper.properties
- bin/kafka-server-start.sh config/server.properties
Then deploy the mod-kafka module in your application, for example:
JsonObject config = new JsonObject();
config.putString("address", "test-address");
config.putString("broker.list", "localhost:9092");
config.putString("kafka-topic", "test-topic");
config.putString("kafka-partition", "test-partition");
config.putString("request.required.acks", "1");
config.putString("serializer.class", "kafka.serializer.StringEncoder");
container.deployModule("com.zanox~mod-kafka~1.0.0", config);
You can send messages from your application in Vert.x's JsonObject format, where the key must be the string "payload" and the value can be either a byte array or a string. See below for more details:
For Byte Array type
JsonObject jsonObject = new JsonObject();
// putBinary stores a byte[] value; putString only accepts a String
jsonObject.putBinary("payload", "your message goes here".getBytes());
For String type
JsonObject jsonObject = new JsonObject();
jsonObject.putString("payload", "your message goes here");
For the String type you need to explicitly set serializer.class in the configuration to "kafka.serializer.StringEncoder".
In either case, send the object to the address configured for the module. Assuming the code runs inside a verticle where vertx is available:
vertx.eventBus().send("test-address", jsonObject);
You can then verify that the messages reach the Kafka server by starting a console consumer:
- cd kafka-[version]
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic --from-beginning
Now you will see the messages being consumed.
Copyright 2013, ZANOX AG under Apache License. See LICENSE
Author: Mariam Hakobyan
- Fork the repository on GitHub
- Create a named feature branch
- Develop your changes in a branch
- Write tests for your change (if applicable)
- Ensure all the tests are passing
- Submit a Pull Request using GitHub