/vertx-kafka

Primary LanguageJavaOtherNOASSERTION

Build Status

Vert.x Kafka

The Vert.x kafka library allows asynchronous publishing and receiving of messages on Kafka topic through the vert.x event bus.

####To use this library you must have kafka and zookeeper up and running. Follow instructions at Kafka quick start guide

This is a multi-threaded worker library that consumes kafka messages and then re-broadcast them on an address on the vert.x event bus.

Getting Started

Add a dependency to vertx-kafka:

<dependency>
    <groupId>com.cyngn.vertx</groupId>
    <artifactId>vertx-kafka</artifactId>
    <version>0.4.0</version>
</dependency>

Consumer

Listening for messages coming from a kafka broker.

Configuration

{
    "zookeeper.connect" : "<host1:2181,host:2181...>",
    "group.id" : "<kafkaConsumerGroupId>",
    "bootstrap.servers" "<host1:9092,host2:9092...>",
    "backoff.increment.ms" : "<backTimeInMilli>",
    "autooffset.reset" : "<kafkaAutoOffset>",
    "topics" : ["<topic1>", "<topic2>"],
    "eventbus.address" : "<default kafka.message.consumer>",
    "consumer.poll.interval.ms" : <default 100 ms>
}

For example:

{
    "zookeeper.host" : "localhost:2181",
    "group.id" : "testGroup",
    "bootstrap.servers" "localhost:9092",
    "backoff.increment.ms" : "100",
    "autooffset.reset" : "smallest",
    "topics" : ["testTopic"],
    "eventbus.address" : "kafka.to.vertx.bridge",
    "consumer.poll.interval.ms" : 1000
}

Field breakdown:

  • zookeeper.connect a zookeeper connection string of form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path used with your kafka clusters
  • group.id the kafka consumer group name that will be consuming related to
  • bootstrap.servers the list of initial kafka hosts to connect to
  • backoff.increment.ms backoff interval for contacting broker without messages in milliseconds
  • autooffset.reset how to reset the offset
  • topics the kafka topics to listen for
  • eventbus.address the vert.x address to publish messages onto when received form kafka
  • consumer.poll.interval.ms how often to try and consume messages

For a deeper look at kafka configuration parameters check this page out.

Usage

You should only need one consumer per application.

Deploy the verticle in your server

vertx = Vertx.vertx();

// sample config
JsonObject consumerConfig = new JsonObject();
consumerConfig.put(ConfigConstants.GROUP_ID, "testGroup");
List<String> topics = new ArrayList<>();
topics.add("testTopic");
consumerConfig.put("topics", new JsonArray(topics));

deployKafka(config);

public void deployKafka(JsonObject config) {
   // use your vert.x reference to deploy the consumer verticle
	 vertx.deployVerticle(MessageConsumer.class.getName(),
      new DeploymentOptions().setConfig(config),
			deploy -> {
   		     if(deploy.failed()) {
        	     System.err.println(String.format("Failed to start kafka consumer verticle, ex: %s", deploy.cause()));
               vertx.close()
               return;
            }
            System.out.println("kafka consumer verticle started");
      }
	);
}

Listen for messages

vertx.eventBus().consumer(MessageConsumer.EVENTBUS_DEFAULT_ADDRESS,
	message -> {
		   System.out.println(String.format("got message: %s", message.body()))
		   // message handling code
		   KafkaEvent event = new KafkaEvent(message.body());
	 });

Consumer Errors

You can listen on the address kafka.producer.error for errors from the kafka producer.

Producer

Send a message to a kafka cluster on a predefined topic.

Configuration

{
    "serializer.class":"<the default encoder>",
    "key.serializer":"<the key encoder>",
    "value.serializer":"<the value encoder>",
    "bootstrap.servers":"<host1:9092,host2:9092>,"
    "default_topic":"<default kafka topic to send to>,"
    "eventbus.address":"<the event bus topic where you send messages to send to kafka>"
    "max.block.ms" : <defaults to 60000>
}

For example:

{
    "serializer.class":"org.apache.kafka.common.serialization.StringSerializer",
    "bootstrap.servers":"localhost:9092",
    "default_topic":"testTopic"
}
  • serializer.class The serializer class for messages
  • key.serializer The serializer class for keys, defaults to the serializel.class if not set
  • value.serializer The serializer class for values, defaults to the serializel.class if not set
  • bootstrap.servers The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
  • default_topic The default topic in kafka to send to
  • eventbus.address The address to listen to on the event bus, defaults to 'kafka.message.publisher'
  • max.block.ms How long should the sender wait before getting meta data or time out in ms.

For a deeper look at kafka configuration parameters check this page out.

Usage

You should only need one producer per application.

Deploy the verticle in your server

vertx = Vertx.vertx();

// sample config
JsonObject producerConfig = new JsonObject();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("default_topic", "testTopic");

deployKafka(producerConfig);

public void deployKafka(JsonObject config) {
   // use your vert.x reference to deploy the consumer verticle
	 vertx.deployVerticle(MessageProducer.class.getName(),
     new DeploymentOptions().setConfig(config),
		 deploy -> {
   	   if(deploy.failed()) {
         System.err.println(String.format("Failed to start kafka producer verticle, ex: %s", deploy.cause()));
          vertx.close()
          return;
       }
       System.out.println("kafka producer verticle started");
   });
}

Send message to kafka topic

KafkaPublisher publisher = new KafkaPublisher(vertx.eventBus());

// send to the default topic
publisher.send("a test message on a default topic");
// send to a specific topic
publisher.send("SomeSpecialTopic", "a test message on a default topic");
// send to a specific topic with custom key
publisher.send("SomeSpecialTopic", "aUserId", "a test message on a default topic");
// send to a specific topic and partition
publisher.send("SomeSpecialTopic", "", 5, "a test message on a default topic");

Producer Errors

You can listen on the address kafka.producer.error for errors from the kafka producer.

Test setup

  • cd [yourKafkaInstallDir]
  • bin/zookeeper-server-start.sh config/zookeeper.properties
  • bin/kafka-server-start.sh config/server.properties
  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 8 --topic [yourTestTopic]
  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic [yourTestTopic]
  • bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic [yourTestTopic]