Failure on altering offsets on AdminClientExample/src/main/java/org/example/AdminClientExample.java
ZviMints opened this issue · 4 comments
ZviMints commented
I'm using the AdminClientExample in order to reset offset for a specific topics as follows:
I'm trying to reset offsets for specific group id as follows:
// Pause Connectors
Await.result(k8sService.setPauseToConnectorsAsync(true), Duration.Inf)
// Get Current Offsets
val originalState = kafkaService.getCurrentState()
// Reset Offsets
kafkaService.resetOffsets(originalState)
when resetOffsets
is as follows:
def resetOffsets(originalState: Map[GroupID, mutable.Map[TopicPartition, OffsetAndMetadata]]): Unit = {
logger.info(s"resetOffsets(originalState = $originalState) Triggered")
for ((groupId, partitionsToOffsets) <- originalState) {
logger.info(s"resetOffsets($groupId) Triggered")
client.alterConsumerGroupOffsets(groupId, partitionsToOffsets.view.mapValues(_ => new OffsetAndMetadata(0)).toMap.asJava).all().get(config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
}
}
But its fails with
org.apache.kafka.common.errors.UnknownMemberIdException: Failed altering consumer group offsets for the following partitions
Any ideas how to solve it? or how to reset the offsets for specific group.id with other way?
HunterSherms commented
@ZviMints what was the solution?
henrik242 commented
@HunterSherms @ZviMints I wonder about the same thing. Did you figure it out?
nightscape commented
Same issue...
henrik242 commented
There's probably an active consumer running, at least that was my issue. More info at https://stackoverflow.com/a/77011755/13365