- programming language
- Kafka infra build
- Ref projects
├── KafkaJava : kafka `Java` project
├── KafkaPython : kafka `Python` project
├── Makefile : top-level helper Makefile
├── README.md
├── build.sbt : scala build tool (sbt) config
├── config : kafka config (project-wide)
├── data : toy data
├── doc : reference docs
├── docker-compose.yml : docker-compose file for a minimal dev stack (kafka + zookeeper)
├── examples : kafka Java examples, organized by use case
├── mk.d : sub-Makefiles
├── project : scala build tool (sbt) config
├── script : helper bash scripts
├── services : per-component docker-compose files for the dev stack
├── src : kafka `Scala` project (source code)
Prerequisites
- env
- Java JDK 1.8
- Kafka
- Zookeeper
- sbt 1.3.12
- Scala
- IntelliJ
# install kafka and zookeeper (a Java JDK is required; see Prerequisites)
brew install kafka
brew install zookeeper
# start zookeeper, kafka
brew services start zookeeper
brew services start kafka
# restart zookeeper, kafka
brew services restart zookeeper
brew services restart kafka
# stop zookeeper, kafka
brew services stop zookeeper
brew services stop kafka
Build
# compile and build the assembly (fat) jar
sbt compile
sbt assembly
# the assembled jar should now exist at:
# target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar
Run
- SimpleProducerConsumer
# producer
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar SimpleProducerConsumer.Producer
# consumer
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar SimpleProducerConsumer.Consumer
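SimpleProducerConsumer pairs a fire-and-forget producer with a polling consumer. As a language-neutral sketch of that pattern (an in-memory queue stands in for the broker; the names here are illustrative, not the project's actual classes):

```python
from queue import Queue

# an in-memory queue stands in for the Kafka topic
topic = Queue()

def produce(records):
    """Send each record to the 'topic' (fire-and-forget)."""
    for record in records:
        topic.put(record)

def consume(max_records):
    """Poll up to max_records from the 'topic'."""
    out = []
    while len(out) < max_records and not topic.empty():
        out.append(topic.get())
    return out

produce(["hello", "kafka"])
print(consume(10))  # ['hello', 'kafka']
```

The real pair talks to a broker over the network, but the produce/poll loop has the same shape.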
- WordCount
# KafkaStream - wordcount
# create a topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic text_lines
# make toy data
echo -e "doo dooey do dodah\ndoo dooey do dodah\ndoo dooey do dodah \n 123 456 123" > data/words.txt
# run the Kafka Streams WordCount app
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar KafkaStream.WordCount
# send it to kafka
cat data/words.txt | kafka-console-producer --broker-list localhost:9092 --topic text_lines
# check the output
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic word_count_results \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
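Conceptually, the WordCount topology flat-maps each line into words, groups by word, and counts. A minimal Python sketch of the same logic over the toy data (pure Python, no Kafka involved):

```python
from collections import Counter

def word_count(lines):
    """flatMap each line into lowercase words, then count occurrences
    per word, mirroring the topology's groupBy + count."""
    counts = Counter()
    for line in lines:
        for word in line.lower().split():
            counts[word] += 1
    return dict(counts)

toy_lines = ["doo dooey do dodah", "doo dooey do dodah",
             "doo dooey do dodah", "123 456 123"]
print(word_count(toy_lines))
# {'doo': 3, 'dooey': 3, 'do': 3, 'dodah': 3, '123': 2, '456': 1}
```

The streams app emits these counts as `(word, count)` records to `word_count_results`, which is what the console consumer above prints.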
- ProducerConsumerPartitioner
# create the topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_ProducerConsumerPartitioner
# run the consumer
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar ProducerConsumerPartitioner.Consumer
# run the producer
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar ProducerConsumerPartitioner.Producer
# check the result
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic topic_ProducerConsumerPartitioner \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true
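A partitioner's job is to map each record key to a partition deterministically. Kafka's default partitioner hashes the key with murmur2; the sketch below uses MD5 as a stand-in just to show the contract (same key always maps to the same partition, keys spread across the partition range):

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Deterministically map a record key to a partition.
    (MD5 stand-in; Kafka's default partitioner actually uses murmur2.)"""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# the same key always lands on the same partition
assert partition_for(b"order-42", 3) == partition_for(b"order-42", 3)
# different keys spread over partitions 0..num_partitions-1
print({k: partition_for(k, 3) for k in [b"a", b"b", b"c"]})
```

Keeping the mapping key-deterministic is what guarantees per-key ordering within a partition.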
- AsyncProducerConsumer
# create the topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_AsyncProducerConsumer
# run the consumer
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar AsyncProducerConsumer.ConsumerRunner
# run the producer
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar AsyncProducerConsumer.ProducerRunner
# check the result
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic topic_AsyncProducerConsumer \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true
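The point of the async producer is that sends complete in the background and report back through a callback, rather than blocking per record. A sketch of that callback pattern, with a thread pool standing in for the producer's I/O thread (names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

acks = []

def send_async(pool, record, on_ack):
    """Non-blocking send: submit the record and attach a completion
    callback, like producer.send(record, callback) in the Java client."""
    future = pool.submit(str.upper, record)  # stand-in for the broker ack
    future.add_done_callback(lambda f: on_ack(f.result()))
    return future

with ThreadPoolExecutor(max_workers=2) as pool:
    for record in ["a", "b", "c"]:
        send_async(pool, record, acks.append)
# leaving the `with` block waits for in-flight sends, like producer.flush()
print(sorted(acks))  # ['A', 'B', 'C']
```

The callback is where a real producer would check for delivery errors and record offsets.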
Quick start
# run each producer app individually
# producer 1
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Producer.KafkaProducerApp
# producer 2
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Producer.KafkaProducerApple
# producer 3
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Producer.KafkaProducerApp2
# producer 4
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Producer.KafkaProducerApp3
# run each consumer app individually
# consumer 1
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Consumer.KafkaConsumerSubscribeApp
# consumer 2
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Consumer.ScalaConsumerExample
# consumer 3
java -cp target/scala-2.11/KafkaHelloWorld-assembly-1.0.jar Consumer.KafkaConsumerApp2
# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic text_topic
# set up producer
kafka-console-producer --broker-list 127.0.0.1:9092 --topic text_topic --producer-property acks=all
# set up consumer
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic text_topic
# sbt compile
sbt clean compile
# sbt run
sbt run
# `sbt run` lists the available main classes; pick one to launch:
# [1] Consumer.KafkaConsumerSubscribeApp : collect events via a Kafka consumer
# [2] Producer.KafkaProducerApp : create events via a Kafka producer
# send file as kafka stream
# run
# 1) launch consumer
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic orders
# 2) send stream
bash script/streamOrders.sh
# install python client library
pip3 install -r requirements.txt
# produce event
python src/main/python/producer_demo.py
# consume event
python src/main/python/consumer_demo.py
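The demo scripts' internals aren't shown here, but a Kafka Python client typically serializes event values to bytes on the producer side and deserializes them on the consumer side. A minimal JSON round trip illustrating that step (independent of any Kafka library):

```python
import json

def serialize(event: dict) -> bytes:
    """Encode an event to bytes, as a producer value_serializer would."""
    return json.dumps(event).encode("utf-8")

def deserialize(payload: bytes) -> dict:
    """Decode the bytes back into an event on the consumer side."""
    return json.loads(payload.decode("utf-8"))

event = {"order_id": 1, "item": "book"}
assert deserialize(serialize(event)) == event
print(serialize(event))  # b'{"order_id": 1, "item": "book"}'
```

Whatever format the demos actually use, producer and consumer must agree on it; a schema registry (see Ref) formalizes that agreement.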
Test
sbt test
Development
- Git flow: dev branch -> master branch
- Please name branches using the following format:
  - feature/0001-create-first-feature
  - fix/0001-fix-first-issue
  - hotfix/fix-critical-errors
- ...
- Step 1: create a branch
- Step 2: make a PR
- Step 3: merge to master
- Kafka config location (Homebrew install): /usr/local/etc/kafka
- Zookeeper config location (Homebrew install): /usr/local/etc/zookeeper
Ref
- Confluent kafka official tutorial
- https://sparkbyexamples.com/kafka/apache-kafka-consumer-producer-in-scala/
- project dependency
- Kafka with spark-streaming
- Kafka schema management
- Makefile ref
- Kafka Docker