wvanbergen/kafka

Panic on "CommitUpto" after rebalance

nemosupremo opened this issue · 1 comment

Reproduction Steps:

  1. A worker starts a task
  2. A rebalance occurs
  3. The worker finishes the task and commits

Stack dump:

[2015-09-18T20:21:24.991Z] [GEARD] [geard.go:50] [INFO] PANIC: runtime error: invalid memory address or nil pointer dereference
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
    gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
    panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
    sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
    (*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
    (*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
    (*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
    (*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)
/go/src/github.com/channelmeter/geard/geard.go:410 (0x4063e2)
    func.012: harvestQueue.Ack(harvTask)
/usr/src/go/src/runtime/asm_amd64.s:401 (0x443aa5)
    call16: CALLFN(·call16, 16)
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
    gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
    panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
    sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
    (*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
    (*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
    (*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
    (*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)

I'm guessing here, but the consumer probably can no longer commit the message because it no longer owns the partition. What should be done in this case? It might make the most sense to drop the commit, since the message will be reprocessed anyway, or to somehow let the application know that a rebalance occurred and that its in-flight messages no longer matter.
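One plausible reading of the stack dump is that `zom.offsets[topic][partition]` returns a nil tracker once the partition has been released, so the `pot.l.Lock()` call dereferences nil. The sketch below illustrates the "drop the commit and tell the caller" option under that assumption. The types `tracker` and `offsetManager` and the error `ErrPartitionNotClaimed` are hypothetical stand-ins, not the library's actual code or API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for the library's partitionOffsetTracker.
type tracker struct {
	l         sync.Mutex
	processed int64
}

// markAsProcessed records offset if it is newer than the last one seen.
func (t *tracker) markAsProcessed(offset int64) bool {
	t.l.Lock()
	defer t.l.Unlock()
	if offset > t.processed {
		t.processed = offset
		return true
	}
	return false
}

// ErrPartitionNotClaimed is a hypothetical error; it is not part of the library.
var ErrPartitionNotClaimed = errors.New("partition not claimed by this consumer")

// offsetManager is a hypothetical stand-in for zookeeperOffsetManager.
type offsetManager struct {
	l       sync.RWMutex
	offsets map[string]map[int32]*tracker
}

// MarkAsProcessed returns an error instead of panicking when the
// partition is no longer tracked (for example, after a rebalance).
func (m *offsetManager) MarkAsProcessed(topic string, partition int32, offset int64) (bool, error) {
	m.l.RLock()
	t := m.offsets[topic][partition] // nil when the topic or partition is missing
	m.l.RUnlock()
	if t == nil {
		return false, ErrPartitionNotClaimed
	}
	return t.markAsProcessed(offset), nil
}

func main() {
	m := &offsetManager{offsets: map[string]map[int32]*tracker{
		"events": {0: {processed: -1}},
	}}
	fmt.Println(m.MarkAsProcessed("events", 0, 42))

	// Simulate a rebalance that released partition 0.
	delete(m.offsets["events"], 0)
	fmt.Println(m.MarkAsProcessed("events", 0, 43))
}
```

With a guard like this, the application can decide for itself whether to ignore the late commit or treat it as a signal that a rebalance happened.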

In theory we should wait for ProcessingTimeout before giving up the partition claim (see https://github.com/wvanbergen/kafka/blob/master/consumergroup/offset_manager.go#L134-L135).

Hmmm; do you have logging turned on? Do you see that timeout message in your logs?