Panic on "CommitUpto" after rebalance
nemosupremo opened this issue · 1 comments
nemosupremo commented
Reproduction Steps:
- A worker starts a task
- A rebalance occurs
- The worker finishes the task and commits
Stack dump:
[2015-09-18T20:21:24.991Z] [GEARD] [geard.go:50] [INFO] PANIC: runtime error: invalid memory address or nil pointer dereference
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
(*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
(*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
(*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
(*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)
/go/src/github.com/channelmeter/geard/geard.go:410 (0x4063e2)
func.012: harvestQueue.Ack(harvTask)
/usr/src/go/src/runtime/asm_amd64.s:401 (0x443aa5)
call16: CALLFN(·call16, 16)
I'm guessing here, but it looks like the consumer can no longer commit the message because it no longer owns the partition. What should be done in this case? It might make the most sense to drop the commit, seeing as the message will be reprocessed anyway, or to somehow let the application know that a rebalance occurred and that its in-flight messages no longer matter.
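To illustrate the first option (dropping the commit), here is a minimal sketch. It assumes the panic comes from looking up a tracker for a partition that is no longer claimed. The type and method names echo the stack trace, but the struct fields and bodies are hypothetical stand-ins, not the library's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// partitionOffsetTracker is a simplified, hypothetical stand-in for the
// type of the same name that appears in the stack trace.
type partitionOffsetTracker struct {
	l                      sync.Mutex
	highestProcessedOffset int64
}

// markAsProcessed records the offset if it is higher than anything seen so far.
func (pot *partitionOffsetTracker) markAsProcessed(offset int64) bool {
	pot.l.Lock()
	defer pot.l.Unlock()
	if offset > pot.highestProcessedOffset {
		pot.highestProcessedOffset = offset
		return true
	}
	return false
}

// offsetManager is a hypothetical stand-in for zookeeperOffsetManager.
type offsetManager struct {
	l       sync.RWMutex
	offsets map[string]map[int32]*partitionOffsetTracker
}

// MarkAsProcessed drops the commit (returns false) when the partition is no
// longer tracked, instead of calling a method on a nil tracker.
func (om *offsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
	om.l.RLock()
	defer om.l.RUnlock()
	// Indexing a missing topic yields a nil inner map; reading from a nil
	// map is safe in Go and returns the zero value with ok == false.
	tracker, ok := om.offsets[topic][partition]
	if !ok || tracker == nil {
		return false
	}
	return tracker.markAsProcessed(offset)
}

func main() {
	om := &offsetManager{
		offsets: map[string]map[int32]*partitionOffsetTracker{
			"events": {0: &partitionOffsetTracker{highestProcessedOffset: -1}},
		},
	}
	fmt.Println(om.MarkAsProcessed("events", 0, 42)) // true: partition 0 is still claimed
	fmt.Println(om.MarkAsProcessed("events", 1, 42)) // false: partition 1 is not tracked, commit dropped
}
```

Returning false rather than an error keeps the sketch short; the second option (telling the application about the rebalance) would need an error return or a notification channel instead.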
wvanbergen commented
In theory we should wait for ProcessingTimeout before giving up the partition claim (see https://github.com/wvanbergen/kafka/blob/master/consumergroup/offset_manager.go#L134-L135).
Hmmm; do you have logging turned on? Do you see that timeout message in your logs?