bsm/sarama-cluster

commitOffset does not work properly in partition mode

carl-leopard opened this issue · 4 comments

kafka: 1.1.0
zookeeper: 3.4.12

i call MarkOffset after handling message success, then call CommitOffset to manual commit offset, but it does not work as expected.
here is the code:

func (p *KafkaConsumer) consumeByPartitionedModeAndCommit(handler func(message interface{}) error) {
	// consume partitions
	for {
		select {
		case part, ok := <-p.consumer.Partitions():
			{
				if !ok {
					log.Errorf("%s consumer Error. part:%v, ok:%v", part, ok)
					continue
				}

				// start a separate goroutine to consume messages
				go func(pc cluster.PartitionConsumer) {
					for msg := range pc.Messages() {
						log.Infof("%s comsume message. topic:%s, partition:%d, offset:%d, key:%s, val:%s", prefix, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
						if handler != nil {
							if err := handler(*msg); err == nil {
								p.consumer.MarkOffset(msg, "handled") // mark message as processed
								if err := p.consumer.CommitOffsets(); err != nil {
									log.Errorf("%s consumer commit offsets Error: %s, topic:%s, partition:%d, offset:%d", prefix, err.Error(), msg.Topic, msg.Partition, msg.Offset)
								} else {
									log.Infof("%s consumer.CommitOffsets success. topic:%s, partition:%d, offset:%d", prefix, msg.Topic, msg.Partition, msg.Offset)
								}
							} else {
								log.Errorf("%s handler message(topic:%s, partition:%d, offset:%d, key:%s, val:%s) err:%s", prefix, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value, err.Error())
							}
						} else {
							log.Infof("%s message handle is nil, so message is ignored.", prefix)
							p.consumer.MarkOffset(msg, "handled") // mark message as processed
							if err := p.consumer.CommitOffsets(); err != nil {
								log.Errorf("%s consumer commit offsets Error: %s, topic:%s, partition:%d, offset:%d", prefix, err.Error(), msg.Topic, msg.Partition, msg.Offset)
							} else {
								log.Infof("%s consumer.CommitOffsets success. topic:%s, partition:%d, offset:%d", prefix, msg.Topic, msg.Partition, msg.Offset)
							}
						}
					}
				}(part)
			}
		default:
			time.Sleep(time.Millisecond * 50)
		}

	}
}

from log file, it indicates that offset does commit successfully, but after a day or few days later, i receive the same message that has been handled successfully days ago.

i have set the auto-commit-interval to a very large number that you can think it will never comes to

thanks