
Forked from github.com/shafreeck/kafka-cli. The original project is no longer maintained, so I maintain this copy. I have no plan to submit changes back upstream, so this repository was not created with GitHub's Fork button.

Language: Go. License: Apache License 2.0 (Apache-2.0).



go get github.com/spacewander/kafka-cli


kafka-cli is a console util tool to access kafka cluster

  kafka-cli [command]

Available Commands:
  consume     consume topic from kafka
  monitor     display kafka cluster metrics, TODO
  produce     produce message
  topics      list all topics

      --brokers string                    broker list, delimited by comma (default "")
      --buffersize int                    internal channel buffer size (default 256)
      --clientid string                   a user-provided string sent with every request to the brokers for logging debugging, and auditing purposes (default "kafka-cli")
      --config string                     config file (default is $HOME/.kafka-cli.yaml)
      --metadata.refresh duration         metadata refresh frequency (default 10m0s)
      --metadata.retry.backoff duration   backoff between retrying (default 250ms)
      --metadata.retry.max int            total number to request metadata when the cluster has a leader election (default 3)
      --net.dialtimeout duration          timeout of dialing to brokers (default 30s)
      --net.keepalive duration            keepalive period, 0 means disabled
      --net.maxopenrequests int           how many outstanding requests a connection is allowed to have before sending on it blocks (default 5)
      --net.readtimeout duration          timeout of reading messages (default 30s)
      --net.writetimeout duration         timeout of writing messages (default 30s)
  -v, --verbose                           print log messages
      --zookeepers string                 zookeeper server list, delimited by comma, only use when operate topic (default "")

Use "kafka-cli [command] --help" for more information about a command.
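
The duration flags above print defaults such as 10m0s and 250ms, which is the string form of Go's time.Duration. The sketch below assumes the tool accepts the same syntax that the standard library's time.ParseDuration does; that is likely for a Go program but is not confirmed by this README. The helper name parseFlagDuration is hypothetical and is not part of kafka-cli.

```go
package main

import (
	"fmt"
	"time"
)

// parseFlagDuration is a hypothetical helper (not part of kafka-cli) that
// parses a duration string with the standard library and prefixes any error
// with the flag name.
func parseFlagDuration(flag, value string) (time.Duration, error) {
	d, err := time.ParseDuration(value)
	if err != nil {
		return 0, fmt.Errorf("--%s: %w", flag, err)
	}
	return d, nil
}

func main() {
	// Flag names and values are taken from the help text above.
	examples := [][2]string{
		{"metadata.refresh", "10m0s"},
		{"metadata.retry.backoff", "250ms"},
		{"net.dialtimeout", "30s"},
		{"net.keepalive", "0"},
	}
	for _, e := range examples {
		d, err := parseFlagDuration(e[0], e[1])
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("--%s=%s parses to %v\n", e[0], e[1], d)
	}
}
```

Under that assumption, a bare number such as 30 would be rejected because it carries no unit, while 0 is accepted without one.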


Several environment variables can be used to authenticate the client.

  • KAFKA_CLI_USER: username for SASL/PLAIN or SASL/SCRAM authentication
  • KAFKA_CLI_PASSWORD: password for SASL/PLAIN or SASL/SCRAM authentication
  • KAFKA_CLI_CLIENT_ID: the client ID, defaults to kafka-cli; this variable can be overridden by the --clientid option
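
The precedence described in the list above can be sketched as follows. This is a minimal illustration, not the tool's actual code: the Credentials type and the resolveCredentials function are hypothetical names, and treating an empty command-line value as "not given" is an assumption made for the example.

```go
package main

import (
	"fmt"
	"os"
)

// Credentials holds the values the README says are read from the environment.
// The type is hypothetical and exists only for this illustration.
type Credentials struct {
	User     string
	Password string
	ClientID string
}

// resolveCredentials applies the assumed precedence for the client ID:
// a non-empty command-line value wins over KAFKA_CLI_CLIENT_ID, which wins
// over the default "kafka-cli". getenv is injected so the logic can be
// exercised without touching the real environment.
func resolveCredentials(getenv func(string) string, clientIDFlag string) Credentials {
	c := Credentials{
		User:     getenv("KAFKA_CLI_USER"),
		Password: getenv("KAFKA_CLI_PASSWORD"),
		ClientID: "kafka-cli",
	}
	if v := getenv("KAFKA_CLI_CLIENT_ID"); v != "" {
		c.ClientID = v
	}
	if clientIDFlag != "" {
		c.ClientID = clientIDFlag
	}
	return c
}

func main() {
	c := resolveCredentials(os.Getenv, "")
	// Never print the password itself; only report whether it is set.
	fmt.Printf("user=%q client_id=%q password_set=%t\n", c.User, c.ClientID, c.Password != "")
}
```

Exporting KAFKA_CLI_USER and KAFKA_CLI_PASSWORD in the shell before running the tool keeps the credentials out of the command line and the shell history.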

List all topics

./kafka-cli --brokers <broker-list> topics
topic           partition[replicaid...]:offset ...
unexpected-logs 0[0]:12285581
output-pod2     0[0]:9725580
default-topic   0[0]:13971093
test            0[0]:5
filebeats       0[0]:12488893   1[0]:11301355   2[0]:11301353   3[0]:11301197   4[0]:11301197   5[0]:11301197   6[0]:11301200   7[0]:11301356
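
For scripting, the listing above can be parsed line by line. The sketch below assumes each line is a topic name followed by entries of the form partition[replicaid...]:offset, as the header row suggests. The separator between several replica IDs is not shown in this README, so the parser accepts any non-digit separator. The Partition type and parseTopicLine function are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// Partition is one "partition[replicaid...]:offset" entry.
type Partition struct {
	ID       int
	Replicas []int
	Offset   int64
}

// entryRe matches a single partition entry such as "3[0]:11301197".
var entryRe = regexp.MustCompile(`(\d+)\[([^\]]*)\]:(-?\d+)`)

// parseTopicLine splits one output line into the topic name and its
// partitions. The header row yields the name "topic" and no partitions,
// so callers should skip it.
func parseTopicLine(line string) (string, []Partition, error) {
	fields := strings.Fields(line)
	if len(fields) == 0 {
		return "", nil, fmt.Errorf("empty line")
	}
	topic := fields[0]
	// Everything after the topic name holds the partition entries.
	rest := strings.TrimPrefix(strings.TrimSpace(line), topic)
	var parts []Partition
	for _, m := range entryRe.FindAllStringSubmatch(rest, -1) {
		id, err := strconv.Atoi(m[1])
		if err != nil {
			return "", nil, fmt.Errorf("partition id %q: %w", m[1], err)
		}
		offset, err := strconv.ParseInt(m[3], 10, 64)
		if err != nil {
			return "", nil, fmt.Errorf("offset %q: %w", m[3], err)
		}
		p := Partition{ID: id, Offset: offset}
		// Replica IDs may be separated by spaces or commas (assumption).
		ids := strings.FieldsFunc(m[2], func(r rune) bool { return r < '0' || r > '9' })
		for _, s := range ids {
			n, err := strconv.Atoi(s)
			if err != nil {
				return "", nil, fmt.Errorf("replica id %q: %w", s, err)
			}
			p.Replicas = append(p.Replicas, n)
		}
		parts = append(parts, p)
	}
	return topic, parts, nil
}

func main() {
	topic, parts, err := parseTopicLine("test            0[0]:5")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(topic, parts)
}
```

Summing the Offset fields of a topic's partitions gives a rough total of messages written to it, provided the offsets shown are the latest offsets and the log started at zero.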

Create topics

./kafka-cli --brokers <broker-list> --zookeepers <zookeeper-list> topics create t1 t2 t3 --partitions 10 --replicas 3

Delete topics

./kafka-cli --brokers <broker-list> --zookeepers <zookeeper-list> topics delete t1 t2 t3

Consume topics

./kafka-cli --brokers <broker-list> consume default-topic unexpected-logs

Produce messages

./kafka-cli --brokers=<broker-list> produce test /tmp/x.txt [key]

This reads the bytes of the given file (/tmp/x.txt) and sends them to the topic (test). An optional key can be given after the filename.