This example implements a Kafka producer and consumer that use a Avro schema mapper to communicate data in the topic.
- Kafka 0.9.0
- Maven 3
- Java 7/8
kafka$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
kafka$ ./bin/kafka-server-start.sh config/server.properties
kafka$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic example.test.topic
project$ mvn clean install
String topicName = "example.test.topic";
// setup schema
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(AvroSchemaDefinitionLoader.fromFile("schema/tweet.avro").get());
// setup the mapper
DataMapper<GenericRecord> mapper = new AvroDataMapper(schema);
// setup consumer
DataProcessor<GenericRecord> processor = new AvroDataProcessor();
KafkaLibConsumer<GenericRecord> consumer = new KafkaLibConsumer<>(
topicName,
PropertiesLoader.fromFile("consumer.properties"),
mapper,
processor);
// set up the producer
KafkaLibProducer<GenericRecord> producer = new KafkaLibProducer<>(
topicName,
PropertiesLoader.fromFile("producer.properties"),
mapper);
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("username", "hugopicado");
avroRecord.put("tweet", "my #awesome tweet");
producer.produce(avroRecord);
consumer.consume();