Here are some useful commands for Kafka.
Table of Contents
First, set some Kafka environment variables.
# Shared settings used by the commands in this file; adjust to your setup.
# Directory that holds the Kafka command-line scripts.
KAFKA_BIN="/opt/kafka/bin"
# ZooKeeper host name or IP (the commands append port 2181).
ZOOKEEPER_HOST="ip-zookeeper"
# Broker host name or IP (the commands append port 9092).
BROKER_HOST="ip-broker"
# List the topics registered in ZooKeeper.
"${KAFKA_BIN}/kafka-topics.sh" \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --list

# Describe a single topic.
"${KAFKA_BIN}/kafka-topics.sh" \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --topic <topic_name> \
  --describe

# Create a topic with one partition and a replication factor of 1.
"${KAFKA_BIN}/kafka-topics.sh" \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --create \
  --partitions 1 \
  --replication-factor 1 \
  --topic <topic_name>
# Override retention.ms on an existing topic (set to 1000 ms here).
# Keep a space before every trailing backslash: without it the shell joins
# the next line directly onto the previous word (e.g. "<topic>--config").
"${KAFKA_BIN}/kafka-topics.sh" \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --alter \
  --topic <topic_name> \
  --config retention.ms=1000
# Set min.insync.replicas=2 on an existing topic.
"${KAFKA_BIN}/kafka-topics.sh" \
  --alter \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --topic <topic_name> \
  --config min.insync.replicas=2

# Describe only the partitions that are under-replicated.
"${KAFKA_BIN}/kafka-topics.sh" \
  --describe \
  --under-replicated-partitions \
  --zookeeper "${ZOOKEEPER_HOST}:2181"

# Delete a topic.
"${KAFKA_BIN}/kafka-topics.sh" \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --delete \
  --topic <topic_name>
# Query a topic's offsets through the GetOffsetShell tool class.
# NOTE(review): per the tool's usage text, --time -2 selects the earliest
# offsets and -1 the latest; confirm against your Kafka version.
"${KAFKA_BIN}/kafka-run-class.sh" \
  kafka.tools.GetOffsetShell \
  --broker-list "${BROKER_HOST}:9092" \
  --topic <topic_name> \
  --time -2
# Execute the reassignment plan stored in increase-replication-factor.json.
"${KAFKA_BIN}/kafka-reassign-partitions.sh" \
  --reassignment-json-file increase-replication-factor.json \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --execute

# Verify the status of the same reassignment plan.
"${KAFKA_BIN}/kafka-reassign-partitions.sh" \
  --reassignment-json-file increase-replication-factor.json \
  --zookeeper "${ZOOKEEPER_HOST}:2181" \
  --verify
# List the consumer groups known to the broker.
"${KAFKA_BIN}/kafka-consumer-groups.sh" \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --list

# Describe one consumer group.
"${KAFKA_BIN}/kafka-consumer-groups.sh" \
  --describe \
  --group <group_id> \
  --bootstrap-server "${BROKER_HOST}:9092"
# Consume a topic starting from the beginning.
"${KAFKA_BIN}/kafka-console-consumer.sh" \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --topic <topic_name> \
  --from-beginning

# Consume a topic without --from-beginning.
"${KAFKA_BIN}/kafka-console-consumer.sh" \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --topic <topic_name>

# Consume a single message and stop (--max-messages 1).
"${KAFKA_BIN}/kafka-console-consumer.sh" \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --topic <topic_name> \
  --max-messages 1

# Read one record from the __consumer_offsets topic using the
# OffsetsMessageFormatter. The single quotes stop the shell from expanding
# the "$" inside the class name.
"${KAFKA_BIN}/kafka-console-consumer.sh" \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --topic __consumer_offsets \
  --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
  --max-messages 1
# Consume a topic as a member of the given consumer group.
"${KAFKA_BIN}/kafka-console-consumer.sh" \
  --topic <topic_name> \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --group <group_id>
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST:9092 \
--group <group_id> \
--describe
# There are many other resetting options
# --shift-by <positive_or_negative_integer> / --to-current / --to-latest / --to-offset <offset_integer>
# --to-datetime <datetime_string> --by-duration <duration_string>
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST:9092 \
--group <group_id> \
--topic <topic_name> \
--reset-offsets \
--to-earliest \
--execute
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST:9092 \
--all-groups \
--reset-offsets \
--topic <topic_name> \
--to-earliest
# Shift a group's offsets for a topic by 2 and apply the change (--execute).
# --shift-by also accepts a negative integer.
"${KAFKA_BIN}/kafka-consumer-groups.sh" \
  --bootstrap-server "${BROKER_HOST}:9092" \
  --group <group_id> \
  --reset-offsets \
  --shift-by 2 \
  --execute \
  --topic <topic_name>
$KAFKA_BIN/kafka-consumer-groups.sh \
--bootstrap-server $BROKER_HOST:9092 \
--describe \
--group <group_id>
$KAFKA_BIN/kafka-console-producer.sh \
--broker-list $BROKER_HOST:9092 \
--topic <topic_name> < messages.txt
# Start the console producer with stdin left attached to the caller, so
# messages can be typed in directly.
# The script name carries the .sh suffix, like every other tool invoked from
# $KAFKA_BIN in this file.
"${KAFKA_BIN}/kafka-console-producer.sh" \
  --broker-list "${BROKER_HOST}:9092" \
  --topic <topic_name>
# Pipe the text "My Message" into the console producer.
echo "My Message" | "${KAFKA_BIN}/kafka-console-producer.sh" \
  --broker-list "${BROKER_HOST}:9092" \
  --topic <topic_name>

# Add ACLs that allow principal User:Gus to act as a consumer of the topic
# within the given group.
"${KAFKA_BIN}/kafka-acls.sh" \
  --authorizer-properties "zookeeper.connect=${ZOOKEEPER_HOST}:2181" \
  --add \
  --allow-principal User:Gus \
  --consumer \
  --topic <topic_name> \
  --group <group_id>
# Add ACLs that allow principal User:Gus to act as a producer on the topic.
# Every line except the last must end with " \" so that the options are passed
# to kafka-acls.sh instead of being run as a separate command.
"${KAFKA_BIN}/kafka-acls.sh" \
  --authorizer-properties "zookeeper.connect=${ZOOKEEPER_HOST}:2181" \
  --add \
  --allow-principal User:Gus \
  --producer \
  --topic <topic_name>