For installation details, see:
https://kafka.apache.org/documentation/#quickstart
To run a Kafka cluster with Docker, use the docker-compose.yml file available in the root directory.
Images are downloaded from confluentinc and are based on Confluent 7.4.0, Community licensed (Kafka version 3.4.0):
- Zookeeper: confluentinc/cp-zookeeper
- Kafka: confluentinc/cp-kafka
- Schema Registry: confluentinc/cp-schema-registry
- Connect: custom image based on confluentinc/cp-kafka-connect-base
- ksqlDB server: confluentinc/ksqldb-server:0.28.2
- ksqlDB cli: confluentinc/ksqldb-cli:0.28.2
Components list:
- Broker will listen on localhost:9092
- Schema Registry will listen on localhost:8081
- Connect will listen on localhost:8083
- ksqlDB server will listen on localhost:8088
To run a kafka cluster on Kubernetes, have a look at Confluent for Kubernetes operator (CFK).
Example of bootstrapping a cluster using CFK:
Some implementations of kafka producers.
To launch the examples run kafka on port 9092:
scripts/bootstrap.sh
It uses org.apache.kafka.common.serialization.StringSerializer for key and value
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.string.Runner"
It uses org.apache.kafka.common.serialization.StringSerializer for key and a org.hifly.kafka.demo.producer.serializer.json.JsonSerializer for value
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.json.Runner"
It uses org.apache.kafka.common.serialization.StringSerializer for key and value and sets enable.idempotence and transactional.id.
Create a topic with 3 partitions:
kafka-topics --bootstrap-server localhost:9092 --create --topic test-idempotent --replication-factor 1 --partitions 3
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.tx.Runner"
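The effect of enable.idempotence can be pictured with a small model. The sketch below is only an illustration, not the broker's implementation: it assumes a single partition and a hypothetical PartitionLog class that remembers the last sequence number seen per producer id and ignores a retried write carrying a sequence it has already appended.

```python
class PartitionLog:
    """Toy single-partition log that drops duplicate writes (hypothetical model)."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer id -> last sequence number appended

    def append(self, producer_id, seq, value):
        # A retry re-sends the same (producer_id, seq); accept each sequence once.
        if seq <= self.last_seq.get(producer_id, -1):
            return False
        self.last_seq[producer_id] = seq
        self.records.append(value)
        return True


log = PartitionLog()
log.append("p1", 0, "a")
log.append("p1", 1, "b")
log.append("p1", 1, "b")  # retry after a lost acknowledgement: ignored
print(log.records)
```

Without the sequence check, the retried write would appear twice in the log.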
It uses io.confluent.kafka.serializers.KafkaAvroSerializer for value and a GenericRecord.
Confluent schema registry is needed to run the example.
More Info at: https://github.com/confluentinc/schema-registry
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="CONFLUENT"
It uses io.apicurio.registry.utils.serde.AvroKafkaSerializer for value and a GenericRecord.
Apicurio schema registry is needed to run the example.
Start Apicurio:
./scripts/bootstrap-apicurio.sh
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="APICURIO"
Teardown:
./scripts/teardown-apicurio.sh
It uses a custom partitioner for keys.
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.partitioner.custom.Runner"
Execute tests:
cd kafka-producer
mvn clean test
This example shows how to create a custom producer interceptor. The class CreditCardProducerInterceptor masks sensitive information (a credit card number) in the producer record.
Compile and package:
cd interceptors
mvn clean package
Run a kafka producer on listener port 9092:
mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.producer.Runner"
Run a kafka consumer on listener port 9092:
mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.consumer.Runner"
Verify output:
record is:XXXXXX
Topic: test_custom_data - Partition: 0 - Offset: 1
Teardown:
scripts/tear-down.sh
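The masking step of the interceptor example can be sketched in a few lines. This is not the code of CreditCardProducerInterceptor; the regular expression, the function name mask_card_numbers and the choice to replace the whole number with XXXXXX are assumptions made for illustration.

```python
import re

# Assumption: a card number is 13 to 19 digits, optionally separated by spaces or dashes.
CARD_PATTERN = re.compile(r"\b\d(?:[ -]?\d){12,18}\b")


def mask_card_numbers(value, mask="XXXXXX"):
    """Replace every card-like digit sequence in the record value with a fixed mask."""
    return CARD_PATTERN.sub(mask, value)


print(mask_card_numbers("4111 1111 1111 1111"))
print(mask_card_numbers("order 12345 paid with 4111111111111111 today"))
```

Short digit sequences such as the order number are left untouched because they do not reach the minimum length.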
Implementation of a kafka consumer that can be used with different deserializer classes (for key and value).
Class org.hifly.kafka.demo.consumer.deserializer.impl.ConsumerInstance can be customized with:
- clientId (string)
- groupId (string)
- topics (string separated by comma)
- key deserializer class (string)
- value deserializer class (string)
- partition assignment strategy (org.apache.kafka.clients.consumer.RangeAssignor|org.apache.kafka.clients.consumer.RoundRobinAssignor|org.apache.kafka.clients.consumer.StickyAssignor|org.apache.kafka.clients.consumer.CooperativeStickyAssignor)
- isolation.level (read_uncommitted|read_committed)
- poll timeout (ms)
- consume duration (ms)
- autoCommit (true|false)
- commit sync (true|false)
- subscribe mode (true|false)
Topics can be passed as argument 1 of the main program:
-Dexec.args="users,users_clicks"
Partition assignment strategy can be passed as argument 2 of the main program:
-Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor"
Execute tests:
cd kafka-consumer
mvn clean test
To launch the examples run kafka on port 9092:
scripts/bootstrap.sh
It uses org.apache.kafka.common.serialization.StringDeserializer for key and value. Default topic is topic1.
cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner"
Send messages to the topic:
kafka-console-producer --broker-list localhost:9092 --topic topic1 --property "parse.key=true" --property "key.separator=:"
> Frank:1
It uses org.apache.kafka.common.serialization.StringDeserializer for key and value and sets isolation.level to read_committed.
It should only be used with a transactional producer.
cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.tx.Runner"
Create 2 topics with same number of partitions:
kafka-topics --bootstrap-server localhost:9092 --create --topic users --replication-factor 1 --partitions 3
kafka-topics --bootstrap-server localhost:9092 --create --topic users_clicks --replication-factor 1 --partitions 3
Run 2 consumer instances (in 2 different shells/terminals) belonging to the same group and subscribed to the users and users_clicks topics; the consumers use org.apache.kafka.clients.consumer.RangeAssignor to distribute partition ownership.
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner" -Dexec.args="users,users_clicks"
Send messages to the topics using the same key (Frank):
kafka-console-producer --broker-list localhost:9092 --topic users --property "parse.key=true" --property "key.separator=:"
> Frank:1
kafka-console-producer --broker-list localhost:9092 --topic users_clicks --property "parse.key=true" --property "key.separator=:"
> Frank:1
Verify that the same consumer instance will read both messages.
Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users - Partition: 1 - Offset: 0 - Key: frank - Value: 1
Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users_clicks - Partition: 1 - Offset: 0 - Key: frank - Value: 1
Create 2 topics with same number of partitions:
kafka-topics --bootstrap-server localhost:9092 --create --topic users --replication-factor 1 --partitions 3
kafka-topics --bootstrap-server localhost:9092 --create --topic users_clicks --replication-factor 1 --partitions 3
Run 2 consumer instances (in 2 different shells/terminals) belonging to the same group and subscribed to the users and users_clicks topics; the consumers use org.apache.kafka.clients.consumer.RoundRobinAssignor to distribute partition ownership.
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner" -Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor"
Send messages to the topics using the same key (Frank):
kafka-console-producer --broker-list localhost:9092 --topic users --property "parse.key=true" --property "key.separator=:"
> Frank:1
kafka-console-producer --broker-list localhost:9092 --topic users_clicks --property "parse.key=true" --property "key.separator=:"
> Frank:1
Verify that messages are read by different consumers.
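The difference between the two assignors can be reproduced with a simplified model. The sketch below assumes two hypothetical member ids (c1, c2) that both subscribe to users and users_clicks with 3 partitions each; it mimics the per-topic split of the range strategy and the single round-robin pass over all topic partitions, and it is not the Kafka client code.

```python
def range_assign(consumers, topics):
    """Per topic, give each consumer a contiguous range of partitions."""
    consumers = sorted(consumers)
    assignment = {c: [] for c in consumers}
    for topic, partitions in sorted(topics.items()):
        per_consumer, extra = divmod(partitions, len(consumers))
        start = 0
        for i, consumer in enumerate(consumers):
            count = per_consumer + (1 if i < extra else 0)
            assignment[consumer] += [(topic, p) for p in range(start, start + count)]
            start += count
    return assignment


def round_robin_assign(consumers, topics):
    """Deal all topic partitions, sorted, one by one across the consumers."""
    consumers = sorted(consumers)
    assignment = {c: [] for c in consumers}
    all_partitions = [(t, p) for t, n in sorted(topics.items()) for p in range(n)]
    for i, topic_partition in enumerate(all_partitions):
        assignment[consumers[i % len(consumers)]].append(topic_partition)
    return assignment


topics = {"users": 3, "users_clicks": 3}
print(range_assign(["c1", "c2"], topics))
print(round_robin_assign(["c1", "c2"], topics))
```

With the range strategy, partition 1 of both topics lands on the same consumer, so records with the same key are read together. With round robin, users partition 1 and users_clicks partition 1 end up on different consumers.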
This example shows how to create a custom consumer interceptor. The class CreditCardConsumerInterceptor intercepts records before they are returned to the application and prints their headers.
Compile and package:
cd interceptors
mvn clean package
Run a kafka producer on listener port 9092:
mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.producer.Runner"
Run a kafka consumer on listener port 9092:
mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.consumer.Runner"
Verify output:
record headers:RecordHeaders(headers = [], isReadOnly = false)
Group id consumer-interceptor-g2 - Consumer id: consumer-consumer-interceptor-g2-1-0e20b2b6-3269-4bc5-bfdb-ca787cf68aa8 - Topic: test_custom_data - Partition: 0 - Offset: 0 - Key: null - Value: XXXXXX
Consumer 23d06b51-5780-4efc-9c33-a93b3caa3b48 - partition 0 - lastOffset 1
Teardown:
scripts/tear-down.sh
Install python lib confluent-kafka:
pip install confluent-kafka
Create topic:
kafka-topics --bootstrap-server localhost:9092 --create --topic kafka-topic --replication-factor 1 --partitions 1
Run producer:
cd kafka-python-producer
python producer.py
Run consumer:
cd kafka-python-consumer
python consumer.py
Kafka CLI tools are located in the $KAFKA_HOME/bin directory.
- kafka-acls - manage acls
- kafka-topics - create, delete, describe, or change a topic
- kafka-configs - create, delete, describe, or change cluster settings
- kafka-consumer-groups - manage consumer groups
- kafka-console-consumer - read data from Kafka topics and output it to standard output
- kafka-console-producer - produce data to Kafka topics
- kafka-consumer-perf-test - consume high volumes of data through your Kafka cluster
- kafka-producer-perf-test - produce high volumes of data through your Kafka cluster
- kafka-avro-console-producer - produce Avro data to Kafka topics with a schema (only with confluent installation)
- kafka-avro-console-consumer - read Avro data from Kafka topics with a schema and output it to standard output (only with confluent installation)
Bootstrap:
scripts/bootstrap.sh
Create a topic cars with retention for old segments set to 5 minutes and size of segments set to 100 KB.
Be aware that log.retention.check.interval.ms is set by default to 5 minutes and this is the frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion.
kafka-topics --bootstrap-server localhost:9092 --create --topic cars --replication-factor 1 --partitions 1 --config segment.bytes=100000 --config segment.ms=604800000 --config retention.ms=300000 --config retention.bytes=-1
Launch a producer perf test:
kafka-producer-perf-test --topic cars --num-records 99999999999999 --throughput -1 --record-size 1 --producer-props bootstrap.servers=localhost:9092
Check the log dir for cars topic and wait for deletion of old segments (5 minutes + log cleaner trigger delta)
docker exec -it broker watch ls -ltr /var/lib/kafka/data/cars-0/
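The expected waiting time can be estimated with simple arithmetic. The sketch below assumes a segment that has already been rolled (the active segment is not deleted) and ignores any extra delay before files are physically removed; the function name is hypothetical.

```python
def deletion_window_ms(retention_ms, check_interval_ms):
    """Return (earliest, latest) delay between a segment's last write and its deletion."""
    # The segment becomes eligible after retention_ms; the periodic check
    # may run up to check_interval_ms later.
    return retention_ms, retention_ms + check_interval_ms


earliest, latest = deletion_window_ms(retention_ms=300_000, check_interval_ms=300_000)
print(earliest / 60_000, latest / 60_000)  # minutes
```

With the values used above, a closed segment should disappear between 5 and 10 minutes after its last record was written.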
Teardown:
scripts/tear-down.sh
It uses org.apache.kafka.clients.admin.AdminClient to execute Kafka Admin API.
Operations added:
- list of cluster nodes
- list topics
cd admin-client
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.admin.AdminClientWrapper" -Dexec.args="<location_of_admin_property_file>"
Implementation of a kafka producer and a kafka consumer using Avro Specific Record for serializing and deserializing.
Confluent schema registry is needed to run the example.
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic cars --replication-factor <replication_factor> --partitions <number_of_partitions>
Register first version of schema:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @src/main/resources/car_v1.avsc \
http://localhost:8081/subjects/cars-value/versions
Run the producer:
cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerProducer"
Run the consumer:
cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerConsumer"
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic cars --replication-factor <replication_factor> --partitions <number_of_partitions>
Register first version of schema:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v1.avsc \
http://localhost:8081/subjects/cars-value/versions
Put compatibility on BACKWARD:
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/cars-value
Get compatibility for subject cars-value:
curl -X GET http://localhost:8081/config/cars-value
Run the producer:
cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerProducer"
Run the consumer (keep it running):
cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerConsumer"
View the latest schema for this subject:
curl -X GET http://localhost:8081/subjects/cars-value/versions/latest | jq .
Register new version of schema, with the addition of a field with default value:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v2.avsc \
http://localhost:8081/subjects/cars-value/versions
Produce data with new schema id=2 and containing new field:
sh produce-avro-records.sh
Verify that consumer will not break and continue to process messages.
Register new version of schema, with the addition of a field with a required value:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v3.avsc \
http://localhost:8081/subjects/cars-value/versions
You will get an error:
{"error_code":42201,"message":"Invalid schema
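Why the second change is accepted and the third is rejected can be shown with a reduced version of the BACKWARD rule. The sketch below only looks at added fields (a new field needs a default so that data written with the old schema can still be read); it ignores type changes and every other Avro resolution rule. The field names are hypothetical and are not taken from the car_v1, car_v2 or car_v3 files.

```python
def is_backward_compatible(old_schema, new_schema):
    """True if every field added by new_schema carries a default value."""
    old_fields = {f["name"] for f in old_schema["fields"]}
    return all(f["name"] in old_fields or "default" in f for f in new_schema["fields"])


# Hypothetical schemas, for illustration only.
v1 = {"type": "record", "name": "Car", "fields": [
    {"name": "model", "type": "string"},
    {"name": "brand", "type": "string"},
]}
v2 = {"type": "record", "name": "Car", "fields": v1["fields"] + [
    {"name": "color", "type": "string", "default": "unknown"},
]}
v3 = {"type": "record", "name": "Car", "fields": v2["fields"] + [
    {"name": "engine", "type": "string"},  # required field, no default
]}

print(is_backward_compatible(v1, v2))
print(is_backward_compatible(v2, v3))
```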
Implementation of a sample Source Connector; it executes unix commands (e.g. fortune, ls -ltr, netstat) and sends its output to a kafka topic.
Important: commands are executed on the kafka connect worker node.
This connector relies on Confluent Schema Registry to convert the values using Avro: CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter.
Connector config is in kafka-unixcommand-connector/config/source.quickstart.json file.
Parameters for source connector:
- command – unix command to execute (e.g. ls -ltr)
- topic – output topic
- poll.ms – poll interval in milliseconds between every execution
Create the connector package:
cd kafka-unixcommand-connector
mvn clean package
Create a connect custom Docker image with the connector installed:
This will create an image based on confluentinc/cp-kafka-connect-base:XXX using a custom Dockerfile. It will use the Confluent utility confluent-hub install to install the plugin in connect.
kafka-unixcommand-connector/build-image.sh
Run the Docker container:
scripts/bootstrap-unixcommand-connector.sh
Deploy the connector:
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-unixcommand-connector/config/source.quickstart.json
Teardown:
scripts/tear-down-unixcommand-connector.sh
Implementation of a custom Single Message Transformation (SMT); it creates a key from a list of json fields taken from message record value. Fields are configurable using SMT property fields.
Example:
Original record:
key: null
value: {"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}
Result after SMT:
"transforms.createKey.fields": "FIELD1,FIELD2,FIELD3"
key: 0120400001
value: {"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}
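The key derivation shown in this example can be sketched as follows. This is an illustration of the idea, not the code of org.hifly.kafka.smt.KeyFromFields; the function name key_from_fields and the plain concatenation without a separator are assumptions based on the result above.

```python
import json


def key_from_fields(value, fields):
    """Build a record key by concatenating the listed fields of a JSON value."""
    record = json.loads(value)
    return "".join(str(record[name]) for name in fields)


value = ('{"FIELD1": "01","FIELD2": "20400","FIELD3": "001",'
         '"FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}')
print(key_from_fields(value, "FIELD1,FIELD2,FIELD3".split(",")))
```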
The example applies the SMT to a mongodb sink connector.
Run the example:
scripts/bootstrap-smt-connector.sh
A mongodb sink connector will be created with this config:
{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "transforms": "createKey",
    "transforms.createKey.type": "org.hifly.kafka.smt.KeyFromFields",
    "transforms.createKey.fields": "FIELD1,FIELD2,FIELD3"
  }
}
Original json messages will be sent to test topic. Sink connector will apply the SMT and store the records in mongodb pets collection from Tutorial2 database.
Teardown:
scripts/tear-down-smt-connector.sh
Usage of a standard SMT in a mongo sink connector.
apply method for SMT classes in package org.apache.kafka.connect.transforms is intercepted by a Java AOP Aspect implemented using AspectJ framework.
The @Aspect, implemented in class org.hifly.kafka.smt.aspectj.SMTAspect, logs the input arg (SinkRecord object) to the standard output.
@Pointcut("execution(* org.apache.kafka.connect.transforms.*.apply(..)) && !execution(* org.apache.kafka.connect.runtime.PredicatedTransformation.apply(..))")
public void standardMethod() {}

@Before("standardMethod()")
public void log(JoinPoint jp) throws Throwable {
    System.out.println(jp);
    Object[] array = jp.getArgs();
    if (array != null) {
        System.out.println("Size:" + array.length);
        for (Object tmp : array)
            System.out.println(tmp);
    }
}
Connect log will show sink records entries:
SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test', kafkaPartition=2, key=null, keySchema=Schema{STRING}, value={"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}, valueSchema=Schema{STRING}, timestamp=1683701851358, headers=ConnectHeaders(headers=)}
Run the example:
scripts/bootstrap-smt-aspectj.sh
Kafka Connect will start with aspectjweaver java agent:
-Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -javaagent:/usr/share/java/aspectjweaver-1.9.19.jar
Aspects are deployed as standard jars and copied to Kafka Connect classpath /etc/kafka-connect/jars/kafka-smt-aspectj-0.0.1-SNAPSHOT.jar
A mongodb sink connector will be created with this config:
{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "test"
  }
}
Original json messages will be sent to test topic. Sink connector will apply the SMT and store the records in mongodb pets collection from Tutorial2 database.
Teardown:
scripts/tear-down-smt-aspectj.sh
MongoDB sink connector example configured to send bad messages to a DLQ topic named dlq.mongo.
Run the example:
scripts/bootstrap-connect-dlq.sh
Create the topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic test --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --create --topic dlq.mongo --replication-factor 1 --partitions 1
Deploy the connector:
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-connect-sink-dlq/config/connector_mongo_sink.json
A mongodb sink connector will be created with this config:
{
  "name": "mongo-sample-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.mongo",
    "errors.deadletterqueue.topic.replication.factor": 1
  }
}
Send json messages to test topic (second message is a bad json message):
kafka-console-producer --broker-list localhost:9092 --topic test --property "parse.key=true" --property "key.separator=:"
> 1:{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}
> 2:{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117",
The sink connector will write only the first record to the mongodb pets collection in the Tutorial2 database.
The second message will be stored in the dlq.mongo topic.
kafka-console-consumer --topic dlq.mongo --bootstrap-server localhost:9092 --from-beginning
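The routing behaviour enabled by errors.tolerance=all can be pictured with a small model. The sketch below is not Kafka Connect code: the function name route is hypothetical, and it only mimics the idea that a value which fails JSON conversion is set aside for the dead letter queue while processing continues.

```python
import json


def route(messages):
    """Split raw values into parsed documents and dead-letter entries."""
    delivered, dead_letters = [], []
    for raw in messages:
        try:
            delivered.append(json.loads(raw))
        except json.JSONDecodeError:
            # Conversion failed: keep the raw value for the DLQ and move on.
            dead_letters.append(raw)
    return delivered, dead_letters


good = ('{"FIELD1": "01","FIELD2": "20400","FIELD3": "001",'
        '"FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}')
bad = '{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117",'
delivered, dead_letters = route([good, bad])
print(len(delivered), len(dead_letters))
```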
Verify that the connector is still in RUNNING status:
curl -v "http://localhost:8083/connectors?expand=status"
Teardown:
scripts/tear-down-connect-dlq.sh
Usage of the Debezium Source connector for PostgreSQL to send RDBMS table updates into a kafka topic.
The debezium/debezium-connector-postgresql:1.7.1 connector has been installed into connect docker image using confluent hub (see docker-compose.yml file). More details on the connector are available at: https://docs.confluent.io/debezium-connect-postgres-source/current/overview.html.
Run kafka on port 9092:
scripts/bootstrap-cdc.sh
The connector uses pgoutput plugin for replication. This plug-in is always present in PostgreSQL server. The Debezium connector interprets the raw replication event stream directly into change events.
Verify the existence of the accounts table and its data in PostgreSQL:
docker exec -it postgres psql -h localhost -p 5432 -U postgres -c 'select * from accounts;'
Deploy the connector:
curl -v -X POST -H 'Content-Type: application/json' -d @cdc-debezium-postgres/config/debezium-source-pgsql.json http://localhost:8083/connectors
Run a kafka consumer on postgres.public.accounts topic and see the records:
kafka-console-consumer --topic postgres.public.accounts --bootstrap-server localhost:9092 --from-beginning
Insert a new record into the accounts table:
docker exec -it postgres psql -h localhost -p 5432 -U postgres -c "insert into accounts (user_id, username, password, email, created_on, last_login) values (3, 'foo3', 'bar3', 'foo3@bar.com', current_timestamp, current_timestamp);"
Teardown:
scripts/tear-down-cdc.sh
This example shows how tasks are automatically balanced between running worker nodes.
A kafka connect cluster will be created with 2 workers, connect and connect2, and with a datagen source connector with 4 tasks continuously inserting data.
After some seconds connect2 will be stopped and all tasks will be redistributed to the connect worker node.
Run sample:
scripts/bootstrap-connect-tasks.sh
You will first see tasks distributed between the 2 Running workers:
{"datagen-sample":{"status":{"name":"datagen-sample","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"},{"id":1,"state":"RUNNING","worker_id":"connect2:8083"},{"id":2,"state":"RUNNING","worker_id":"connect:8083"},{"id":3,"state":"RUNNING","worker_id":"connect2:8083"}],"type":"source"}}}
After stopping connect2, you will see tasks only distributed to connect worker:
{"datagen-sample":{"status":{"name":"datagen-sample","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"},{"id":1,"state":"RUNNING","worker_id":"connect:8083"},{"id":2,"state":"RUNNING","worker_id":"connect:8083"},{"id":3,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}}}
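The two status outputs above can be reproduced with a toy model of task placement. The sketch below simply deals tasks across the live workers in order; the real rebalance protocol in Kafka Connect is more elaborate, and the function name distribute is hypothetical.

```python
def distribute(task_ids, workers):
    """Assign each task to a worker, cycling through the live workers in order."""
    return {task: workers[i % len(workers)] for i, task in enumerate(task_ids)}


before = distribute(range(4), ["connect:8083", "connect2:8083"])
after = distribute(range(4), ["connect:8083"])  # connect2 has been stopped
print(before)
print(after)
```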
Teardown:
scripts/tear-down-connect-tasks.sh
Implementation of a series of kafka streams topologies.
Execute tests:
cd kafka-streams
mvn clean test
Count number of events grouped by key.
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic counter-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic counter-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
Run the topology:
cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.StreamCounter"
Send messages to input topics:
kafka-console-producer --broker-list localhost:9092 --topic counter-input-topic --property "parse.key=true" --property "key.separator=:"
"John":"transaction_1"
"Mark":"transaction_1"
"John":"transaction_2"
Read from output topic:
kafka-console-consumer --topic counter-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"
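The values to expect on the output topic can be worked out with a plain Python model of a count grouped by key. This is only a sketch of the aggregation, not the StreamCounter topology, and it assumes every update is emitted (a real Kafka Streams application may collapse intermediate updates through caching).

```python
from collections import Counter


def count_by_key(events):
    """Return the running count emitted after each (key, value) event."""
    counts = Counter()
    updates = []
    for key, _value in events:
        counts[key] += 1
        updates.append((key, counts[key]))
    return updates


events = [("John", "transaction_1"), ("Mark", "transaction_1"), ("John", "transaction_2")]
print(count_by_key(events))
```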
Sum values grouping by key.
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic sum-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic sum-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
Run the topology:
cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.StreamSum"
Send messages to input topics:
kafka-console-producer --broker-list localhost:9092 --topic sum-input-topic --property "parse.key=true" --property "key.separator=:"
"John":1
"Mark":2
"John":5
Read from output topic:
kafka-console-consumer --topic sum-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"
The stream filters speed data from car sensor records. The speed limit is set to 150 km/h and only events exceeding the limit are kept.
A ktable stores the car info data.
A left join between the kstream and the ktable produces a new aggregated object published to an output topic.
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic carinfo-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic carsensor-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic carsensor-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
Run the topology:
cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.CarSensorStream"
Send messages to input topics:
kafka-console-producer --broker-list localhost:9092 --topic carinfo-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
kafka-console-producer --broker-list localhost:9092 --topic carsensor-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","speed":350}
Read from output topic:
kafka-console-consumer --topic carsensor-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "
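The filter and left join can be modelled without Kafka to see which records reach the output topic. In the sketch below the table is a plain dictionary keyed by car id; the shape of the joined object and the function name speeding_events are assumptions, not the output format of CarSensorStream.

```python
SPEED_LIMIT = 150  # km/h


def speeding_events(car_info, sensor_events):
    """Keep sensor events above the limit and enrich them with car info (left join)."""
    joined = []
    for car_id, event in sensor_events:
        if event["speed"] > SPEED_LIMIT:
            # Left join: the event is kept even when no car info is known (None).
            joined.append((car_id, {"speed": event["speed"], "car": car_info.get(car_id)}))
    return joined


car_info = {"1": {"id": "1", "brand": "Ferrari", "model": "F40"}}
sensor_events = [
    ("1", {"id": "1", "speed": 350}),
    ("1", {"id": "1", "speed": 100}),
    ("2", {"id": "2", "speed": 200}),
]
print(speeding_events(car_info, sensor_events))
```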
The stream splits the original data into 2 different topics, one for Ferrari cars and one for all other car brands.
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic cars-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic ferrari-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic cars-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
Run the topology:
cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.CarBrandStream"
Send messages to input topic:
kafka-console-producer --broker-list localhost:9092 --topic cars-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
2:{"id":"2","brand":"Bugatti","model":"Chiron"}
Read from output topics:
kafka-console-consumer --topic ferrari-input-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "
kafka-console-consumer --topic cars-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "
Remove a specific json field from the record and forward it to the next topology node. This example uses Kafka streams Processor API.
Execute tests:
cd kafka-streams-processor
mvn clean test
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic processor-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic processor-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
Run the topology:
cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.processor.JSONArrayRemoveProcessorApplication"
Send messages to input topics:
kafka-console-producer --broker-list localhost:9092 --topic processor-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
Read from output topic:
kafka-console-consumer --topic processor-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "
Remove old entries based on time (expiration time 30 seconds) using a punctuator. This example uses Kafka streams Processor API.
Execute tests:
cd kafka-streams-processor
mvn clean test
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic expired-messages-input-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
kafka-topics --bootstrap-server localhost:9092 --create --topic expired-messages-output-topic --replication-factor <replication_factor> --partitions <number_of_partitions>
Run the topology:
cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.processor.ExpiredMessagesApplication"
Send messages to input topics:
kafka-console-producer --broker-list localhost:9092 --topic expired-messages-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","remote-device":"R01","time":"2021-11-02T02:50:12.208Z"}
Read from output topic:
kafka-console-consumer --topic expired-messages-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : "
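The role of the punctuator can be illustrated with a small in-memory store. The sketch below is not the ExpiredMessagesApplication code: the class ExpiringStore and its methods are hypothetical, timestamps are plain seconds, and punctuate stands in for the callback that Kafka Streams would invoke on a schedule.

```python
EXPIRATION_SECONDS = 30


class ExpiringStore:
    """Key-value store whose entries are removed once older than the expiration time."""

    def __init__(self):
        self.entries = {}  # key -> (value, timestamp in seconds)

    def put(self, key, value, timestamp):
        self.entries[key] = (value, timestamp)

    def punctuate(self, now):
        """Remove and return the keys whose age exceeds EXPIRATION_SECONDS."""
        expired = [k for k, (_, ts) in self.entries.items() if now - ts > EXPIRATION_SECONDS]
        for key in expired:
            del self.entries[key]
        return expired


store = ExpiringStore()
store.put("1", {"remote-device": "R01"}, timestamp=0)
store.put("2", {"remote-device": "R02"}, timestamp=20)
print(store.punctuate(now=40))
```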
Implementation of a sample App (kafka producer and consumer) sending and receiving orders; ksqlDB acts as an orchestrator to coordinate a sample Saga.
Compile:
cd ksqldb-saga-example
mvn schema-registry:download
mvn generate-sources
mvn clean compile
Launch on local environment:
Launch Docker Compose:
scripts/bootstrap.sh
Connect to ksqlDB and set auto.offset.reset:
ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit
Create DDL on ksqlDB:
cd ksqldb-saga-example/ksql
ksql-statements.sh
Create fat jar of Sample application (1 Saga):
cd ksqldb-saga-example
mvn clean compile assembly:single
Execute fat jar of Sample application (1 Saga):
cd ksqldb-saga-example
java -jar target/ksqldb-sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Saga Verification:
Insert entries on ksqlDB:
ksql http://ksqldb-server:8088
insert into accounts values('AAA', 'Jimmy Best');
insert into orders values('AAA', 150, 'Item0', 'A123', 'Jimmy Best', 'Transfer funds', '2020-04-22 03:19:51');
insert into orders values('AAA', -110, 'Item1', 'A123', 'amazon.it', 'Purchase', '2020-04-22 03:19:55');
insert into orders values('AAA', -100, 'Item2', 'A123', 'ebike.com', 'Purchase', '2020-04-22 03:19:58');
select * from orders_tx where account_id='AAA' and order_id='A123';
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": 0, "ACCOUNT": "AAA", "ITEMS": ["Item0"], "ORDER": "A123"}
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": 0, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1"], "ORDER": "A123"}
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": -1, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1", "Item2"], "ORDER": "A123"}
--> compensate:{"TX_ID": "TX_AAA_A123", "TX_ACTION": -1, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1", "Item2"], "ORDER": "A123"}
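The verification output above is consistent with a rule that accepts an order while the running balance stays non-negative and requests a compensation once it drops below zero. The following is a minimal sketch of that rule in plain Python. The rule itself, the function name saga_actions and the tuple layout are assumptions made for illustration; the real logic lives in the ksqlDB statements and in the sample application.

```python
from collections import defaultdict


def saga_actions(orders):
    """Yield one action per order: 0 = accept, -1 = compensate.

    `orders` is an iterable of (account_id, order_id, amount, item) tuples.
    Assumption: an order is compensated when the running balance of the
    account/order pair drops below zero.
    """
    balance = defaultdict(int)
    items = defaultdict(list)
    for account_id, order_id, amount, item in orders:
        key = (account_id, order_id)
        balance[key] += amount
        items[key].append(item)
        yield {
            "TX_ID": f"TX_{account_id}_{order_id}",
            "TX_ACTION": 0 if balance[key] >= 0 else -1,
            "ACCOUNT": account_id,
            "ITEMS": list(items[key]),  # copy, so later orders do not mutate it
            "ORDER": order_id,
        }


if __name__ == "__main__":
    sample = [
        ("AAA", "A123", 150, "Item0"),
        ("AAA", "A123", -110, "Item1"),
        ("AAA", "A123", -100, "Item2"),
    ]
    for action in saga_actions(sample):
        print(action)
```

With the three inserts used in the verification step, the balance goes 150, 40, -60, so only the third action carries TX_ACTION -1.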
Teardown:
scripts/tear-down.sh
Implementation of a tumbling window (1 minute) to monitor heart rate. Values over a threshold of 120 beats per minute are reported.
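The aggregation can be pictured with a few lines of plain Python. This is only a sketch of the windowing idea, not the ksqlDB statements used by the example; the tuple layout, the use of epoch seconds and the function name are assumptions.

```python
from collections import Counter

WINDOW_SECONDS = 60   # tumbling window size (1 minute)
THRESHOLD_BPM = 120   # only readings above this value are counted


def beats_over_threshold(readings):
    """Count readings above the threshold per person and per window.

    `readings` is an iterable of (person_id, epoch_seconds, heart_rate).
    Returns {(person_id, window_start, window_end): count}.
    """
    counts = Counter()
    for person_id, ts, heart_rate in readings:
        if heart_rate > THRESHOLD_BPM:
            # Tumbling windows are fixed-size and do not overlap, so each
            # timestamp falls into exactly one window.
            window_start = ts - (ts % WINDOW_SECONDS)
            counts[(person_id, window_start, window_start + WINDOW_SECONDS)] += 1
    return dict(counts)
```

For example, readings at seconds 0, 30 and 59 fall into the window [0, 60), while a reading at second 60 opens the next window [60, 120).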
Launch on local environment:
Launch Docker Compose:
scripts/bootstrap.sh
Connect to ksqlDB and set auto.offset.reset:
ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit
Create DDL on ksqlDB:
cd ksqldb-window-tumbling-heartbeat/ksql
ksql-statements.sh
Insert entries on ksqlDB:
cd ksqldb-window-tumbling-heartbeat/ksql
ksql-inserts.sh
Verify results:
ksql http://ksqldb-server:8088
SELECT person_id,
beat_over_threshold_count,
TIMESTAMPTOSTRING(window_start, 'yyyy-MM-dd HH:mm:ss', 'UTC') as window_start,
TIMESTAMPTOSTRING(window_end, 'yyyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM heartbeat_60sec
EMIT CHANGES;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|PERSON_ID |BEAT_OVER_THRESHOLD_COUNT |WINDOW_START |WINDOW_END |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|MGG1 |3 |2023-02-18 15:10:00 |2023-02-18 15:11:00 |
|MGG1 |10 |2023-02-18 15:15:00 |2023-02-18 15:16:00 |
Teardown:
scripts/tear-down.sh
Implementation of a session window (5 minutes of inactivity). Vehicle positions (latitude and longitude) are collected, and a new window opens when the vehicle has not sent its position for 5 minutes. Each window is considered a new "trip".
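The session logic can be sketched in plain Python as follows. This is an illustration of the idea only, not the ksqlDB statements used by the example; the tuple layout, the epoch-second timestamps and the function name are assumptions.

```python
GAP_SECONDS = 5 * 60  # inactivity gap that closes a trip


def split_into_trips(positions):
    """Group (vehicle_id, epoch_seconds, lat, lon) tuples into trips.

    A vehicle starts a new trip when it has been silent for more than
    GAP_SECONDS since its previous position.
    """
    open_trips = {}  # vehicle_id -> trip currently being extended
    finished = []
    for vehicle_id, ts, lat, lon in sorted(positions, key=lambda p: p[1]):
        trip = open_trips.get(vehicle_id)
        if trip is not None and ts - trip["window_end"] > GAP_SECONDS:
            finished.append(trip)  # gap exceeded: close the current trip
            trip = None
        if trip is None:
            trip = {
                "vehicle_id": vehicle_id,
                "positions_sent": 0,
                "start_latitude": lat,
                "start_longitude": lon,
                "window_start": ts,
            }
            open_trips[vehicle_id] = trip
        # Every position extends the trip and moves its end forward.
        trip["positions_sent"] += 1
        trip["end_latitude"] = lat
        trip["end_longitude"] = lon
        trip["window_end"] = ts
    return finished + list(open_trips.values())
```

Unlike a tumbling window, a session has no fixed size: its start and end are the timestamps of the first and last position of the trip.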
Launch on local environment:
Launch Docker Compose:
scripts/bootstrap.sh
Connect to ksqlDB and set auto.offset.reset:
ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit
Create DDL on ksqlDB:
cd ksqldb-window-session-tripsegments/ksql
ksql-statements.sh
Insert entries on ksqlDB:
cd ksqldb-window-session-tripsegments/ksql
ksql-inserts.sh
Verify results:
ksql http://ksqldb-server:8088
SELECT vehicle_id,
positions_sent,
start_latitude,
start_longitude,
end_latitude,
end_longitude,
TIMESTAMPTOSTRING(window_start, 'yyyy-MM-dd HH:mm:ss', 'UTC') as window_start,
TIMESTAMPTOSTRING(window_end, 'yyyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM trips
EMIT CHANGES;
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|VEHICLE_ID |POSITIONS_SENT |START_LATITUDE |START_LONGITUDE |END_LATITUDE |END_LONGITUDE |WINDOW_START |WINDOW_END |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|VH1 |5 |42.21 |17.12 |42.28 |17.16 |2023-02-18 15:10:00 |2023-02-18 15:13:00 |
|VH1 |2 |42.31 |17.17 |42.33 |17.18 |2023-02-18 15:20:00 |2023-02-18 15:22:00 |
Teardown:
scripts/tear-down.sh
This example shows how to join a STREAM with air temperatures captured by devices and a TABLE containing the information of devices.
Air temperatures are ingested into the kafka topic temperature.data by a RabbitMQ source connector.
Device info is ingested into the kafka topic device by a JDBC source connector.
Launch Docker Compose:
scripts/bootstrap-ksqldb-join.sh
Create input topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic device --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --create --topic temperature.data --replication-factor 1 --partitions 1
Deploy the JDBC Source connector:
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_jdbc_source.json
Send data to a RabbitMQ queue temperature.queue with a python producer (5 different devices):
pip3 install pika --upgrade
ksqldb-join/config/rabbit_producer.py temperature.queue 5
-->
count: 5
queue: temperature.queue
Send {'id': 0, 'body': 35}
Send {'id': 1, 'body': 18}
Send {'id': 2, 'body': 2}
Send {'id': 3, 'body': 5}
Send {'id': 4, 'body': 32}
Exiting
Deploy the RabbitMQ Source connector:
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_rabbitmq_source.json
Execute the ksqlDB statements; stream DEVICE_TEMPERATURE is an INNER JOIN between DEVICE and TEMPERATURE.DATA:
cd ksqldb-join/ksql
./ksql-statements.sh
Verify the enrichment with a query:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_TEMPERATURE EMIT CHANGES"
-->
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|DEVICE_ID |FULLNAME |TEMPERATURE |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|1 |foo11111 |18 |
|2 |foo22222 |2 |
Verify the left-join variant, DEVICE_TEMPERATURE_LJ, with a query:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_TEMPERATURE_LJ EMIT CHANGES"
-->
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|DEVICE_ID |FULLNAME |TEMPERATURE |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|0 |null |15 |
|1 |foo11111 |13 |
|2 |foo22222 |16 |
|3 |null |34 |
|4 |null |8 |
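The two result sets differ only in how temperature events without a matching device are handled. The sketch below reproduces that difference in plain Python. It assumes that DEVICE_TEMPERATURE_LJ is a left join (the name and the null values suggest it); the function name and data layout are hypothetical.

```python
def join_stream_with_table(temperatures, devices, how="inner"):
    """Enrich each temperature event with the device name.

    `temperatures` is an iterable of (device_id, temperature) events.
    `devices` maps device_id -> fullname, like a TABLE holding the
    latest value per key.
    """
    if how not in ("inner", "left"):
        raise ValueError("how must be 'inner' or 'left'")
    rows = []
    for device_id, temperature in temperatures:
        fullname = devices.get(device_id)
        if fullname is None and how == "inner":
            continue  # inner join: drop events with no matching device
        rows.append((device_id, fullname, temperature))
    return rows


if __name__ == "__main__":
    devices = {1: "foo11111", 2: "foo22222"}
    events = [(0, 15), (1, 13), (2, 16), (3, 34), (4, 8)]
    print(join_stream_with_table(events, devices, how="inner"))
    print(join_stream_with_table(events, devices, how="left"))
```

With devices 1 and 2 in the table, the inner join keeps two of the five events, while the left join keeps all five and fills the missing names with None (shown as null by ksqlDB).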
This example shows how to join a TABLE with another TABLE.
Device info is ingested into the kafka topic device by a JDBC source connector.
Maintenance records are ingested into the kafka topic maintenance by a JDBC source connector.
Launch Docker Compose:
scripts/bootstrap-ksqldb-join.sh
Create input topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic device --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --create --topic maintenance --replication-factor 1 --partitions 1
Deploy the JDBC Source connector:
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_jdbc_source.json
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_device_maintenance_jdbc_source.json
Execute the ksqlDB statements: TABLE MAINTENANCE RIGHT JOIN TABLE DEVICE
cd ksqldb-join/ksql
./ksql-statements-rj.sh
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_MAINTENANCE EMIT CHANGES"
-->
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|DEVICE_ID |FULLNAME |MAINTENANCE |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|1 |foo11111 |2023-03-01 15:00:00 16:00:00 |
|2 |foo22222 |null |
|10 |foo1010101010 |null |
|15 |foo1515151515 |null |
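In a right join every row of the right-hand table (DEVICE) appears in the result, with null where the left-hand table (MAINTENANCE) has no matching key. A minimal Python sketch of that behaviour, using dictionaries as stand-ins for the two tables (the function name and layout are hypothetical):

```python
def right_join(maintenance, device):
    """MAINTENANCE RIGHT JOIN DEVICE on device_id.

    `maintenance` maps device_id -> maintenance slot.
    `device` maps device_id -> fullname.
    Every device is kept; a missing maintenance becomes None.
    """
    return [
        (device_id, fullname, maintenance.get(device_id))
        for device_id, fullname in sorted(device.items())
    ]


if __name__ == "__main__":
    device = {1: "foo11111", 2: "foo22222", 10: "foo1010101010", 15: "foo1515151515"}
    maintenance = {1: "2023-03-01 15:00:00 16:00:00"}
    for row in right_join(maintenance, device):
        print(row)
```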
see section [tx_producer]
see section [readcommitted_consumer]
Example of a cart application implementing end-to-end exactly-once semantics between consumer and producer.
The ItemsProducer class sends 2 items in a single transaction.
The ItemsConsumer class receives the items and creates an order containing the items.
The consumer offset is committed only if the order can be created and sent.
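The control flow of the consumer side can be modelled without a broker. The sketch below is an in-memory simulation, not the ItemsConsumer code: a list stands in for the items topic, an integer for the committed offset, and send_order is a hypothetical callback. In a real Kafka application the order and the offsets are committed atomically inside one producer transaction (KafkaProducer.sendOffsetsToTransaction).

```python
class SendError(Exception):
    """Raised by the hypothetical send_order callback when a send fails."""


def process_items(items, committed_offset, send_order):
    """Build one order from the unread items and try to send it.

    Returns the new committed offset. The offset only moves forward when
    the order was sent; otherwise the same items are read again later.
    """
    pending = items[committed_offset:]
    if not pending:
        return committed_offset
    order = {"items": list(pending)}
    try:
        send_order(order)
    except SendError:
        # Simulated abort: nothing is committed, nothing is lost.
        return committed_offset
    return committed_offset + len(pending)
```

Calling process_items again after a failure re-reads the same two items, which is what makes the retry safe.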
Execute tests:
cd kafka-orders-tx
mvn clean test
Execute the ItemsProducer class:
cd kafka-orders-tx
mvn clean compile && mvn exec:java -Dexec.mainClass="ItemsProducer"
Execute the ItemsConsumer class:
cd kafka-orders-tx
mvn clean compile && mvn exec:java -Dexec.mainClass="ItemsConsumer"
Sample of a kafka producer and consumer implemented with Spring Boot 2.x.
The Kafka consumer sends records that still cannot be processed after 3 attempts to a dead letter queue (DLQ).
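The retry-then-DLQ behaviour can be sketched in plain Python. This is not the Spring Boot implementation; the handler, the rule that names starting with "ERROR" fail, and the list used as DLQ are assumptions for illustration.

```python
MAX_ATTEMPTS = 3


def process_order(order):
    """Hypothetical handler: orders whose name starts with ERROR always fail."""
    if order["name"].startswith("ERROR"):
        raise ValueError(f"cannot process order {order['id']}")


def consume(order, handler, dead_letters):
    """Try the handler up to MAX_ATTEMPTS times, then park the record.

    Returns the attempt number that succeeded, or None when the record
    was appended to `dead_letters`.
    """
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            handler(order)
            return attempt
        except Exception:
            continue  # retry until the attempts are exhausted
    dead_letters.append(order)
    return None
```

A record that keeps failing is handed to the DLQ once, after the third attempt, so it does not block the records behind it.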
Run on your local machine:
#start a producer on port 8010
cd kafka-springboot-producer
mvn spring-boot:run
#start a consumer on port 8090
cd kafka-springboot-consumer
mvn spring-boot:run
#Send orders (on topic demoTopic)
curl --data '{"id":5, "name": "PS5"}' -H "Content-Type:application/json" http://localhost:8010/api/order
#Send ERROR orders and test DLQ (on topic demoTopic)
curl --data '{"id":5, "name": "ERROR-PS5"}' -H "Content-Type:application/json" http://localhost:8010/api/order
Sample of a kafka producer and consumer implemented with Quarkus. Every 1s a new message is sent to demo topic.
Run on your local machine:
cd kafka-quarkus
#start in dev mode (debug port 5005)
./mvnw clean compile quarkus:dev
Run on OpenShift:
cd kafka-quarkus
./mvnw clean package -Dquarkus.container-image.build=true -Dquarkus.kubernetes.deploy=true
Sample of a kafka producer and consumer running on an Open Liberty MicroProfile v2 runtime.
Run on docker:
#Start a zookeeper container
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
#Start a kafka container
docker run -d --name my-cluster-kafka-bootstrap -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
#Start a kafka producer container
cd kafka-microprofile2-producer
docker build -t kafka-producer:latest .
docker run -d --name kafka-producer -p 9080:9080 -e KAFKABROKERLIST=my-cluster-kafka-bootstrap:9092 --link my-cluster-kafka-bootstrap:my-cluster-kafka-bootstrap kafka-producer:latest
#Start a kafka consumer container
cd kafka-microprofile2-consumer
docker build -t kafka-consumer:latest .
docker run -d --name kafka-consumer -p 9090:9080 -e KAFKABROKERLIST=my-cluster-kafka-bootstrap:9092 --link my-cluster-kafka-bootstrap:my-cluster-kafka-bootstrap kafka-consumer:latest
#Receive orders
curl -v -X POST http://localhost:9090/kafka-microprofile2-consumer-0.0.1-SNAPSHOT/order
#Send orders (500)
curl -v -X POST http://localhost:9080/kafka-microprofile2-producer-0.0.1-SNAPSHOT/order
This example shows how to create a custom authorizer for Kafka
Important: this example is only for demo purposes and is not intended to be deployed in production.
Custom Authorizer org.hifly.kafka.authorizer.DummyAuthorizer extends the basic AclAuthorizer and allows authenticated users to execute operations on kafka topics without setting any ACLs on them.
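The difference from an ACL-based check can be sketched in a few lines of Python. This is a conceptual model only, not the Java authorizer: the function names and the set-of-tuples ACL store are hypothetical, and the sketch assumes unauthenticated clients show up as Kafka's anonymous principal User:ANONYMOUS.

```python
ANONYMOUS = "User:ANONYMOUS"


def acl_authorize(principal, operation, topic, acls):
    """ACL-style check: allow only when an explicit entry matches."""
    return (principal, operation, topic) in acls


def dummy_authorize(principal, operation, topic, acls):
    """Demo-style check: allow every authenticated principal.

    The ACLs are ignored on purpose, which is why this approach is
    suitable for demos only.
    """
    return principal != ANONYMOUS


if __name__ == "__main__":
    no_acls = set()
    print(acl_authorize("User:kafkabroker1", "WRITE", "test", no_acls))
    print(dummy_authorize("User:kafkabroker1", "WRITE", "test", no_acls))
```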
Compile and package:
cd authorizers
mvn clean package
cp -rf ./target/authorizers-0.0.1-SNAPSHOT.jar ./jars
Run kafka with custom authorizer on port 9092:
scripts/boostrap-auth.sh
Run a kafka producer test using the producer.properties on listener port 9092:
producer.properties:
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="kafkabroker1" \
password="kafkabroker1-secret";
Producer command:
kafka-console-producer --bootstrap-server localhost:9092 --topic test --producer.config ./src/main/resources/producer.properties
Run a kafka consumer test using the consumer.properties on listener port 9092:
consumer.properties:
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="kafkabroker1" \
password="kafkabroker1-secret";
group.id=test
Consumer command:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config ./src/main/resources/consumer.properties
Teardown:
scripts/tear-down-auth.sh
This example shows how to configure kafka to use SASL/OAUTHBEARER authentication with Support for OIDC.
To run the sample you need to run Keycloak server and configure openid-connect on it.
Run Keycloak server with PostgreSQL (on port 8080) and Run Kafka with OAUTH listener on port 9093:
scripts/bootstrap-oauth.sh
Keycloak setup:
- Login to http://localhost:8080 (admin/Pa55w0rd)
- Create a realm called kafka
- From the Clients tab, create a client with Client ID "kafka_user".
- Change Access Type to Confidential
- Turn Standard Flow Enabled to OFF
- Turn Service Accounts Enabled to ON
- In the Advanced Settings below on the settings tab, set Access Token Lifespan to 10 minutes
- Switch to the Credentials tab
- Set Client Authenticator to "Client Id and Secret"
- Copy the client-secret
- Save
Run a kafka producer test using the client-oauth.properties file (add your client_secret to the file) on listener port 9093:
client-oauth.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=http://localhost:8080/auth/realms/kafka/protocol/openid-connect/token
sasl.oauthbearer.expected.audience=account
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="kafka_user" clientSecret="<client_secret>";
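With these properties the client obtains a token from the Keycloak token endpoint before it talks to the broker. The sketch below builds (but does not send) a standard OAuth 2.0 client-credentials request for that endpoint. It illustrates the grant type only; the exact request issued by the Kafka login callback handler is not shown in this document, and the function name is hypothetical.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret):
    """Build an OAuth 2.0 client-credentials token request.

    The client authenticates with HTTP Basic auth and asks for a token
    with grant_type=client_credentials. Nothing is sent over the network.
    """
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode("ascii")
    return urllib.request.Request(token_url, data=body, headers=headers, method="POST")


if __name__ == "__main__":
    request = build_token_request(
        "http://localhost:8080/auth/realms/kafka/protocol/openid-connect/token",
        "kafka_user",
        "<client_secret>",  # placeholder, replace with the secret copied from Keycloak
    )
    print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen while Keycloak is running should return a JSON document containing an access_token field, which is a quick way to check the client setup independently of Kafka.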
Producer command:
kafka-producer-perf-test --topic my_topic --num-records 50 --throughput 10 --record-size 1 --producer-props bootstrap.servers=localhost:9093 --producer.config kafka-oauth-kip-768/client-oauth.properties
Teardown:
scripts/tear-down-oauth.sh
This example shows how to configure OpenTelemetry java auto-instrumentation for a kafka streams application enabling distributed tracing.
This example uses opentelemetry-java-instrumentation to inject OpenTelemetry auto-instrumentation as a JVM agent, so traces are added without any change to the source code.
Apache Kafka producers, consumers and streams are among the supported libraries listed in the opentelemetry-java-instrumentation documentation.
Run OpenTelemetry collector (otlp protocol on port 4317) and Jaeger (on port 16686):
scripts/bootstrap-tracing.sh
Create topics:
kafka-topics --bootstrap-server localhost:9092 --create --topic sum-input-topic --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --create --topic sum-output-topic --replication-factor 1 --partitions 1
Run the kafka stream application with the OpenTelemetry agent:
cd kafka-streams
mvn clean package
cd ..
export OTEL_SERVICE_NAME=stream-sum-service
export OTEL_TRACES_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
java -javaagent:kafka-distributed-tracing/app/opentelemetry-javaagent.jar -Dotel.instrumentation.kafka.enabled=true -Dotel.javaagent.debug=true -jar kafka-streams/target/kafka-streams-0.0.1-SNAPSHOT.jar
Send messages to input topics:
kafka-console-producer --broker-list localhost:9092 --topic sum-input-topic --property "parse.key=true" --property "key.separator=:"
"John":1
"Mark":2
"John":5
Read from output topic:
kafka-console-consumer --topic sum-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"
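Assuming the topology keeps a running sum of the integer values per key (the topic names suggest this, but the topology itself is not shown in this section), the records read from the output topic can be predicted with a short Python sketch. The function name is hypothetical.

```python
def running_sum(records):
    """Yield (key, total so far) for every (key, value) input record."""
    totals = {}
    for key, value in records:
        totals[key] = totals.get(key, 0) + value
        yield key, totals[key]


if __name__ == "__main__":
    for key, total in running_sum([("John", 1), ("Mark", 2), ("John", 5)]):
        print(key, ":", total)
```

Under that assumption the second record for John yields 6, because the aggregation emits an updated total for every input record rather than one final value.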
Open the Jaeger UI at http://localhost:16686 to see the list of traces produced by the streams application.
Teardown:
scripts/tear-down-tracing.sh
Usage of the kafka utility kafka-producer-perf-test to test producer performance for java applications.
Kafka cluster is formed with 3 brokers (9092, 9093, 9094).
One client machine, named kafka_perf, is used to run kafka-producer-perf-test against the kafka cluster.
Run the cluster and kafka_perf machine:
scripts/bootstrap-performance.sh
Create a topic:
docker exec kafka-perf sh kafka-topics.sh --bootstrap-server broker:9092,broker2:9093,broker3:9094 --create --topic topic-perf --replication-factor 3 --partitions 6 --config min.insync.replicas=2
Run a scenario with:
- 1000000 records
- record size 2k
- no compression
docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 --print-metrics
Run a scenario with:
- 1000000 records
- record size 2k
- lz4 compression
docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=lz4 --print-metrics
Run a scenario with:
- 1000000 records
- record size 2k
- gzip compression
docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=gzip --print-metrics
Run a scenario with:
- 1000000 records
- record size 2k
- snappy compression
docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=snappy --print-metrics
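To put the reported numbers in context, it helps to know how much payload each scenario produces. The helper below is a small arithmetic sketch; the function names are hypothetical and the elapsed time is whatever a given run reports. It uses 1 MB = 1024 * 1024 bytes and counts the uncompressed payload only.

```python
def payload_mib(num_records, record_size_bytes):
    """Uncompressed payload of a run, in MiB (1024 * 1024 bytes)."""
    return num_records * record_size_bytes / (1024 * 1024)


def throughput_mib_per_sec(num_records, record_size_bytes, elapsed_seconds):
    """Average payload throughput for a run that took `elapsed_seconds`."""
    return payload_mib(num_records, record_size_bytes) / elapsed_seconds


if __name__ == "__main__":
    # The scenarios above: 1000000 records of 2000 bytes each.
    print(round(payload_mib(1_000_000, 2000), 2))
```

Each scenario therefore pushes roughly 1907 MiB of uncompressed payload. With replication factor 3 every record is stored on three brokers, and the compression settings reduce what actually travels over the network and lands on disk.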
Teardown:
scripts/tear-down-performance.sh