Consumer Group Offset Update Scripts


timestamp-offset-update.sh This script will find the latest offset for a consumer group on a given topicA. Then consume that record and grab of a pre-determined timestamp field that is present in the payload. From there, the script will gather all the partitions timestamps, find the earliest one, and create a statement to set the consumer group offset for topicB.

You will want to update the RECORD_FIELD variable in the script to match the JSON field path that you want to use to select your timestamp value.

maxlag-offset-update.sh This script will find the largest consumer lag for a given consumer group on topicA and then produce an update statement to shift the offsets for the consumer group on TopicB.

The OFFSET_BACKOFF variable can be tweaked to move the offset away from the Max Lag. It is default to 100, so if Max Lag found is 500, the script will generate a shift value of 600. This allows you to be able to easily adjust the max lag if needed.

Getting Started

You will need to create a file kafka.properties that has the connectivity information for your kafka cluster. See sample-kafka.properties for a template. Just make sure to name it kafka.properties or change the CONFLUENT_CONFIG_FILE variable in the script.

In each script, you can update the CONFLUENT_PATH variable to point to you installment of the Confluent toolkit.

If you do not have the Confluent platform, this can be downloaded here: https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html#install-cp-using-zip-and-tar-archives

You will also need jq on your machine to run timestamp-offset-update. This is used to parse out the field that you are wanting. It can be found here: https://stedolan.github.io/jq/

Script Arguments

Each script takes 3 arguments to execute. The consumer group, topicA, and topicB.

Example: ./maxlag-offset-update.sh consumer-group-abc, myOldTopic, theNewTopic