Yelp/kafka-utils

Unable to unsubsribe topic from consumer group

Closed this issue · 2 comments

Offsets stored in kafka. Kafka 0.10.1. Kafka-utils 1.5.0. Kafka-python 1.3.3. Exception:

`kafka-consumer-manager -t devel --cluster-name test-1 unsubscribe_topics --topic test-topic --storage kafka mirror-maker-1
Cluster name: test-1, consumer group: mirror-maker-1
Offsets of all topics and partitions listed below shall be modified:
Topic: test-topic, Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Is this what you really intend? (y/n) y
Traceback (most recent call last):
File "/usr/bin/kafka-consumer-manager", line 6, in
run()
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/main.py", line 90, in run
args.command(args, conf)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/unsubscribe_topics.py", line 127, in run
topics_dict,
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/unsubscribe_topics.py", line 150, in unsubscribe_topics
self.delete_topic(group, topic)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/unsubscribe_topics.py", line 221, in delete_topic
offset_storage='kafka',
File "/usr/lib/python2.7/site-packages/kafka_utils/util/offsets.py", line 545, in set_consumer_offsets
callback=_check_commit_response_error
File "/usr/lib/python2.7/site-packages/retrying.py", line 49, in wrapped_f
return Retrying(*dargs, **dkw).call(f, *args, **kw)
File "/usr/lib/python2.7/site-packages/retrying.py", line 206, in call
return attempt.get(self._wrap_exception)
File "/usr/lib/python2.7/site-packages/retrying.py", line 247, in get
six.reraise(self.value[0], self.value[1], self.value[2])
File "/usr/lib/python2.7/site-packages/retrying.py", line 200, in call
attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
File "/usr/lib/python2.7/site-packages/kafka_utils/util/client.py", line 69, in send_offset_commit_request_kafka
if not fail_on_error or not self._raise_on_response_error(resp)]
File "/usr/lib/python2.7/site-packages/kafka/client.py", line 413, in _raise_on_response_error
kafka.errors.check_error(resp)
File "/usr/lib/python2.7/site-packages/kafka/errors.py", line 496, in check_error
raise error_class(response)
kafka.errors.UnknownMemberIdError: [Error 25] UnknownMemberIdError: OffsetCommitResponsePayload(topic=u'test-topic', partition=0, error=25)
`

Hi @boniek83, in order to unsubscribe a consumer group with offsets stored in Kafka, you need to make sure none of the consumers with this consumer group id are running.

The error you are seeing seems to indicate at least of the consumers are active for this consumer group, and Kafka rejected the "unsubscribe" request. Can you confirm all of your consumers are stopped?

Yes, they weren't. It didn't seem obvious to me - it is not documented in help anywhere :)