Configuration
Install JR:
brew install jr
Edit the following JR configuration as needed:
{
  "emitters": [
    {
      "name": "shoe",
      "locale": "us",
      "num": 1,
      "frequency": "5s",
      "duration": "10m",
      "preload": 10,
      "valueTemplate": "shoe",
      "output": "kafka",
      "keyTemplate": "null",
      "outputTemplate": "{{.V}}\n",
      "topic": "shoes"
    },
    {
      "name": "shoe_customer",
      "locale": "us",
      "num": 1,
      "frequency": "1s",
      "duration": "1s",
      "preload": 5,
      "valueTemplate": "shoe_customer",
      "output": "kafka",
      "keyTemplate": "null",
      "outputTemplate": "{{.V}}\n",
      "topic": "shoe_customers"
    },
    {
      "name": "shoe_order",
      "locale": "us",
      "num": 1,
      "frequency": "500ms",
      "duration": "1s",
      "preload": 0,
      "valueTemplate": "shoe_order",
      "output": "kafka",
      "keyTemplate": "null",
      "outputTemplate": "{{.V}}\n",
      "topic": "shoe_orders"
    },
    {
      "name": "shoe_clickstream",
      "locale": "us",
      "num": 1,
      "frequency": "100ms",
      "duration": "1s",
      "preload": 0,
      "valueTemplate": "shoe_clickstream",
      "output": "kafka",
      "keyTemplate": "null",
      "outputTemplate": "{{.V}}\n",
      "topic": "shoe_clickstream"
    }
  ],
  "global": {
    "seed": -1,
    "kafkaConfig": "./kafka/config.properties",
    "schemaRegistry": false,
    "registryConfig": "./kafka/registry.properties",
    "serializer": "json-schema",
    "autoCreate": true,
    "redisTtl": "1m",
    "redisConfig": "./redis/config.json",
    "mongoConfig": "./mongoDB/config.json",
    "elasticConfig": "./elastic/config.json",
    "s3Config": "./s3/config.json",
    "url": ""
  }
}
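With this configuration in place, emitters can be listed and started by name, as done in the examples below. A quick check, assuming a recent JR release:
# list the configured emitters
jr emitter list
# start a single emitter by name
jr emitter run shoe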
Example
docker run --rm -d --name kafka-multitool rampi88/kafka-multitool:v1
docker exec -it kafka-multitool bash
docker stop kafka-multitool
# build and push the multi-tool image
docker build -t rampi88/kafka-multitool:v1 .
docker push rampi88/kafka-multitool:v1
Example
kafka-topics --bootstrap-server localhost:9092 --delete --topic test
kafka-topics --bootstrap-server localhost:9091 --create --topic test --replication-factor 3 --partitions 3 --config min.insync.replicas=2
kafka-topics --bootstrap-server localhost:9092 --describe --topic test
kcat -b localhost:9092 -t test -P -K : -l data.txt
kcat -C -b localhost:9092 -t test \
-f 'Topic %t - Partition %p: Key is %k, and message payload is: %s \n'
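With `-K :` and `-l`, kcat reads one key:value record per line from data.txt; a hypothetical file could look like:
key1:message1
key2:message2
key1:message3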
Example
kafka-topics --bootstrap-server localhost:9092 --delete --topic test
kafka-topics --bootstrap-server localhost:9091 --create --topic test --replication-factor 3 --partitions 3 --config min.insync.replicas=2
kafka-topics --bootstrap-server localhost:9092 --describe --topic test
kcat -b localhost:9092 -X partitioner=random -t test -P -K : -l data.txt
kcat -C -b localhost:9092 -t test \
-f 'Topic %t - Partition %p: Key is %k, and message payload is: %s \n'
Example
kafka-topics --bootstrap-server localhost:9092 --delete --topic shoes
kafka-topics --bootstrap-server localhost:9092 --create --topic shoes --replication-factor 3 --partitions 6 --config min.insync.replicas=2
kafka-topics --bootstrap-server localhost:9092 --describe --topic shoes
# GENERATE RANDOM DATA
jr emitter run shoe
# SHELL 1
kcat -b localhost:9092 -G mygroup shoes
# SHELL 2
kcat -b localhost:9092 -G mygroup shoes
### WITH COOPERATIVE REBALANCING ###
# SHELL 1
kcat -b localhost:9092 -X partition.assignment.strategy=cooperative-sticky -G mygroup shoes
# SHELL 2
kcat -b localhost:9092 -X partition.assignment.strategy=cooperative-sticky -G mygroup shoes
Example
kafka-topics --bootstrap-server localhost:9092 --delete --topic test
kafka-topics --bootstrap-server localhost:9092 --create --topic test --replication-factor 3 --partitions 6 --config min.insync.replicas=2
kafka-topics --bootstrap-server localhost:9092 --describe --topic test
# PRODUCING WITH ACKS=ALL (the producer default since Kafka 3.0)
echo "test" | kafka-console-producer --bootstrap-server localhost:9092 --topic test
# CONSUMING
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --timeout-ms 5000
# STOP BROKER kafka1
docker stop kafka1
echo "test" | kafka-console-producer --bootstrap-server localhost:9092 --topic test
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --timeout-ms 5000
# STOP BROKER kafka3
docker stop kafka3
echo "test" | kafka-console-producer --bootstrap-server localhost:9092 --topic test
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --timeout-ms 5000
kafka-topics --bootstrap-server localhost:9092 --describe --topic test
# PRODUCING WITH ACKS=1
echo "test" | kafka-console-producer --bootstrap-server localhost:9092 --topic test --request-required-acks 1
Example
kafka-topics --bootstrap-server localhost:9092 --delete --topic test
kafka-topics --bootstrap-server localhost:9091 --create --topic test --replication-factor 3 \
--partitions 1 --config min.insync.replicas=2 --config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.0 --config max.compaction.lag.ms=100 \
--config segment.ms=100 --config delete.retention.ms=100
kafka-topics --bootstrap-server localhost:9092 --describe --topic test
kcat -b localhost:9092 -t test -P -K : -l data.txt
kcat -C -b localhost:9092 -t test \
-f 'Offset: %o, Key: %k, Message payload: %s \n'
# THE ACTIVE SEGMENT IS NOT ELIGIBLE FOR LOG COMPACTION -> FORCE A SEGMENT ROLL WITH ONE NEW MESSAGE
echo "key9:message21" | kcat -b localhost:9092 -P -t test -K:
sleep 5
kcat -C -b localhost:9092 -t test \
-f 'Offset: %o, Key: %k, Message payload: %s \n'
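To confirm the compaction-related overrides on the topic, the configured settings can also be inspected directly:
kafka-configs --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name test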
Example
kafka-topics --bootstrap-server localhost:9092 --delete --topic test
kafka-topics --bootstrap-server localhost:9091 --create --topic test --replication-factor 3 --partitions 1 --config min.insync.replicas=2
# SHELL 1
kcat -C -b localhost:9092 -X isolation.level=read_uncommitted -t test \
-f 'Key is %k, and message payload is: %s \n'
# SHELL 2 (read_committed is the librdkafka default)
kcat -C -b localhost:9092 -t test \
-f 'Key is %k, and message payload is: %s \n'
# SHELL 3 - Python transactional producer
pip install -r python_examples/requirements.txt
python3 python_examples/transactional_producer.py
Example
docker-compose -f docker-compose.scram.yaml up -d
# CREATE USER
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin"
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice"
# CREATE TOPIC WITH SASL CREDENTIALS
kafka-topics --bootstrap-server localhost:9092 --command-config kafka/admin.properties --delete --topic test
kafka-topics --bootstrap-server localhost:9092 --command-config kafka/admin.properties --create --topic test
# SET ACLs
kafka-acls --bootstrap-server localhost:9092 \
--command-config kafka/admin.properties \
--add \
--allow-principal User:alice \
--operation all \
--topic test
kafka-acls --bootstrap-server localhost:9092 \
--command-config kafka/admin.properties \
--add \
--deny-principal User:alice \
--operation delete \
--topic test
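Before testing, the resulting ACL bindings can be verified with the same admin credentials:
kafka-acls --bootstrap-server localhost:9092 --command-config kafka/admin.properties --list --topic test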
# ALLOWED OPERATION
echo "test" | kcat -b localhost:9092 -P -t test -X security.protocol=SASL_PLAINTEXT -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=alice -X sasl.password=alice-secret
echo "test" | kcat -b localhost:9092 -C -o beginning -t test -X security.protocol=SASL_PLAINTEXT -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=alice -X sasl.password=alice-secret
# DENIED OPERATION
kafka-topics --bootstrap-server localhost:9092 --command-config kafka/alice.properties --delete --topic test
docker-compose -f docker-compose.scram.yaml down -v
Example
# GENERATE RANDOM DATA
# jr run shoe_order -o kafka -t shoe_order -s --serializer json-schema -f 1s -d 10m
ksql-datagen value-format=avro quickstart=pageviews msgRate=1 bootstrap-server=localhost:9092 topic=pageviews iterations=100
curl localhost:8081/subjects/
kcat -b localhost:9092 -t pageviews -s value=avro -r http://localhost:8081 -C -o beginning
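The registered schema itself can also be fetched; assuming the default TopicNameStrategy, the subject would be pageviews-value:
curl localhost:8081/subjects/pageviews-value/versions/latest | jq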
Example
curl --request PUT \
--url http://localhost:8083/connectors/transactions/config \
--header 'content-type: application/json' \
--data '{"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "max.interval": 1000, "iterations": 100, "value.converter": "io.confluent.connect.avro.AvroConverter", "quickstart": "transactions", "kafka.topic": "transactions", "value.converter.schema.registry.url": "http://schema-registry:8081"}'
curl localhost:8083/connectors | jq
curl localhost:8083/connectors/transactions/status | jq
curl -X DELETE localhost:8083/connectors/transactions | jq
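To sample the generated records, the same Avro consumption pattern as above should work (a sketch):
kcat -b localhost:9092 -t transactions -s value=avro -r http://localhost:8081 -C -o beginning -c 5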
Example
docker-compose -f docker-compose.scram.yaml up -d
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin"
kafka-topics --bootstrap-server localhost:9092 --command-config kafka/admin_with_file_config_provider.properties --create --topic test
kafka-topics --bootstrap-server localhost:9092 --command-config kafka/admin_with_file_config_provider.properties --list
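A minimal sketch of what admin_with_file_config_provider.properties might contain, using Kafka's built-in FileConfigProvider (the credentials path and indirected key are assumptions):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
sasl.jaas.config=${file:/path/to/credentials.properties:sasl.jaas.config}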
Example
# SHELL 1
docker-compose -f docker-compose.kraft.yml up -d
export KAFKA_OPTS="-javaagent:volumes/jmx_prometheus_javaagent-0.20.0.jar=9191:volumes/kafka_client.yml"
kafka-topics --bootstrap-server localhost:9092 --create --topic test
kafka-console-consumer --bootstrap-server localhost:9092 --topic test
# SHELL 2
curl localhost:9191
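The agent serves the client's JMX metrics in Prometheus exposition format, so consumer metrics should be visible, e.g.:
curl -s localhost:9191 | grep -i consumer | head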
Example
cd examples/kafka-examples
mvn package
# Shell 1 - Produce
java -javaagent:jmx_prometheus_javaagent-0.20.0.jar=9191:prometheus_config.yml -cp producer/target/producer-1.0-SNAPSHOT.jar com.github.prametta.producer.MyBeerProducer
# Shell 2 - Consumer
java -javaagent:jmx_prometheus_javaagent-0.20.0.jar=9192:prometheus_config.yml -cp consumer/target/consumer-1.0-SNAPSHOT.jar com.github.prametta.consumer.MyBeerConsumer
# Shell 3 - Process
java -javaagent:jmx_prometheus_javaagent-0.20.0.jar=9193:prometheus_config.yml -cp streams/target/streams-1.0-SNAPSHOT.jar com.github.prametta.streams.MyKafkaBeerStreamApp
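Each agent then exposes its application's metrics on its own port; a quick check:
curl -s localhost:9191 | head
curl -s localhost:9192 | head
curl -s localhost:9193 | head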
Example
cd examples/kafka-examples
mvn package
cd ../../
docker-compose -f docker-compose.kraft.clients.yml up -d --build
# teardown
docker-compose -f docker-compose.kraft.clients.yml down -v
Confluent for Kubernetes - Dynamic client switch from cluster 1 to cluster 2 (Reloader)
Example
cd k8s
alias k="kubectl"
kind create cluster
# Create namespaces
k create namespace confluent
k create namespace confluent-dr
k config set-context --current --namespace=confluent
# install Reloader
kubectl apply -f https://raw.githubusercontent.com/stakater/Reloader/master/deployments/kubernetes/reloader.yaml
# Install Confluent for Kubernetes Operator
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator \
confluentinc/confluent-for-kubernetes \
--set namespaced=false \
--namespace confluent
# install the confluent kubernetes plugin (OPTIONAL)
# See: [Confluent Plugin](https://docs.confluent.io/operator/current/co-deploy-cfk.html#co-install-plugin)
# Create main cluster
k apply -f cluster.1.yaml
# Create DR cluster
k config set-context --current --namespace=confluent-dr
k apply -f cluster.2.yaml
# Create the configmap used by the producer
k create configmap client-properties --from-file=client.properties
# Create the producer - the client will produce to the main cluster
k apply -f producer.dpl.yaml
# Edit the configmap and auto reboot the producer - the client will produce against the DR cluster now
kubectl edit configmap client-properties # change the bootstrap server to kafka.confluent-dr
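Note that Reloader only restarts workloads that opt in via annotation; assuming the Deployment in producer.dpl.yaml is named producer and does not already carry it, the annotation can be added with:
k annotate deployment producer reloader.stakater.com/auto="true"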
Example
cd k8s
alias k="kubectl"
kind create cluster
# Create namespaces
k create namespace confluent
k config set-context --current --namespace=confluent
# install Vault
helm repo add hashicorp https://helm.releases.hashicorp.com
helm repo update
helm install vault hashicorp/vault --set "server.dev.enabled=true"
# Install Confluent for Kubernetes Operator
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator \
confluentinc/confluent-for-kubernetes \
--set namespaced=false \
--namespace confluent
# install the confluent kubernetes plugin (OPTIONAL)
# See: [Confluent Plugin](https://docs.confluent.io/operator/current/co-deploy-cfk.html#co-install-plugin)
# Create main cluster
k apply -f cluster.1.yaml
# Create the configmap used by the producer
k create configmap client-properties --from-file=client.properties
# Create the producer - the client will produce to the main cluster
k apply -f producer.dpl.yaml
Example
cd k8s
alias k="kubectl"
kind create cluster
# Create namespaces
k create namespace confluent
k config set-context --current --namespace=confluent
# Install Confluent for Kubernetes Operator
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
helm upgrade --install confluent-operator \
confluentinc/confluent-for-kubernetes \
--set namespaced=false \
--namespace confluent
# install the confluent kubernetes plugin (OPTIONAL)
# See: [Confluent Plugin](https://docs.confluent.io/operator/current/co-deploy-cfk.html#co-install-plugin)
# Create main cluster
k apply -f cluster.1.yaml
# Install KEDA
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
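The chart itself can then be installed (the standard KEDA install, assuming a dedicated keda namespace):
helm install keda kedacore/keda --namespace keda --create-namespace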
Example
docker-compose -f docker-compose.migration.yaml up -d
# Create Users in Cluster A (Source)
docker exec -it kafka1 sh -c "kafka-configs --bootstrap-server kafka1:19091 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin"
docker exec -it kafka1 sh -c "kafka-configs --bootstrap-server kafka1:19091 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice"
# Create Users in Cluster B (Destination)
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin"
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice"
# Open a bash session from kafka-multitool
docker exec -it kafka-multitool bash
# Create topics on Cluster A
kafka-topics --bootstrap-server kafka1:9091 --command-config /tmp/admin.properties --create --topic Test-mytopic
# Set prefixed ACLs on Cluster A
kafka-acls --bootstrap-server kafka1:9091 \
--command-config /tmp/admin.properties \
--add \
--allow-principal User:alice \
--operation all \
--topic Test- \
--resource-pattern-type prefixed
# Set literal ACLs on Cluster A
kafka-acls --bootstrap-server kafka1:9091 \
--command-config /tmp/admin.properties \
--add \
--allow-principal User:alice \
--operation all \
--topic Test-mytopic
# Run MirrorMaker2 from kafka-multitool
connect-mirror-maker /tmp/mm2.properties
# List Topics from both clusters
kafka-topics --bootstrap-server kafka1:9091 --command-config /tmp/admin.properties --list
kafka-topics --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --list
# List ACLs from both clusters
kafka-acls --bootstrap-server kafka1:9091 --command-config /tmp/admin.properties --list
kafka-acls --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --list
# Note that only the literal ACL has been migrated
docker-compose -f docker-compose.migration.yaml down -v
Example
docker-compose -f docker-compose.migration.yaml up -d
# Create Users in Cluster A (Source)
docker exec -it kafka1 sh -c "kafka-configs --bootstrap-server kafka1:19091 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin"
docker exec -it kafka1 sh -c "kafka-configs --bootstrap-server kafka1:19091 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice"
# Create Users in Cluster B (Destination)
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin"
docker exec -it kafka2 sh -c "kafka-configs --bootstrap-server kafka2:19092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice"
# Open a bash session from kafka-multitool
docker exec -it kafka-multitool bash
# Create topics on Cluster A
kafka-topics --bootstrap-server kafka1:9091 --command-config /tmp/admin.properties --create --topic Test-mytopic
# Set prefixed ACLs on Cluster A
kafka-acls --bootstrap-server kafka1:9091 \
--command-config /tmp/admin.properties \
--add \
--allow-principal User:alice \
--operation all \
--topic Test- \
--resource-pattern-type prefixed
# Set literal ACLs on Cluster A
kafka-acls --bootstrap-server kafka1:9091 \
--command-config /tmp/admin.properties \
--add \
--allow-principal User:alice \
--operation all \
--topic Test-mytopic
# Create Cluster Link
# https://docs.confluent.io/platform/current/multi-dc-deployments/cluster-linking/configs.html#configuration-options
kafka-cluster-links --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --create --link demo-link \
--config-file /tmp/link.properties --acl-filters-json-file /tmp/acl_filters.json \
--consumer-group-filters-json-file /tmp/consumer_group_filters.json \
--topic-filters-json-file /tmp/topic_filters.json
kafka-cluster-links --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --list
kafka-cluster-links --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --link demo-link --describe
# List mirrors
kafka-mirrors --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --list
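To verify mirroring over the link, a record can be produced on Cluster A and read back from Cluster B (a sketch; with Cluster Linking the mirror topic keeps the source topic's name):
echo "hello" | kafka-console-producer --bootstrap-server kafka1:9091 --producer.config /tmp/admin.properties --topic Test-mytopic
kafka-console-consumer --bootstrap-server kafka2:9092 --consumer.config /tmp/admin.properties --topic Test-mytopic --from-beginning --timeout-ms 5000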
# List Topics from both clusters
kafka-topics --bootstrap-server kafka1:9091 --command-config /tmp/admin.properties --list
kafka-topics --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --list
# List ACLs from both clusters
kafka-acls --bootstrap-server kafka1:9091 --command-config /tmp/admin.properties --list
kafka-acls --bootstrap-server kafka2:9092 --command-config /tmp/admin.properties --list
# Note that only the literal ACL has been migrated
docker-compose -f docker-compose.migration.yaml down -v
Example
cd zk-reconfigure
# Start a 3-nodes zookeeper quorum
./startup.sh
# Add 2 nodes to the existing cluster
zookeeper-server-start zoo.4.properties >> zoo.4.log 2>&1 &
zookeeper-server-start zoo.5.properties >> zoo.5.log 2>&1 &
# Restart the existing nodes - Follower first, Leader afterwards
./leader_finder.sh
# Order will change
./shutdown.sh 1
zookeeper-server-start zoo.1.new.properties >> zoo.1.log 2>&1 &
sleep 5
./shutdown.sh 2
zookeeper-server-start zoo.2.new.properties >> zoo.2.log 2>&1 &
sleep 5
./shutdown.sh 3
zookeeper-server-start zoo.3.new.properties >> zoo.3.log 2>&1 &
sleep 5
./leader_finder.sh
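The resulting quorum membership can be confirmed via zookeeper-shell's config command (assuming the default client port 2181):
zookeeper-shell localhost:2181 config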
Example
docker-compose up -d
Example
docker-compose -f docker-compose.tracing.yaml up -d
# Create a sample topic
kafka-topics --bootstrap-server localhost:9092 --create --topic test --partitions 1
# Export the javaagent configuration
export KAFKA_OPTS="-javaagent:opentelemetry-javaagent.jar \
-Dotel.service.name=test-svc \
-Dotel.traces.exporter=jaeger \
-Dotel.metrics.exporter=none \
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true"
# Produce
seq 1 10 | kafka-console-producer --bootstrap-server localhost:9092 --topic test
# Consume
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --timeout-ms 5000
Open Jaeger (typically at http://localhost:16686) and check for spans.