Various Kafka-related scripts to easily inspect and manage the cluster, topics, consumer groups, etc.
The scripts in this repository assume the following utilities are installed:
- curl
- httpie
- jq
- podman (highly recommended, although docker is also supported; see the end of this README for configuration instructions)
- (optional) pygmentize, for nicer default formatting of JSON output (see the functions in .shared.sh)
For example, on Fedora you can install them with:
sudo dnf install curl httpie jq podman
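To check that the prerequisites above are on your PATH, a small loop over `command -v` is enough. This is a minimal sketch, not part of the repository; the helper name `check_deps` is hypothetical. Note that httpie installs the `http` command, and pygmentize is only reported, because it is optional.

```bash
#!/usr/bin/env bash
# Hypothetical helper: report which of the given commands are missing from PATH.
# Returns 0 when all of them are installed, 1 otherwise.
check_deps() {
    local missing=0 cmd
    for cmd in "$@"; do
        if ! command -v "$cmd" >/dev/null 2>&1; then
            echo "missing: $cmd" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Required tools (httpie provides "http"; podman may be replaced by docker).
if check_deps curl http jq podman; then
    echo "all required utilities found"
fi
# Optional tool: only report it.
check_deps pygmentize || echo "optional pygmentize not found"
```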
Create a .config.sh file, using the provided .config-sandbox.sh as an example. It is recommended to use a symlink to switch between configurations.
Then call any script, for example:
./_kafkacat.list-topics.sh
It will use the values from the config, or tell you which additional values need to be provided.
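Switching configurations with a symlink, as recommended above, could look like the sketch below. It runs in a throwaway directory; `.config-prod.sh` is a hypothetical second config used only for illustration.

```bash
#!/usr/bin/env bash
set -euo pipefail

# Demo only: work in a scratch directory instead of a real checkout.
cd "$(mktemp -d)"

# Two configs (contents shortened; .config-prod.sh is a hypothetical name).
echo 'ENV=SBOX' > .config-sandbox.sh
echo 'ENV=PROD' > .config-prod.sh

# Point .config.sh at the sandbox config...
ln -sf .config-sandbox.sh .config.sh
readlink .config.sh        # prints .config-sandbox.sh

# ...and later switch to production by re-pointing the same symlink.
ln -sf .config-prod.sh .config.sh
readlink .config.sh        # prints .config-prod.sh
```

Scripts keep reading .config.sh, so nothing else has to change when you switch.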
To work easily with different environments, follow the configuration file naming conventions described below and call scripts like:
ENV=PROD ./_kafkacat.list-topics.sh
ENV=SBOX ./_kafkacat.list-topics.sh
It is recommended to place configs into a separate directory conf/<env> (e.g. conf/production).
The root directory of the repository is expected to contain the default .config.sh file and, optionally, per-environment symlinks such as .config.sh.PROD, .config.sh.SBOX, ...
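The actual selection of a file by ENV lives in the repository's shared functions; the sketch below only illustrates the naming convention described above. It builds the suggested conf/<env> layout with per-environment symlinks in a scratch directory, and uses a hypothetical `load_config` function (not the repository's code) that sources `.config.sh.$ENV` when ENV is set and `.config.sh` otherwise.

```bash
#!/usr/bin/env bash
set -euo pipefail

# Demo only: build the layout in a scratch directory.
cd "$(mktemp -d)"

# Real configs live in conf/<env>; the repository root only holds symlinks to them.
mkdir -p conf/production conf/sandbox
echo 'ENV=PROD' > conf/production/.config.sh
echo 'ENV=SBOX' > conf/sandbox/.config.sh
ln -sf conf/sandbox/.config.sh .config.sh            # default config
ln -sf conf/production/.config.sh .config.sh.PROD
ln -sf conf/sandbox/.config.sh .config.sh.SBOX

# Hypothetical loader illustrating the naming convention.
load_config() {
    local file=.config.sh
    if [ -n "${ENV:-}" ]; then
        file=".config.sh.${ENV}"
    fi
    if [ ! -r "$file" ]; then
        echo "config file $file not found" >&2
        return 1
    fi
    # shellcheck source=/dev/null
    source "$file"
}

ENV=PROD               # what `ENV=PROD ./script.sh` puts into the script's environment
load_config
echo "loaded: $ENV"    # prints loaded: PROD

unset ENV
load_config
echo "loaded: $ENV"    # prints loaded: SBOX (via the default .config.sh symlink)
```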
A config file should look like this:
[ "$0" = "${BASH_SOURCE[0]}" ] && echo 'Config file must be sourced!' && exit 1
ENV=PROD
: ${KAFKA_BOOTSTRAP_SERVERS:=kafka.epm-eco.projects.example.com:9095}
: ${SCHEMA_REGISTRY:=http://schema-registry.epm-eco.projects.example.com:8081}
: ${KEY_SERIALIZATION:=-s key=avro}
# -J for JSON. Or you may provide format as you wish
: ${KAFKACAT_CONSUME_TOPIC_FORMAT=-J}
#: ${KAFKACAT_CONSUME_TOPIC_FORMAT='-f --\nKey (%K bytes): %k\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\nHeaders: %h\n'}
# Without the value itself:
#: ${KAFKACAT_CONSUME_TOPIC_FORMAT='-f --\nKey (%K bytes): %k\t\nValue %S bytes\nPartition: %p\tOffset: %o\nHeaders: %h\n'}
_conf_dir=$(dirname "$(realpath "${BASH_SOURCE[0]}")")
CONTAINER_CACHE_EXTRA_OPTIONS_kafkacat=('-v.:/host' "-v${_conf_dir}:/conf:Z,ro" "-v${_conf_dir}/krb5.conf:/etc/krb5.conf:Z,ro")
# Note! For Kerberos auth you also need to configure the JAAS and krb5 options below:
CONTAINER_CACHE_EXTRA_OPTIONS_confluent=('--network=host' '-v.:/host' "-v${_conf_dir}:/conf:Z,ro" "-v${_conf_dir}/krb5.conf:/etc/krb5.conf:Z,ro" '--env=KAFKA_HEAP_OPTS=-Xmx4096M' '--env=KAFKA_OPTS=-Djava.security.auth.login.config=/conf/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf')
: ${KERBEROS_USER:=Pavel_Alexeev@EXAMPLE.COM}
: ${KERBEROS_KEYTAB_FILE:="conf/${ENV}/${KERBEROS_USER}.keytab"}
# The options below expect /conf (mounted above) to hold certificates and keystores. The paswd file must also contain the password for the Kerberos account
# given in the sasl.kerberos.kinit.cmd line. Please be careful and NEVER commit sensitive information into git!!!
KAFKACAT_SECURE_OPTIONS=(
'-Xssl.ca.location=/conf/epm-eco-prod.ca.crt'
'-Xsecurity.protocol=SASL_SSL'
'-Xsasl.mechanisms=GSSAPI'
'-Xsasl.kerberos.principal=kafkaclient'
"-Xsasl.kerberos.kinit.cmd=/usr/bin/kinit --password-file=/conf/paswd ${KERBEROS_USER}"
# OR keytab based variant (see script keytab.regenerate for generation):
# "-Xsasl.kerberos.kinit.cmd=/usr/bin/kinit -kt /conf/${KERBEROS_USER}.keytab ${KERBEROS_USER}"
)
# Alternative variant (note the epm-eco-int CA). Keep only one KAFKACAT_SECURE_OPTIONS
# definition active: a later assignment overrides the earlier one.
# /conf holds certificates and keystores. Please be careful and NEVER commit sensitive information into git!!!
# '-Xsasl.kerberos.kinit.cmd=/usr/bin/kinit --password-file=/conf/paswd Pavel_Alexeev@PETERSBURG.EXAMPLE.COM'
# You may obtain a keytab file as described in https://stackoverflow.com/questions/8144596/kerberos-kinit-enter-password-without-prompt/8282084#8282084
# and https://stackoverflow.com/questions/37454308/script-kerberos-ktutil-to-make-keytabs. See the keytab.regenerate script for that!
KAFKACAT_SECURE_OPTIONS=(
'-Xssl.ca.location=/conf/epm-eco-int.ca.crt'
'-Xsecurity.protocol=SASL_SSL'
'-Xsasl.mechanisms=GSSAPI'
'-Xsasl.kerberos.principal=kafkaclient'
)
: ${KAFKA_CONNECT_HOST:=localhost:8083}
: ${KSQLDB_SERVER:=http://localhost:8088}
CONSUMER_GROUP_ID=epm-ddo.consumer.$(hostname).$(date --iso-8601=s)
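The `: ${VAR:=default}` lines above assign a value only when the variable is unset or empty, so a value given on the command line (e.g. `KAFKA_BOOTSTRAP_SERVERS=localhost:9092 ./_kafkacat.list-topics.sh`) takes precedence over the config. KAFKACAT_CONSUME_TOPIC_FORMAT uses `=` without the colon, which keeps an explicitly empty value. A small demonstration in plain bash, using a cut-down config with a made-up host name:

```bash
#!/usr/bin/env bash
set -euo pipefail

# A cut-down config with the same patterns as above (host name is made up).
cfg=$(mktemp)
cat > "$cfg" <<'CONF'
[ "$0" = "${BASH_SOURCE[0]}" ] && echo 'Config file must be sourced!' && exit 1
: ${KAFKA_BOOTSTRAP_SERVERS:=kafka.example.com:9095}
: ${KAFKACAT_CONSUME_TOPIC_FORMAT=-J}
CONF

# 1. Nothing set: the config defaults apply.
( unset KAFKA_BOOTSTRAP_SERVERS KAFKACAT_CONSUME_TOPIC_FORMAT
  source "$cfg"
  echo "1: $KAFKA_BOOTSTRAP_SERVERS '$KAFKACAT_CONSUME_TOPIC_FORMAT'" )   # 1: kafka.example.com:9095 '-J'

# 2. Set by the caller (as in VAR=value ./script.sh): the config keeps it.
( KAFKA_BOOTSTRAP_SERVERS=localhost:9092
  source "$cfg"
  echo "2: $KAFKA_BOOTSTRAP_SERVERS" )                                  # 2: localhost:9092

# 3. Set but empty: ':=' replaces the empty value, '=' keeps it empty.
( KAFKA_BOOTSTRAP_SERVERS='' KAFKACAT_CONSUME_TOPIC_FORMAT=''
  source "$cfg"
  echo "3: $KAFKA_BOOTSTRAP_SERVERS '$KAFKACAT_CONSUME_TOPIC_FORMAT'" )   # 3: kafka.example.com:9095 ''

# Executing the config instead of sourcing it is refused by its first line.
bash "$cfg" || echo "refused (exit status $?)"
```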
Please note: for use with Kerberos you will probably also need several configuration files:
- Server CA certificate 'server.ca.crt'
- conf/<env>/paswd with the password of the AD account configured in .config.sh. Warning: such a password must never be committed into git! The file is ignored in the repository (please also pay attention to its spelling: paswd, not passwd).
- Instead of providing a plain-text password in a file, as described above, you may also use keytab-based auth. See the configuration alternatives above and the keytab.regenerate script.
- krb5.conf file. For example:
# By https://kb.example.com/display/EPMECOSYS/Pub-Sub+Clients
[libdefaults]
  default_realm = EXAMPLE.COM
  dns_canonicalize_hostname = false
  rdns = false
  # dns_lookup_realm = true
  # dns_lookup_kdc = true
  dns_lookup_realm = false
  dns_lookup_kdc = false

[realms]
  # EXAMPLE.COM = {
  #   kdc = example.com:88
  #   admin_server = example.com
  #   default_domain = example.com
  # }
  EXAMPLE.COM = {
    kdc = EVBYMINSA0016.example.com
    kdc = EVBYMINSA0084.example.com
    kdc = EVBYMINSA0018.example.com
    admin_server = EVBYMINSA0016.example.com
  }
  PETERSBURG.EXAMPLE.COM = {
    kdc = evbyminsa0007.petersburg.example.com.
    kdc = evhubudsa0309.budapest.example.com.
    admin_server = evbyminsa0007.petersburg.example.com.
  }

[domain_realm]
  .example.com = EXAMPLE.COM
  example.com = EXAMPLE.COM

[login]
  krb4_convert = true
  krb4_get_tickets = false
- Client configuration such as kafka-client.properties
- Truststore, possibly protected with a password (e.g. server-prod.truststore)
- jaas.conf, the Java JAAS login configuration file.
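The jaas.conf and kafka-client.properties files for the Java-based tools could be generated along the lines of the sketch below. The keys are standard Kafka client and JAAS Krb5LoginModule options, but the principal, the keytab path, the service name `kafka` and the truststore password are assumptions to replace with your cluster's values. Paths use /conf because the config example above mounts the config's real directory there (with .config.sh symlinked into conf/<env>, that is conf/<env>).

```bash
#!/usr/bin/env bash
set -euo pipefail

# Assumed values: replace them with your own. ENV selects the conf/<env> directory.
: "${ENV:=production}"
: "${KERBEROS_USER:=user@EXAMPLE.COM}"

# Demo only: write into a scratch directory; in real use run from the repository root.
cd "$(mktemp -d)"
conf_dir="conf/${ENV}"
mkdir -p "$conf_dir"

# JAAS login configuration using a keytab (paths as seen inside the container).
cat > "$conf_dir/jaas.conf" <<EOF
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/conf/${KERBEROS_USER}.keytab"
    principal="${KERBEROS_USER}";
};
EOF

# Client properties for the Java tools.
cat > "$conf_dir/kafka-client.properties" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
# Assumption: must match the service name in the brokers' Kerberos principals.
sasl.kerberos.service.name=kafka
ssl.truststore.location=/conf/server-prod.truststore
# Placeholder only: never commit the real password.
ssl.truststore.password=CHANGE_ME
EOF

ls "$conf_dir"    # prints jaas.conf and kafka-client.properties
```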
If you really want to use docker instead of podman (not recommended), run this first:
cat <<CONF > .config.global.sh
alias podman=docker
shopt -s expand_aliases
CONF
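Bash does not expand aliases in non-interactive shells unless `expand_aliases` is enabled, which is why the second line is needed. The sketch below checks the effect without the real docker CLI: a stub `docker` function (an assumption made only for the demo) stands in for it, and a tiny script in a scratch directory sources .config.global.sh before calling `podman`.

```bash
#!/usr/bin/env bash
set -euo pipefail

cd "$(mktemp -d)"

# Same file as above.
cat <<CONF > .config.global.sh
alias podman=docker
shopt -s expand_aliases
CONF

# Sources the file, then calls "podman" on a later line (aliases expand when a line is read).
cat <<'SCRIPT' > demo.sh
docker() { echo "docker called with: $*"; }   # stub instead of the real docker CLI
source ./.config.global.sh
podman ps
SCRIPT

bash demo.sh    # prints: docker called with: ps
```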
If a required variable is not set, the script tells you what is missing:
$ ./_kafka.consume-topic.sh
./_kafka.consume-topic.sh: line 7: TOPIC: Not enough vars set: TOPIC required. Example: TOPIC=topic1 ./_kafka.consume-topic.sh
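The message format above (`<script>: line N: TOPIC: ...`) is what bash prints for the `${VAR:?message}` expansion when VAR is unset or empty, so such a check fits on one line. A sketch, assuming the scripts use this mechanism; `demo-consume.sh` is a made-up file name:

```bash
#!/usr/bin/env bash
set -uo pipefail

# Demo only: a throwaway script in a scratch directory.
cd "$(mktemp -d)"
cat > demo-consume.sh <<'EOF'
#!/usr/bin/env bash
# Abort with a helpful message when TOPIC is unset or empty.
: "${TOPIC:?Not enough vars set: TOPIC required. Example: TOPIC=topic1 $0}"
echo "consuming $TOPIC"
EOF

# Without TOPIC, bash prints an error like
# "demo-consume.sh: line 3: TOPIC: Not enough vars set: ..." and exits non-zero.
(unset TOPIC; bash demo-consume.sh) || echo "refused: TOPIC is not set"

TOPIC=topic1 bash demo-consume.sh    # prints: consuming topic1
```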
To list topics:
./_kafkacat.list-topics.sh
This prints JSON describing the topics: names, partitions, leaders and so on. If you just want a plain list of names, run:
./_kafkacat.list-topics.plain-list.sh
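For something in between, such as topic names with their partition counts, jq can reshape the JSON metadata. This sketch assumes the `kcat -L -J` layout (a `topics` array whose items carry `topic` and `partitions`); the sample is made up so the filter can be tried offline, and the function name `topics_with_partitions` is hypothetical.

```bash
#!/usr/bin/env bash
set -uo pipefail

# Print "topic<TAB>partition count", sorted by topic name, for kcat -L -J style JSON on stdin.
topics_with_partitions() {
    jq -r '.topics | sort_by(.topic) | .[] | "\(.topic)\t\(.partitions | length)"'
}

# Shortened, made-up sample in the shape of `kcat -L -J` output.
metadata='{"topics": [
  {"topic": "payments", "partitions": [{"partition": 0, "leader": 2}]},
  {"topic": "orders",   "partitions": [{"partition": 0, "leader": 1}, {"partition": 1, "leader": 2}]}
]}'

echo "$metadata" | topics_with_partitions
```

With a live cluster the same function can be fed from ./_kafkacat.list-topics.sh, assuming it emits plain JSON when its output is piped.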
To consume a topic:
TOPIC=gidplatform_dev.mytracker.activities ./_kafkacat.consume-topic.sh
If the topic uses Avro and Schema Registry:
TOPIC=gidplatform_dev.mytracker.activities ./_kafkacat.consume-topic.avro.sh
Avro, last 5 messages:
N=5 TOPIC=gidplatform_dev.mytracker.activities ./_kafkacat.consume-topic.avro.sh
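To inspect the Avro schema used for decoding, you can query the Schema Registry configured in SCHEMA_REGISTRY. This sketch assumes the default subject naming `<topic>-value` (Confluent's TopicNameStrategy) and the standard `/subjects/<subject>/versions/latest` REST endpoint; the helper name `latest_schema_url` is hypothetical.

```bash
#!/usr/bin/env bash
set -euo pipefail

: "${SCHEMA_REGISTRY:=http://localhost:8081}"   # normally comes from .config.sh

# Hypothetical helper: URL of the latest value schema for a topic.
latest_schema_url() {
    local topic=${1:?topic name required}
    echo "${SCHEMA_REGISTRY%/}/subjects/${topic}-value/versions/latest"
}

latest_schema_url gidplatform_dev.mytracker.activities
# With a reachable registry, the "schema" field holds the Avro schema as a JSON string:
#   curl -s "$(latest_schema_url gidplatform_dev.mytracker.activities)" | jq -r .schema | jq .
```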
TIP: you can always pass additional arguments to the underlying utilities too, e.g.:
TOPIC=gidplatform-test.tracking ./_kafkacat.consume-topic.sh -o-2
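Extra arguments reach the underlying tool when a wrapper forwards them with "$@". A minimal sketch of such a wrapper; the function name `consume_topic` and the fixed kcat flags are assumptions, not the repository's script, and kcat is replaced by a stub that only prints its command line so the example runs without a cluster:

```bash
#!/usr/bin/env bash
set -euo pipefail

KAFKA_BOOTSTRAP_SERVERS=localhost:9092   # normally comes from .config.sh

# Stub standing in for the real kcat binary: just show the command line.
kcat() { echo "kcat $*"; }

# Hypothetical wrapper: fixed options first, then everything the caller passed.
consume_topic() {
    : "${TOPIC:?TOPIC required}"
    kcat -b "$KAFKA_BOOTSTRAP_SERVERS" -C -t "$TOPIC" -e "$@"
}

TOPIC=gidplatform-test.tracking consume_topic -o-2
# prints: kcat -b localhost:9092 -C -t gidplatform-test.tracking -e -o-2
```

In kcat, a negative `-o` value is a relative offset from the end, so `-o-2` starts two messages before the end of each partition.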
Strictly speaking this is not directly related, but with jq we can turn the output into something "SQL-like", e.g.:
TOPIC=gidplatform_dev.mytracker.activities ./_kafkacat.consume-topic.avro.lastN.sh | jq 'select("2023-12-31" == .payload.dtEvent.string)'
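Going a step further, jq's `-s` (slurp) option collects the whole message stream into one array, which allows GROUP BY-style aggregation. A sketch on made-up messages shaped like the filter above (`.payload.dtEvent.string`); the function name `count_by_date` is hypothetical.

```bash
#!/usr/bin/env bash
set -uo pipefail

# Roughly: SELECT date, COUNT(*) FROM <stream> GROUP BY date
# -s reads all JSON messages from stdin into a single array first.
count_by_date() {
    jq -c -s 'group_by(.payload.dtEvent.string)
              | map({date: .[0].payload.dtEvent.string, count: length})'
}

# Made-up messages shaped like the ones filtered above.
messages='{"payload":{"dtEvent":{"string":"2023-12-31"}}}
{"payload":{"dtEvent":{"string":"2023-12-31"}}}
{"payload":{"dtEvent":{"string":"2024-01-01"}}}'

echo "$messages" | count_by_date
# prints [{"date":"2023-12-31","count":2},{"date":"2024-01-01","count":1}]
```

Because of `-s`, jq prints nothing until its input ends, so the consumer feeding it must exit after reading rather than keep following the topic.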
To list consumer groups:
./_kafka.consumer-groups.list.sh
To describe a group, say recommend-stage from the output of the command above:
KAFKA_CONSUMER_GROUP=recommend-stage ./_kafka.consumer-group.describe.sh
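To reduce the describe output to one number, such as the group's total lag, awk can sum the LAG column. This sketch assumes the script prints the standard `kafka-consumer-groups --describe` table with a header row containing `LAG`; the sample rows are made up, partitions without a committed offset (LAG shown as `-`) are skipped, and the function name `sum_lag` is hypothetical.

```bash
#!/usr/bin/env bash
set -euo pipefail

# Sum the LAG column of --describe output read from stdin.
sum_lag() {
    awk '
        $1 == "GROUP" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
        col && $col ~ /^[0-9]+$/ { total += $col }
        END { print total + 0 }'
}

# Made-up sample in the layout of `kafka-consumer-groups --describe`.
describe_output='
GROUP           TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST       CLIENT-ID
recommend-stage orders  0          100             105             5    consumer-1   /10.0.0.1  client-1
recommend-stage orders  1          200             207             7    consumer-1   /10.0.0.1  client-1
recommend-stage orders  2          -               50              -    -            -          -'

echo "$describe_output" | sum_lag    # prints 12
```

With a live cluster: `KAFKA_CONSUMER_GROUP=recommend-stage ./_kafka.consumer-group.describe.sh | sum_lag`. Locating the column by its header name keeps the sketch working if the column order differs between Kafka versions.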