Apache Kafka examples


Installation

Apache Kafka installation

Details about the installation are available at:
https://kafka.apache.org/documentation/#quickstart

Apache Kafka using docker compose

To run a kafka cluster with Docker, use the docker-compose.yml file available in the root directory.

Images are downloaded from confluentinc and are based on Confluent 7.4.0 Community licensed (kafka version 3.4.0):

  • Zookeeper: confluentinc/cp-zookeeper

  • Kafka: confluentinc/cp-kafka

  • Schema Registry: confluentinc/cp-schema-registry

  • Connect: custom image based on confluentinc/cp-kafka-connect-base

  • ksqlDB server: confluentinc/ksqldb-server:0.28.2

  • ksqlDB cli: confluentinc/ksqldb-cli:0.28.2

Components list:

  • Broker will listen to localhost:9092

  • Schema Registry will listen to localhost:8081

  • Connect will listen to localhost:8083

  • ksqlDB server will listen to localhost:8088

Components

Start containers

scripts/bootstrap.sh

Stop containers

scripts/tear-down.sh

Change Images Version

To change the Docker image version for a specific component, update the .env file in the root directory.

Apache Kafka on Kubernetes using Confluent For Kubernetes

To run a kafka cluster on Kubernetes, have a look at Confluent for Kubernetes operator (CFK).

Example of bootstrapping a cluster using CFK:

Kafka producers

Some implementations of kafka producers.

To launch the examples, run kafka on port 9092:

scripts/bootstrap.sh

StringSerializer

It uses org.apache.kafka.common.serialization.StringSerializer for key and value

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.string.Runner"
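
A minimal sketch of what such a producer sets up (topic name and properties below are illustrative, not taken from the Runner class):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class StringProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // StringSerializer for both key and value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic1", "key-1", "value-1"));
        }
    }
}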

JsonSerializer

It uses org.apache.kafka.common.serialization.StringSerializer for key and an org.hifly.kafka.demo.producer.serializer.json.JsonSerializer for value.

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.json.Runner"
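
A custom JSON value serializer typically wraps a Jackson ObjectMapper. The sketch below is hypothetical; the actual org.hifly.kafka.demo.producer.serializer.json.JsonSerializer may differ:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializerSketch<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) return null;
        try {
            // any POJO is converted to a JSON byte array
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON value", e);
        }
    }
}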

Transactional Producer

It uses org.apache.kafka.common.serialization.StringSerializer for key and value and sets enable.idempotence and transactional.id.

Create a topic with 3 partitions:

kafka-topics --bootstrap-server localhost:9092 --create --topic test-idempotent --replication-factor 1 --partitions 3
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.tx.Runner"
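
A minimal sketch of the transactional flow (transactional.id and topic are illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TxProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test-idempotent", "key-1", "value-1"));
            // records become visible to read_committed consumers only after the commit
            producer.commitTransaction();
        }
    }
}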

Avro-confluent

It uses io.confluent.kafka.serializers.KafkaAvroSerializer for value and a GenericRecord.

Confluent schema registry is needed to run the example.

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="CONFLUENT"
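
A minimal sketch of producing a GenericRecord with the Confluent Avro serializer (schema, topic and URLs are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class AvroGenericProducerSketch {
    private static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"Car\",\"fields\":[{\"name\":\"model\",\"type\":\"string\"}]}";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // the serializer registers/looks up the schema in the Schema Registry
        props.put("schema.registry.url", "http://localhost:8081");

        GenericRecord car = new GenericData.Record(new Schema.Parser().parse(SCHEMA));
        car.put("model", "F40");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("cars", "1", car));
        }
    }
}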

Avro-apicurio

It uses io.apicurio.registry.utils.serde.AvroKafkaSerializer for value and a GenericRecord.

Apicurio schema registry is needed to run the example.

Start Apicurio:

./scripts/bootstrap-apicurio.sh
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="APICURIO"

Teardown:

./scripts/teardown-apicurio.sh

Partitioner

It uses a custom partitioner for keys.

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.partitioner.custom.Runner"
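
A custom partitioner implements org.apache.kafka.clients.producer.Partitioner and is enabled on the producer via the partitioner.class property. The sketch below is hypothetical (the repo's partitioner logic may differ): it pins one special key to the last partition and hashes every other key:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;

public class KeyHashPartitionerSketch implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) return 0;                    // records without a key go to partition 0
        if ("VIP".equals(key)) return numPartitions - 1;   // special key pinned to the last partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}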

Execute tests:

cd kafka-producer
mvn clean test

Interceptor

This example shows how to create a custom producer interceptor. The CreditCardProducerInterceptor class masks sensitive info (a credit card number) on the producer record.
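
A minimal sketch of such an interceptor (the masking logic is illustrative, not the repo's CreditCardProducerInterceptor); it is enabled on the producer via the interceptor.classes property:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class MaskingProducerInterceptorSketch implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // called before serialization: replace the sensitive value with a mask
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(), "XXXXXX", record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}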

Compile and package:

cd interceptors
mvn clean package

Run a kafka producer on listener port 9092:

mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.producer.Runner"

Run a kafka consumer on listener port 9092:

mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.consumer.Runner"

Verify output:

record is:XXXXXX
Topic: test_custom_data - Partition: 0 - Offset: 1

Teardown:

scripts/tear-down.sh

Python Producer

Install python lib confluent-kafka:

pip install confluent-kafka

Create topic:

kafka-topics --bootstrap-server localhost:9092 --create --topic kafka-topic --replication-factor 1 --partitions 1

Run producer:

cd kafka-python-producer
python producer.py

Kafka consumers

Implementation of a kafka consumer that can be used with different deserializer classes (for key and value).

Class org.hifly.kafka.demo.consumer.deserializer.impl.ConsumerInstance can be customized with:

  • clientId (string)

  • groupId (string)

  • topics (comma-separated string)

  • key deserializer class (string)

  • value deserializer class (string)

  • partition assignment strategy (org.apache.kafka.clients.consumer.RangeAssignor|org.apache.kafka.clients.consumer.RoundRobinAssignor|org.apache.kafka.clients.consumer.StickyAssignor|org.apache.kafka.clients.consumer.CooperativeStickyAssignor)

  • isolation.level (read_uncommitted|read_committed)

  • poll timeout (ms)

  • consume duration (ms)

  • autoCommit (true|false)

  • commit sync (true|false)

  • subscribe mode (true|false)

Topics can be passed as argument 1 of the main program:

-Dexec.args="users,users_clicks"

Partition assignment strategy can be passed as argument 2 of the main program:

-Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor"
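
A minimal sketch of the configuration and poll loop behind ConsumerInstance (property values shown are illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-1");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("users", "users_clicks"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("Topic: %s - Partition: %d - Offset: %d - Key: %s - Value: %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                consumer.commitSync();   // manual synchronous commit after processing the batch
            }
        }
    }
}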

Execute tests:

cd kafka-consumer
mvn clean test

To launch the examples, run kafka on port 9092:

scripts/bootstrap.sh

StringDeserializer

It uses org.apache.kafka.common.serialization.StringDeserializer for key and value. Default topic is topic1.

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner"

Send messages to the topic:

kafka-console-producer --broker-list localhost:9092 --topic topic1 --property "parse.key=true" --property "key.separator=:"
> Frank:1

read committed

It uses org.apache.kafka.common.serialization.StringDeserializer for key and value and sets isolation.level to read_committed.

It should only be used with a transactional producer.

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.tx.Runner"

Consumer Partition Assignor

Range (default)

Create 2 topics with same number of partitions:

kafka-topics --bootstrap-server localhost:9092 --create --topic users --replication-factor 1 --partitions 3
kafka-topics --bootstrap-server localhost:9092 --create --topic users_clicks --replication-factor 1 --partitions 3

Run 2 consumer instances (in 2 different shells/terminals) belonging to the same group and subscribed to the users and users_clicks topics; consumers use org.apache.kafka.clients.consumer.RangeAssignor to distribute partition ownership.

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner" -Dexec.args="users,users_clicks"

Send messages to the topics using the same key (Frank):

kafka-console-producer --broker-list localhost:9092 --topic users --property "parse.key=true" --property "key.separator=:"
> Frank:1
kafka-console-producer --broker-list localhost:9092 --topic users_clicks --property "parse.key=true" --property "key.separator=:"
> Frank:1

Verify that the same consumer instance will read both messages.

Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users - Partition: 1 - Offset: 0 - Key: frank - Value: 1
Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users_clicks - Partition: 1 - Offset: 0 - Key: frank - Value: 1

Round Robin

Create 2 topics with same number of partitions:

kafka-topics --bootstrap-server localhost:9092 --create --topic users --replication-factor 1 --partitions 3
kafka-topics --bootstrap-server localhost:9092 --create --topic users_clicks --replication-factor 1 --partitions 3

Run 2 consumer instances (in 2 different shells/terminals) belonging to the same group and subscribed to the users and users_clicks topics; consumers use org.apache.kafka.clients.consumer.RoundRobinAssignor to distribute partition ownership.

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner" -Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor"

Send messages to the topics using the same key (Frank):

kafka-console-producer --broker-list localhost:9092 --topic users --property "parse.key=true" --property "key.separator=:"
> Frank:1
kafka-console-producer --broker-list localhost:9092 --topic users_clicks --property "parse.key=true" --property "key.separator=:"
> Frank:1

Verify that messages are read by different consumers.

Interceptor

This example shows how to create a custom consumer interceptor. The CreditCardConsumerInterceptor class intercepts records before they are returned to the application and prints the record headers.

Compile and package:

cd interceptors
mvn clean package

Run a kafka producer on listener port 9092:

mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.producer.Runner"

Run a kafka consumer on listener port 9092:

mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.consumer.Runner"

Verify output:

record headers:RecordHeaders(headers = [], isReadOnly = false)
Group id consumer-interceptor-g2 - Consumer id: consumer-consumer-interceptor-g2-1-0e20b2b6-3269-4bc5-bfdb-ca787cf68aa8 - Topic: test_custom_data - Partition: 0 - Offset: 0 - Key: null - Value: XXXXXX
Consumer 23d06b51-5780-4efc-9c33-a93b3caa3b48 - partition 0 - lastOffset 1

Teardown:

scripts/tear-down.sh

Python Consumer

Install python lib confluent-kafka:

pip install confluent-kafka

Create topic:

kafka-topics --bootstrap-server localhost:9092 --create --topic kafka-topic --replication-factor 1 --partitions 1

Run producer:

cd kafka-python-producer
python producer.py

Run consumer:

cd kafka-python-consumer
python consumer.py

Admin & Management

Kafka CLI Tools

Kafka CLI tools are located in the $KAFKA_HOME/bin directory.

  1. kafka-acls - manage acls

  2. kafka-topics - create, delete, describe, or change a topic

  3. kafka-configs - create, delete, describe, or change cluster settings

  4. kafka-consumer-groups - manage consumer groups

  5. kafka-console-consumer - read data from Kafka topics and output it to standard output

  6. kafka-console-producer - produce data to Kafka topics

  7. kafka-consumer-perf-test - consume high volumes of data through your Kafka cluster

  8. kafka-producer-perf-test - produce high volumes of data through your Kafka cluster

  9. kafka-avro-console-producer - produce Avro data to Kafka topics with a schema (only with confluent installation)

  10. kafka-avro-console-consumer - read Avro data from Kafka topics with a schema and output it to standard output (only with confluent installation)

Kafka Topics: segments and retention

Bootstrap:

scripts/bootstrap.sh

Create a topic cars with retention for old segments set to 5 minutes and segment size set to 100 KB.

Be aware that log.retention.check.interval.ms is set by default to 5 minutes; this is the frequency in milliseconds at which the log cleaner checks whether any log is eligible for deletion.

kafka-topics --bootstrap-server localhost:9092 --create --topic cars --replication-factor 1 --partitions 1 --config segment.bytes=100000 --config segment.ms=604800000 --config retention.ms=300000 --config retention.bytes=-1

Launch a producer perf test:

kafka-producer-perf-test --topic cars --num-records 99999999999999 --throughput -1 --record-size 1 --producer-props bootstrap.servers=localhost:9092

Check the log dir for the cars topic and wait for deletion of old segments (5 minutes + log cleaner trigger delta):

docker exec -it broker watch ls -ltr /var/lib/kafka/data/cars-0/

Teardown:

scripts/tear-down.sh

Kafka Admin Client

It uses org.apache.kafka.clients.admin.AdminClient to execute Kafka Admin API.

Implemented operations:

  • list of cluster nodes

  • list topics

cd admin-client
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.admin.AdminClientWrapper" -Dexec.args="<location_of_admin_property_file>"
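
A minimal sketch of the two operations (the bootstrap address is illustrative; the real class reads it from the property file passed as argument):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

public class AdminClientSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // list cluster nodes
            admin.describeCluster().nodes().get().forEach(node -> System.out.println("Node: " + node));
            // list topics
            admin.listTopics().names().get().forEach(topic -> System.out.println("Topic: " + topic));
        }
    }
}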

Schema Registry

Confluent Avro Specific Record

Implementation of a kafka producer and a kafka consumer using Avro Specific Record for serializing and deserializing.

Confluent schema registry is needed to run the example.

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic cars --replication-factor <replication_factor> --partitions <number_of_partitions>

Register first version of schema:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @src/main/resources/car_v1.avsc \
http://localhost:8081/subjects/cars-value/versions

Run the producer:

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerProducer"

Run the consumer:

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerConsumer"

Confluent Avro Compatibility Checks

Backward

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic cars --replication-factor <replication_factor> --partitions <number_of_partitions>

Register first version of schema:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v1.avsc \
http://localhost:8081/subjects/cars-value/versions

Put compatibility on BACKWARD:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/cars-value

Get compatibility for subject cars-value:

curl -X GET http://localhost:8081/config/cars-value

Run the producer:

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerProducer"

Run the consumer (keep it running):

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerConsumer"

View the latest schema for this subject:

curl -X GET http://localhost:8081/subjects/cars-value/versions/latest | jq .

Register new version of schema, with the addition of a field with default value:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v2.avsc \
http://localhost:8081/subjects/cars-value/versions

Produce data with new schema id=2 and containing new field:

sh produce-avro-records.sh

Verify that the consumer does not break and continues to process messages.

Register new version of schema, with the addition of a field with a required value:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v3.avsc \
http://localhost:8081/subjects/cars-value/versions

You will get an error:

{"error_code":42201,"message":"Invalid schema

Kafka Connect

Unixcommand Source Connector

Implementation of a sample Source Connector; it executes unix commands (e.g. fortune, ls -ltr, netstat) and sends its output to a kafka topic.

Important
Commands are executed on the kafka connect worker node.

This connector relies on Confluent Schema Registry to convert the values using Avro: CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter.

Connector config is in kafka-unixcommand-connector/config/source.quickstart.json file.

Parameters for source connector:

  • command – unix command to execute (e.g. ls -ltr)

  • topic – output topic

  • poll.ms – poll interval in milliseconds between every execution
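
A minimal sketch of the task side of such a connector (hypothetical code; the actual implementation in kafka-unixcommand-connector may differ):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class UnixCommandSourceTaskSketch extends SourceTask {
    private String command;
    private String topic;
    private long pollMs;

    @Override
    public void start(Map<String, String> props) {
        command = props.get("command");
        topic = props.get("topic");
        pollMs = Long.parseLong(props.get("poll.ms"));
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        Thread.sleep(pollMs);   // wait the configured interval between executions
        List<SourceRecord> records = new ArrayList<>();
        try {
            // run the configured command on the worker node and emit one record per output line
            Process process = Runtime.getRuntime().exec(command);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null)
                    records.add(new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                            topic, Schema.STRING_SCHEMA, line));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return records;
    }

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "0.0.1";
    }
}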

Create the connector package:

cd kafka-unixcommand-connector
mvn clean package

Create a connect custom Docker image with the connector installed:

This will create an image based on confluentinc/cp-kafka-connect-base:XXX using a custom Dockerfile. It will use the Confluent utility confluent-hub install to install the plugin in connect.

kafka-unixcommand-connector/build-image.sh

Run the Docker container:

scripts/bootstrap-unixcommand-connector.sh

Deploy the connector:

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-unixcommand-connector/config/source.quickstart.json

Teardown:

scripts/tear-down-unixcommand-connector.sh

Custom SMT: composite key from json records.

Implementation of a custom Single Message Transformation (SMT); it creates a record key from a list of json fields taken from the message record value. The fields are configurable using the SMT property fields.

Example:

Original record:

key: null
value: {"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}

Result after SMT:

"transforms.createKey.fields": "FIELD1,FIELD2,FIELD3"

key: 0120400001
value: {"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}

The example applies the SMT to a mongodb sink connector.
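
A minimal sketch of such a transformation (hypothetical code; the actual org.hifly.kafka.smt.KeyFromFields may differ). It parses the record value as json and concatenates the configured fields into the new string key:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class KeyFromFieldsSketch<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private List<String> fields;

    @Override
    public void configure(Map<String, ?> configs) {
        // "fields" holds the comma-separated list of value fields composing the key
        fields = Arrays.asList(configs.get("fields").toString().split(","));
    }

    @Override
    public R apply(R record) {
        try {
            JsonNode json = MAPPER.readTree(record.value().toString());
            StringBuilder key = new StringBuilder();
            for (String field : fields)
                key.append(json.get(field).asText());
            // same topic, partition and value; only the key changes
            return record.newRecord(record.topic(), record.kafkaPartition(),
                    Schema.STRING_SCHEMA, key.toString(),
                    record.valueSchema(), record.value(), record.timestamp());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("fields", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Comma-separated list of value fields composing the key");
    }

    @Override
    public void close() {}
}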

Run the example:

scripts/bootstrap-smt-connector.sh

A mongodb sink connector will be created with this config:

{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "transforms": "createKey",
    "transforms.createKey.type": "org.hifly.kafka.smt.KeyFromFields",
    "transforms.createKey.fields": "FIELD1,FIELD2,FIELD3"
  }
}

Original json messages are sent to the test topic. The sink connector applies the SMT and stores the records in the mongodb pets collection of the Tutorial2 database.

Teardown:

scripts/tear-down-smt-connector.sh

SMT: log records with AOP

Usage of a standard SMT in a mongo sink connector.

The apply method of SMT classes in the org.apache.kafka.connect.transforms package is intercepted by a Java AOP aspect implemented with the AspectJ framework.

The aspect, implemented in the class org.hifly.kafka.smt.aspectj.SMTAspect, logs the input argument (a SinkRecord object) to standard output.

@Pointcut("execution(* org.apache.kafka.connect.transforms.*.apply(..)) && !execution(* org.apache.kafka.connect.runtime.PredicatedTransformation.apply(..))")
public void standardMethod() {}

@Before("standardMethod()")
public void log(JoinPoint jp) throws Throwable {
    System.out.println(jp);

    Object[] array = jp.getArgs();
    if (array != null) {
        System.out.println("Size:" + array.length);
        for (Object tmp : array)
            System.out.println(tmp);
    }
}

Connect log will show sink records entries:

SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test', kafkaPartition=2, key=null, keySchema=Schema{STRING}, value={"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}, valueSchema=Schema{STRING}, timestamp=1683701851358, headers=ConnectHeaders(headers=)}

Run the example:

scripts/bootstrap-smt-aspectj.sh

Kafka Connect will start with aspectjweaver java agent:

-Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -javaagent:/usr/share/java/aspectjweaver-1.9.19.jar

Aspects are deployed as standard jars and copied to Kafka Connect classpath /etc/kafka-connect/jars/kafka-smt-aspectj-0.0.1-SNAPSHOT.jar

A mongodb sink connector will be created with this config:

{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "test"

  }
}

Original json messages are sent to the test topic. The sink connector applies the SMT and stores the records in the mongodb pets collection of the Tutorial2 database.

Teardown:

scripts/tear-down-smt-aspectj.sh

Sink Connector Error Handling with a DLQ

MongoDB sink connector example configured to send bad messages to a DLQ topic named dlq.mongo.

Run the example:

scripts/bootstrap-connect-dlq.sh

Create the topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic test --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --create --topic dlq.mongo --replication-factor 1 --partitions 1

Deploy the connector:

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-connect-sink-dlq/config/connector_mongo_sink.json

A mongodb sink connector will be created with this config:

{
  "name" : "mongo-sample-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name":"dlq.mongo",
    "errors.deadletterqueue.topic.replication.factor": 1
  }
}

Send json messages to test topic (second message is a bad json message):

kafka-console-producer --broker-list localhost:9092 --topic test --property "parse.key=true" --property "key.separator=:"
> 1:{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}
> 2:{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117",

The sink connector will store only the first record in the mongodb pets collection of the Tutorial2 database.

The second message will be stored in the dlq.mongo topic.

kafka-console-consumer --topic dlq.mongo --bootstrap-server localhost:9092 --from-beginning

Verify that the connector is still in RUNNING status:

curl -v http://localhost:8083/connectors?expand=status

Teardown:

scripts/tear-down-connect-dlq.sh

CDC with Debezium PostgreSQL Source Connector

Usage of the Debezium Source connector for PostgreSQL to send RDBMS table updates into a kafka topic.

The debezium/debezium-connector-postgresql:1.7.1 connector has been installed into connect docker image using confluent hub (see docker-compose.yml file). More details on the connector are available at: https://docs.confluent.io/debezium-connect-postgres-source/current/overview.html.

Run kafka on port 9092:

scripts/bootstrap-cdc.sh

The connector uses the pgoutput plugin for replication. This plug-in is built into PostgreSQL (version 10 and later). The Debezium connector interprets the raw replication event stream directly into change events.

Verify the existence of the accounts table and its data in PostgreSQL:

docker exec -it postgres psql -h localhost -p 5432 -U postgres -c 'select * from accounts;'

Deploy the connector:

curl -v -X POST -H 'Content-Type: application/json' -d @cdc-debezium-postgres/config/debezium-source-pgsql.json http://localhost:8083/connectors

Run a kafka consumer on postgres.public.accounts topic and see the records:

kafka-console-consumer --topic postgres.public.accounts --bootstrap-server localhost:9092 --from-beginning

Insert a new record into the accounts table:

docker exec -it postgres psql -h localhost -p 5432 -U postgres -c "insert into accounts (user_id, username, password, email, created_on, last_login) values (3, 'foo3', 'bar3', 'foo3@bar.com', current_timestamp, current_timestamp);"

Teardown:

scripts/tear-down-cdc.sh

Tasks distributions

This example shows how tasks are automatically rebalanced between running worker nodes.

A kafka connect cluster will be created with 2 workers, connect and connect2, and with a datagen source connector running 4 tasks that continuously insert data.

After a few seconds connect2 will be stopped and all tasks will be redistributed to the connect worker node.

Run sample:

scripts/bootstrap-connect-tasks.sh

You will first see tasks distributed between the 2 Running workers:

{"datagen-sample":{"status":{"name":"datagen-sample","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"},{"id":1,"state":"RUNNING","worker_id":"connect2:8083"},{"id":2,"state":"RUNNING","worker_id":"connect:8083"},{"id":3,"state":"RUNNING","worker_id":"connect2:8083"}],"type":"source"}}}

After stopping connect2, you will see tasks only distributed to connect worker:

{"datagen-sample":{"status":{"name":"datagen-sample","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"},{"id":1,"state":"RUNNING","worker_id":"connect:8083"},{"id":2,"state":"RUNNING","worker_id":"connect:8083"},{"id":3,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}}}

Teardown:

scripts/tear-down-connect-tasks.sh

Kafka streams

Implementation of a series of kafka streams topologies.

Execute tests:

cd kafka-streams
mvn clean test

Events counter Stream

Count number of events grouped by key.

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic counter-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic counter-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.StreamCounter"

Send messages to input topics:

kafka-console-producer --broker-list localhost:9092 --topic counter-input-topic --property "parse.key=true" --property "key.separator=:"
"John":"transaction_1"
"Mark":"transaction_1"
"John":"transaction_2"

Read from output topic:

kafka-console-consumer --topic counter-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

Sum values Stream

Sum values grouping by key.

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic sum-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic sum-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.StreamSum"

Send messages to input topics:

kafka-console-producer --broker-list localhost:9092 --topic sum-input-topic --property "parse.key=true" --property "key.separator=:"
"John":1
"Mark":2
"John":5

Read from output topic:

kafka-console-consumer --topic sum-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

Cars sensor Stream

The stream filters speed data from car sensor records: the speed limit is set to 150 km/h and only events exceeding the limit are kept.
A KTable stores the car info data.
A left join between the KStream and the KTable produces a new aggregated object, published to an output topic.

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic carinfo-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic carsensor-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic carsensor-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.CarSensorStream"

Send messages to input topics:

kafka-console-producer --broker-list localhost:9092 --topic carinfo-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
kafka-console-producer --broker-list localhost:9092 --topic carsensor-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","speed":350}

Read from output topic:

kafka-console-consumer --topic carsensor-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "

Cars brand Stream

The stream splits the original data into 2 different topics, one for Ferrari cars and one for all other car brands.

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic cars-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic ferrari-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic cars-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.CarBrandStream"

Send messages to input topic:

kafka-console-producer --broker-list localhost:9092 --topic cars-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
2:{"id":"2","brand":"Bugatti","model":"Chiron"}

Read from output topics:

kafka-console-consumer --topic ferrari-input-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "
kafka-console-consumer --topic cars-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "

JSONArray Fields removal Processor

Removes a specific json field from the record value and forwards it to the next topology node. This example uses the Kafka Streams Processor API.
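
A minimal sketch of a Processor API processor dropping a field (field name and class are illustrative; the actual implementation behind JSONArrayRemoveProcessorApplication may differ):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class RemoveFieldProcessorSketch implements Processor<String, String, String, String> {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        try {
            // assumes the value is a json object; drop one field and forward downstream
            ObjectNode json = (ObjectNode) MAPPER.readTree(record.value());
            json.remove("model");
            context.forward(record.withValue(json.toString()));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}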

Execute tests:

cd kafka-streams-processor
mvn clean test

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic processor-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic processor-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>

Run the topology:

cd kafka-streams-processor
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.processor.JSONArrayRemoveProcessorApplication"

Send messages to input topics:

kafka-console-producer --broker-list localhost:9092 --topic processor-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}

Read from output topic:

kafka-console-consumer --topic processor-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "

Expired Messages Processor

Removes old entries based on time (expiration time 30 seconds) using a punctuator. This example uses the Kafka Streams Processor API.
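
A minimal sketch of a processor expiring old entries with a wall-clock punctuator (store name and scheduling interval are illustrative; the actual ExpiredMessagesApplication may differ):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;

public class ExpiredMessagesProcessorSketch implements Processor<String, String, String, String> {
    private static final long EXPIRATION_MS = 30_000L;
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        // state store holding the arrival time of each key; it must be attached to the topology
        this.store = context.getStateStore("arrival-times");
        // every 10 seconds scan the store and remove entries older than 30 seconds
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    if (timestamp - entry.value > EXPIRATION_MS)
                        store.delete(entry.key);
                }
            }
        });
    }

    @Override
    public void process(Record<String, String> record) {
        store.put(record.key(), record.timestamp());
        context.forward(record);
    }
}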

Execute tests:

cd kafka-streams-processor
mvn clean test

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic expired-messages-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic expired-messages-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>

Run the topology:

cd kafka-streams-processor
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.processor.ExpiredMessagesApplication"

Send messages to input topics:

kafka-console-producer --broker-list localhost:9092 --topic expired-messages-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","remote-device":"R01","time":"2021-11-02T02:50:12.208Z"}

Read from output topic:

kafka-console-consumer --topic expired-messages-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "

ksqlDB

Saga Pattern Example

Implementation of a sample App (kafka producer and consumer) sending and receiving orders; ksqlDB acts as an orchestrator to coordinate a sample Saga.

Compile:

cd ksqldb-saga-example
mvn schema-registry:download
mvn generate-sources
mvn clean compile

Launch on local environment:

Launch Docker Compose:

scripts/bootstrap.sh

Connect to ksqlDB and set auto.offset.reset:

ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit

Create DDL on ksqlDB:

cd ksqldb-saga-example/ksql
ksql-statements.sh

Create fat jar of Sample application (1 Saga):

cd ksqldb-saga-example
mvn clean compile assembly:single

Execute fat jar of Sample application (1 Saga):

cd ksqldb-saga-example
java -jar target/ksqldb-sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Saga Verification:

Insert entries on ksqlDB:

ksql http://ksqldb-server:8088
insert into accounts values('AAA', 'Jimmy Best');
insert into orders values('AAA', 150, 'Item0', 'A123', 'Jimmy Best', 'Transfer funds', '2020-04-22 03:19:51');
insert into orders values('AAA', -110, 'Item1', 'A123', 'amazon.it', 'Purchase', '2020-04-22 03:19:55');
insert into orders values('AAA', -100, 'Item2', 'A123', 'ebike.com', 'Purchase', '2020-04-22 03:19:58');

select * from orders_tx where account_id='AAA' and order_id='A123';
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": 0, "ACCOUNT": "AAA", "ITEMS": ["Item0"], "ORDER": "A123"}
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": 0, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1"], "ORDER": "A123"}
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": -1, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1", "Item2"], "ORDER": "A123"}
 --> compensate:{"TX_ID": "TX_AAA_A123", "TX_ACTION": -1, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1", "Item2"], "ORDER": "A123"}

Teardown:

scripts/tear-down.sh

Tumbling Window example: heart rate monitoring

Implementation of a tumbling window (1 minute) to monitor heart rate. Values over a threshold of 120 beats per minute are reported.

Launch on local environment:

Launch Docker Compose:

scripts/bootstrap.sh

Connect to ksqlDB and set auto.offset.reset:

ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit

Create DDL on ksqlDB:

cd ksqldb-window-tumbling-heartbeat/ksql
ksql-statements.sh

Insert entries on ksqlDB:

cd ksqldb-window-tumbling-heartbeat/ksql
ksql-inserts.sh

Verify results:

ksql http://ksqldb-server:8088

SELECT person_id,
       beat_over_threshold_count,
       TIMESTAMPTOSTRING(window_start, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_start,
       TIMESTAMPTOSTRING(window_end, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM heartbeat_60sec
EMIT CHANGES;

+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|PERSON_ID                                                |BEAT_OVER_THRESHOLD_COUNT                                |WINDOW_START                                             |WINDOW_END                                               |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|MGG1                                                     |3                                                        |2023-02-18 15:10:00                                      |2023-02-18 15:11:00                                      |
|MGG1                                                     |10                                                       |2023-02-18 15:15:00                                      |2023-02-18 15:16:00                                      |

Teardown:

scripts/tear-down.sh

Session Window example: Vehicle Positions

Implementation of a session window (5 minutes inactive). Vehicle positions (latitude and longitude) are collected and a new window opens when the vehicle does not send its position for 5 minutes. This is considered a new "trip".

Launch on local environment:

Launch Docker Compose:

scripts/bootstrap.sh

Connect to ksqlDB and set auto.offset.reset:

ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit

Create DDL on ksqlDB:

cd ksqldb-window-session-tripsegments/ksql
ksql-statements.sh

Insert entries on ksqlDB:

cd ksqldb-window-session-tripsegments/ksql
ksql-inserts.sh

Verify results:

ksql http://ksqldb-server:8088

SELECT vehicle_id,
       positions_sent,
       start_latitude,
       start_longitude,
       end_latitude,
       end_longitude,
       TIMESTAMPTOSTRING(window_start, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_start,
       TIMESTAMPTOSTRING(window_end, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM trips
EMIT CHANGES;


+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|VEHICLE_ID                 |POSITIONS_SENT             |START_LATITUDE             |START_LONGITUDE            |END_LATITUDE               |END_LONGITUDE              |WINDOW_START               |WINDOW_END                 |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|VH1                        |5                          |42.21                      |17.12                      |42.28                      |17.16                      |2023-02-18 15:10:00        |2023-02-18 15:13:00        |
|VH1                        |2                          |42.31                      |17.17                      |42.33                      |17.18                      |2023-02-18 15:20:00        |2023-02-18 15:22:00        |

Teardown:

scripts/tear-down.sh

Joins: Devices and temperature measurement

This example shows how to join a STREAM of air temperatures captured by devices with a TABLE containing the device information.

Air Temperatures are ingested into a kafka topic temperature.data with a RabbitMQ source connector.

Device Info are ingested into a kafka topic device with a JDBC Source Connector.

Launch Docker Compose:

scripts/bootstrap-ksqldb-join.sh

Create input topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic device --replication-factor 1 --partitions 1

kafka-topics --bootstrap-server localhost:9092 --create --topic temperature.data --replication-factor 1 --partitions 1

Deploy the JDBC Source connector:

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_jdbc_source.json

Send data to a RabbitMQ queue temperature.queue with a python producer (5 different devices):

pip3 install pika --upgrade
ksqldb-join/config/rabbit_producer.py temperature.queue 5

-->
count:	5
queue:	temperature.queue
Send	{'id': 0, 'body': 35}
Send	{'id': 1, 'body': 18}
Send	{'id': 2, 'body': 2}
Send	{'id': 3, 'body': 5}
Send	{'id': 4, 'body': 32}
Exiting

Deploy the RabbitMQ Source connector:

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_rabbitmq_source.json

Execute the ksqlDB statements; the stream DEVICE_TEMPERATURE is an INNER JOIN between DEVICE and TEMPERATURE.DATA:

cd ksqldb-join/ksql
./ksql-statements.sh

Inner Join

Verify the enrichment with a query:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_TEMPERATURE EMIT CHANGES"

-->
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|DEVICE_ID                                                                    |FULLNAME                                                                     |TEMPERATURE                                                                         |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|1                                                                            |foo11111                                                                     |18                                                                           |
|2                                                                            |foo22222                                                                     |2                                                                            |

Left Join

Verify the enrichment with a query:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_TEMPERATURE_LJ EMIT CHANGES"

-->
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|DEVICE_ID                                                                  |FULLNAME                                                                   |TEMPERATURE                                                                |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|0                                                                          |null                                                                       |15                                                                         |
|1                                                                          |foo11111                                                                   |13                                                                         |
|2                                                                          |foo22222                                                                   |16                                                                         |
|3                                                                          |null                                                                       |34                                                                         |
|4                                                                          |null                                                                       |8                                                                          |

Joins: Devices and devices maintenance - Right Join

This example shows how to join a TABLE with another TABLE.

Device Info are ingested into a kafka topic device with a JDBC Source Connector.

Maintenances are ingested into a kafka topic maintenance with a JDBC Source Connector.

Launch Docker Compose:

scripts/bootstrap-ksqldb-join.sh

Create input topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic device --replication-factor 1 --partitions 1

kafka-topics --bootstrap-server localhost:9092 --create --topic maintenance --replication-factor 1 --partitions 1

Deploy the JDBC Source connector:

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_jdbc_source.json

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_device_maintenance_jdbc_source.json

Execute the ksqlDB statements: TABLE MAINTENANCE RIGHT JOIN TABLE DEVICE

cd ksqldb-join/ksql
./ksql-statements-rj.sh
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_MAINTENANCE EMIT CHANGES"

-->
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|DEVICE_ID                                                                  |FULLNAME                                                                   |MAINTENANCE                                                                |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|1                                                                          |foo11111                                                                   |2023-03-01 15:00:00 16:00:00                                               |
|2                                                                          |foo22222                                                                   |null                                                                       |
|10                                                                         |foo1010101010                                                              |null                                                                       |
|15                                                                         |foo1515151515                                                              |null                                                                       |

Transactions

Transactional producer

See the Transactional Producer section above.

Read Committed consumer

See the read committed section above.

Kafka Orders App: Example with end-to-end exactly-once semantics between consumer and producer

Example of a cart application implementing end-to-end exactly-once semantics between consumer and producer.
The ItemsProducer class sends 2 items in a single transaction.
The ItemsConsumer class receives the items and creates an order containing the items.
The consumer offset is committed only if the order can be created and sent.
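
A minimal sketch of the consume-transform-produce loop giving end-to-end exactly-once delivery (hypothetical code, not the repo's ItemsProducer/ItemsConsumer; topic names and value handling are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class ExactlyOnceSketch {
    // the consumer is configured with isolation.level=read_committed and enable.auto.commit=false,
    // the producer with a transactional.id (configuration omitted for brevity)
    static void consumeTransformProduce(KafkaConsumer<String, String> consumer,
                                        KafkaProducer<String, String> producer) {
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> items = consumer.poll(Duration.ofMillis(500));
            if (items.isEmpty()) continue;
            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> item : items) {
                    // build the order from the consumed item and send it within the same transaction
                    producer.send(new ProducerRecord<>("orders", item.key(), "order-for-" + item.value()));
                    offsets.put(new TopicPartition(item.topic(), item.partition()),
                            new OffsetAndMetadata(item.offset() + 1));
                }
                // consumer offsets are committed atomically with the produced order
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
            }
        }
    }
}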

Execute tests:

cd kafka-orders-tx
mvn clean test

Execute the ItemsProducer class:

cd kafka-orders-tx
mvn clean compile && mvn exec:java -Dexec.mainClass="ItemsProducer"

Execute the ItemsConsumer class:

cd kafka-orders-tx
mvn clean compile && mvn exec:java -Dexec.mainClass="ItemsConsumer"

Framework

Kafka Spring Boot

Sample of a kafka producer and consumer implemented with Spring Boot 2.x.

The Kafka consumer implements a DLQ for records that cannot be processed (after 3 attempts).
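
One common way to wire such a DLQ with spring-kafka (2.8+) is a DefaultErrorHandler combined with a DeadLetterPublishingRecoverer; the sketch below is an assumption, the repo's actual error handling may be implemented differently:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DlqConfigSketch {

    // 1 initial attempt + 2 retries = 3 attempts; failed records then go to <topic>.DLT by default
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    }
}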

Run on your local machine:

#start a producer on port 8010
cd kafka-springboot-producer
mvn spring-boot:run

#start a consumer on port 8090
cd kafka-springboot-consumer
mvn spring-boot:run

#Send orders (on topic demoTopic)
curl --data '{"id":5, "name": "PS5"}' -H "Content-Type:application/json" http://localhost:8010/api/order

#Send ERROR orders and test DLQ (on topic demoTopic)
curl --data '{"id":5, "name": "ERROR-PS5"}' -H "Content-Type:application/json" http://localhost:8010/api/order

Kafka Quarkus

Sample of a kafka producer and consumer implemented with Quarkus. Every second a new message is sent to the demo topic.

Run on your local machine:

cd kafka-quarkus
./mvnw clean compile quarkus:dev (debug port 5005)

Run on OpenShift:

cd kafka-quarkus
./mvnw clean package -Dquarkus.container-image.build=true -Dquarkus.kubernetes.deploy=true

Kafka microprofile2

Sample of a kafka producer and consumer running on an Open Liberty MicroProfile v2 runtime.

Run on docker:

#Start a zookeeper container
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

#Start a kafka container
docker run -d --name my-cluster-kafka-bootstrap -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

#Start a kafka producer container
cd kafka-microprofile2-producer
docker build -t kafka-producer:latest .
docker run -d --name kafka-producer -p 9080:9080 -e KAFKABROKERLIST=my-cluster-kafka-bootstrap:9092 --link my-cluster-kafka-bootstrap:my-cluster-kafka-bootstrap kafka-producer:latest

#Start a kafka consumer container
cd kafka-microprofile2-consumer
docker build -t kafka-consumer:latest .
docker run -d --name kafka-consumer -p 9090:9080 -e KAFKABROKERLIST=my-cluster-kafka-bootstrap:9092 --link my-cluster-kafka-bootstrap:my-cluster-kafka-bootstrap kafka-consumer:latest

#Receive orders
curl -v -X POST http://localhost:9090/kafka-microprofile2-consumer-0.0.1-SNAPSHOT/order

#Send orders (500)
curl -v -X POST http://localhost:9080/kafka-microprofile2-producer-0.0.1-SNAPSHOT/order

Security

Kafka Custom Authorizer

This example shows how to create a custom authorizer for Kafka.

Important
This example is only for demo purposes; it is not intended to be deployed in production.

The custom authorizer org.hifly.kafka.authorizer.DummyAuthirizer extends the basic AclAuthorizer and allows authenticated users to execute operations on kafka topics without setting any ACLs on them.
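
A minimal sketch of the idea (hypothetical class; the real implementation may differ): extend AclAuthorizer and allow every action for any non-anonymous principal. The broker activates it with the authorizer.class.name property:

import kafka.security.authorizer.AclAuthorizer;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import java.util.List;
import java.util.stream.Collectors;

public class AllowAuthenticatedAuthorizerSketch extends AclAuthorizer {

    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
        // any authenticated (non-anonymous) principal is allowed; no ACL lookup is performed
        boolean authenticated = !"ANONYMOUS".equals(requestContext.principal().getName());
        return actions.stream()
                .map(action -> authenticated ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
                .collect(Collectors.toList());
    }
}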

Compile and package:

cd authorizers
mvn clean package
cp -rf ./target/authorizers-0.0.1-SNAPSHOT.jar ./jars

Run kafka with custom authorizer on port 9092:

scripts/bootstrap-auth.sh

Run a kafka producer test using the producer.properties on listener port 9092:

producer.properties:

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafkabroker1" \
  password="kafkabroker1-secret";

Producer command:

kafka-console-producer --bootstrap-server localhost:9092 --topic test --producer.config ./src/main/resources/producer.properties

Run a kafka consumer test using the consumer.properties on listener port 9092:

consumer.properties:

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafkabroker1" \
  password="kafkabroker1-secret";
group.id=test

Consumer command:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config ./src/main/resources/consumer.properties

Teardown:

scripts/tear-down-auth.sh

Kafka OAUTH Authentication KIP-768

This example shows how to configure kafka to use SASL/OAUTHBEARER authentication with support for OIDC.

To run the sample you need to run a Keycloak server and configure openid-connect on it.

Run the Keycloak server with PostgreSQL (on port 8080) and run Kafka with an OAUTH listener on port 9093:

scripts/bootstrap-oauth.sh

Keycloak setup:

 - Login to http://localhost:8080 (admin/Pa55w0rd)
 - Create a realm called kafka
 - From the Clients tab, create a client with Client ID "kafka_user".
 - Change Access Type to Confidential
 - Turn Standard Flow Enabled to OFF
 - Turn Service Accounts Enabled to ON
 - In the Advanced Settings below on the settings tab, set Access Token Lifespan to 10 minutes
 - Switch to the Credentials tab
 - Set Client Authenticator to "Client Id and Secret"
 - Copy the client-secret
 - Save

Run a kafka producer test using the client-oauth.properties file (add your client_secret into the file) on listener port 9093:

client-oauth.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=http://localhost:8080/auth/realms/kafka/protocol/openid-connect/token
sasl.oauthbearer.expected.audience=account
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="kafka_user" clientSecret="<client_secret>";

Producer command:

kafka-producer-perf-test --topic my_topic --num-records 50 --throughput 10 --record-size 1 --producer-props bootstrap.servers=localhost:9093  --producer.config kafka-oauth-kip-768/client-oauth.properties

Teardown:

scripts/tear-down-oauth.sh

Mirroring

Mirror Maker 2 tests

Example of an Active/Active Mirror Maker v2 configuration.

Observability

Distributed Tracing with OpenTelemetry for kafka applications

This example shows how to configure OpenTelemetry java auto-instrumentation for a kafka streams application enabling distributed tracing.

This example uses opentelemetry-java-instrumentation to inject OpenTelemetry auto-instrumentation as a JVM agent, requiring no source code modifications to add traces.

Apache Kafka producers, consumers and streams are part of the supported libraries, as documented by the opentelemetry-java-instrumentation project.

Run OpenTelemetry collector (otlp protocol on port 4317) and Jaeger (on port 16686):

scripts/bootstrap-tracing.sh

Create topics:

kafka-topics --bootstrap-server localhost:9092 --create --topic sum-input-topic --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --create --topic sum-output-topic --replication-factor 1 --partitions 1

Run the kafka stream application with the OpenTelemetry agent:

cd kafka-streams
mvn clean package
cd ..

export OTEL_SERVICE_NAME=stream-sum-service
export OTEL_TRACES_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317

java -javaagent:kafka-distributed-tracing/app/opentelemetry-javaagent.jar -Dotel.instrumentation.kafka.enabled=true -Dotel.javaagent.debug=true -jar kafka-streams/target/kafka-streams-0.0.1-SNAPSHOT.jar

Send messages to input topics:

kafka-console-producer --broker-list localhost:9092 --topic sum-input-topic --property "parse.key=true" --property "key.separator=:"
"John":1
"Mark":2
"John":5

Read from output topic:

kafka-console-consumer --topic sum-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

Open the Jaeger UI at http://localhost:16686 and you will see a list of traces from the streaming application.

Traces

Teardown:

scripts/tear-down-tracing.sh

Performance

Producer

Usage of the kafka utility kafka-producer-perf-test to test producer performance for java applications.

The Kafka cluster is formed by 3 brokers (9092, 9093, 9094).

One client machine, named kafka-perf, is used to run kafka-producer-perf-test against the kafka cluster.

Run the cluster and kafka_perf machine:

scripts/bootstrap-performance.sh

Scenario 1: effects of compression

Create a topic:

docker exec kafka-perf sh kafka-topics.sh --bootstrap-server broker:9092,broker2:9093,broker3:9094 --create --topic topic-perf --replication-factor 3 --partitions 6 --config min.insync.replicas=2

Run a scenario with:

  • 1000000 records

  • record size 2k

  • no compression

docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 --print-metrics

Run a scenario with:

  • 1000000 records

  • record size 2k

  • lz4 compression

docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=lz4 --print-metrics

Run a scenario with:

  • 1000000 records

  • record size 2k

  • gzip compression

docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=gzip --print-metrics

Run a scenario with:

  • 1000000 records

  • record size 2k

  • snappy compression

docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=snappy --print-metrics

Teardown:

scripts/tear-down-performance.sh