SimpleKafkaApp

A simple app to publish messages to a single- and a multi-partition Kafka cluster.


Prerequisites:

  1. Basic knowledge of Docker and how to use a docker-compose.yml file
  2. Install an IDE; I've used IntelliJ
  3. Install Docker and docker-compose on your machine
  4. Install Xcode for setting up this repository on your machine

Setup

  1. Clone this repository to your local system
  2. Assuming you are on a Mac connected to a secure Wi-Fi network, find your machine's IP address using this command:
localmachine % ipconfig getifaddr en0
10.0.0.78
  3. Replace localhost with the above local IP address in the following files:
a. docker-compose.yml
b. src/main/java/SampleProducer.java
c. src/main/java/ScaledProducer.java
d. src/main/python/pyspark_consumer.py
  4. Open the project in IntelliJ and build it without failures
  5. In a terminal, create the Docker containers for zookeeper, kafka, spark, and sparkworker:
localmachine % docker-compose -f docker-compose.yml up
  6. Now check the Docker Dashboard to see the containers that were created during the process

This can also be confirmed using the `docker ps` command
localmachine % docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED         STATUS         PORTS                                                NAMES
19cd6810214e   bitnami/spark:3.3        "/opt/bitnami/script…"   6 minutes ago   Up 6 minutes                                                        sparkworker
ff51a6dc9119   bitnami/spark:3.3        "/opt/bitnami/script…"   6 minutes ago   Up 6 minutes   0.0.0.0:8989->8989/tcp                               spark
3711aa2bc6cd   wurstmeister/kafka       "start-kafka.sh"         6 minutes ago   Up 6 minutes   0.0.0.0:9092->9092/tcp                               kafka
fc65a3bd7b3a   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   6 minutes ago   Up 6 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
  7. Now, we are ready to proceed to the implementation

Implementation

SampleProducer.java

Log in to the kafka container from our local machine:

localmachine % docker exec -it kafka /bin/sh

Once inside, navigate to the /opt/kafka/bin directory, which houses the scripts for basic Kafka commands:

# cd /opt/kafka/bin

There we can list all the topics defined under the KAFKA_CREATE_TOPICS environment variable in docker-compose.yml:

# ./kafka-topics.sh --list --zookeeper zookeeper:2181
mychannel1
topic1

We can start the Kafka console consumer to listen to the mychannel1 topic:

# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mychannel1 --from-beginning

Add a run configuration in IntelliJ to execute the SampleProducer.java app.
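
For reference, the producer boils down to the standard Kafka producer API: point it at the broker, create a KafkaProducer, and send one record per name to the mychannel1 topic. The sketch below is only an approximation of SampleProducer.java, reconstructed from the consumer output shown later in this section; the actual source may differ.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Use your machine's IP instead of localhost, as described in the Setup section.
        props.put("bootstrap.servers", "10.0.0.78:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record per Avenger, keyed by "name" (this matches the PySpark output below).
            for (String name : List.of("ironman", "thor", "hulk", "spiderman", "blackwidow", "gamura")) {
                producer.send(new ProducerRecord<>("mychannel1", "name", name));
            }
            producer.flush();
        }
    }
}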

Now run the application.
The messages appear in the terminal window where the Kafka consumer is listening to the mychannel1 topic:

# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mychannel1 --from-beginning
ironman
thor
hulk
spiderman
blackwidow
gamura

This completes Part 1. Now we want to consume this data using PySpark.
Let's log in to the Spark container from our local machine:

localmachine % docker exec -it spark /bin/sh

While starting PySpark, include the package for Kafka integration with Spark Structured Streaming:

$ pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1

Next, execute the commands given in sample_pyspark_consumer.py.
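
These commands follow the usual pattern for reading Kafka data with the spark-sql-kafka connector: load the topic from the earliest offset and cast the binary key/value columns to strings. The snippet below is an approximation of that script, assuming a plain batch read so that show() works directly (the actual file may use the streaming API instead); replace 10.0.0.78 with your machine's IP.

# Approximate shape of the consumer code (run inside the pyspark shell,
# where the `spark` session already exists).
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "10.0.0.78:9092")
    .option("subscribe", "mychannel1")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers key and value as raw bytes; cast them to strings for display.
final_df = df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")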

final_df.show()   # Gives us
+----+----------+                                                               
| key|     value|
+----+----------+
|name|   ironman|
|name|      thor|
|name|      hulk|
|name| spiderman|
|name|blackwidow|
|name|    gamura|
+----+----------+

ScaledProducer.java

We follow a similar process, except that we now run ScaledProducer.java.
Add a run configuration for ScaledProducer in IntelliJ, just as for SampleProducer.
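
The scaled producer differs mainly in where and how much it writes. A rough sketch of the idea is below, assuming it targets the multi-partition topic1 topic and uses distinct keys so records spread across partitions; the topic name, message count, and payload here are illustrative, not the actual source.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ScaledProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.0.78:9092");  // your machine's IP, per the Setup section
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // Distinct keys hash to different partitions, spreading the load across the topic.
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic1", "key-" + i, "message-" + i);
                RecordMetadata meta = producer.send(record).get();  // block to learn the partition
                System.out.printf("sent %s to partition %d%n", record.value(), meta.partition());
            }
        }
    }
}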

Then, in PySpark, we run the code in scaled_pyspark_consumer.py.

Wrapping up

It's good practice to gracefully finish what you started.
We can stop and remove our Docker containers from our local machine:

localmachine % docker-compose -f docker-compose.yml down 
[+] Running 5/4
 ⠿ Container sparkworker           Removed                    10.2s
 ⠿ Container spark                 Removed                    10.2s
 ⠿ Container zookeeper             Removed                    10.2s
 ⠿ Container kafka                 Removed                     4.3s
 ⠿ Network simplekafkaapp_default  Removed                     0.1s

After this, check the status of your Docker containers (assuming no others were running):

localmachine % docker ps -a
CONTAINER ID   IMAGE     COMMAND   CREATED   STATUS    PORTS     NAMES

Happy Data Pipelining!!!