Partition rebalance has serious issues
baconalot opened this issue · 4 comments
Given the following test program:
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/wvanbergen/kafka/consumergroup"
	"gopkg.in/Shopify/sarama.v1"
)

func main() {
	sarama.Logger = log.New(os.Stdout, "[Sarama]", log.LstdFlags)

	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	consumer, consumerErr := consumergroup.JoinConsumerGroup("FOO_TEST_CAN_GO", []string{"testproducercango"}, []string{"127.0.0.1"}, config)
	if consumerErr != nil {
		log.Fatalln(consumerErr)
	}
	defer consumer.Close()

	go func() {
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	eventCount := 0
	messProcessed := make(map[string]int)

StreamLoop:
	for {
		select {
		case <-time.After(time.Second * 10):
			break StreamLoop
		case mess := <-consumer.Messages():
			fmt.Printf("Got event from stream. Topic: %v, Partition: %v, Offset: %v, Mess: %v \n", mess.Topic, mess.Partition, mess.Offset, string(mess.Value))
			eventCount += 1
			// Simulate processing time
			time.Sleep(2 * time.Second)
			consumer.CommitUpto(mess)
			messProcessed[string(mess.Value)] += 1
		}
	}

	log.Printf("Processed %d events.", eventCount)
	log.Printf("%+v", messProcessed)
}
With topic testproducercango having 20 messages called bladiebla1...bladiebla20 in 2 partitions, I get the following output when I start this program 4 times in parallel.
Pid 0:
[Sarama]2015/04/07 17:37:39 Initializing new client
[Sarama]2015/04/07 17:37:39 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:39 Successfully initialized new client
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Consumer instance registered (me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840).
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Currently registered consumers: 1
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Partition consumer starting at the oldest available offset.
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 0, Mess: bladiebla2
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 1, Mess: bladiebla3
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Last processed offset: 1. Waiting up to 10s for another 5 messages to process...
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Last processed offset: -1. Waiting up to 10s for another 13 messages to process...
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: -1
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] FAILED closing the offset manager: Not all offsets were committed before shutdown was completed!
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] Deregistered consumer instance me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840.
[Sarama]2015/04/07 17:37:55 Closing Client
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]

goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x2, 0x0)
	/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x0, 0x2, 0xc208381e00)
	/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208041590, 0x0, 0x0)
	/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
	/home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f

goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
	/usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f

goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f9731560bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:2232 +0x1
Pid 1:
[Sarama]2015/04/07 17:37:45 Initializing new client
[Sarama]2015/04/07 17:37:45 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:45 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:45 Successfully initialized new client
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Consumer instance registered (me-user:7c3f576d-063f-4399-b346-509180e0075d).
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 2.
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 3, Mess: bladiebla8
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 4, Mess: bladiebla12
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 5, Mess: bladiebla14
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Last processed offset: 4. Waiting up to 10s for another 2 messages to process...
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 6, Mess: bladiebla19
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 0 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 3.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13
[Sarama]2015/04/07 17:38:13 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: 3
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 1
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 4.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
2015/04/07 17:38:50 Processed 24 events.
2015/04/07 17:38:50 map[bladiebla13:2 bladiebla17:2 bladiebla6:1 bladiebla7:2 bladiebla16:2 bladiebla12:1 bladiebla8:1 bladiebla19:1 bladiebla11:2 bladiebla15:2 bladiebla4:1 bladiebla9:2 bladiebla10:2 bladiebla18:2 bladiebla14:1]
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] Deregistered consumer instance me-user:7c3f576d-063f-4399-b346-509180e0075d.
[Sarama]2015/04/07 17:38:51 Closing Client
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
Pid 2:
[Sarama]2015/04/07 17:37:52 Initializing new client
[Sarama]2015/04/07 17:37:52 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:52 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:52 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:52 Successfully initialized new client
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Consumer instance registered (me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7).
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/14644cac47d7] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:37:56 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
2015/04/07 17:38:02 Processed 0 events.
2015/04/07 17:38:02 map[]
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] Deregistered consumer instance me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7.
[Sarama]2015/04/07 17:38:02 Closing Client
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
Pid 3:
[Sarama]2015/04/07 17:37:55 Initializing new client
[Sarama]2015/04/07 17:37:55 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:55 Successfully initialized new client
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Consumer instance registered (me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0).
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 0, Mess: bladiebla0
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 1, Mess: bladiebla1
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 2, Mess: bladiebla5
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] Deregistered consumer instance me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0.
[Sarama]2015/04/07 17:38:03 Closing Client
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]

goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x3, 0x0)
	/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x1, 0x3, 0xc208381700)
	/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208040fa0, 0x0, 0x0)
	/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
	/home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f

goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
	/usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f

goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f7bc8842bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
	/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:2232 +0x1
For now I see two issues in the output above:
- markAsProcessed acts on an already closed partition (the nil pointer panics in Pid 0 and Pid 3).
- Messages are processed multiple times!
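For the second issue, a consumer can protect itself to some degree by skipping offsets it has already handled. Below is a minimal sketch of that idea, assuming at-least-once delivery. `offsetDeduper`, `partitionKey`, and `shouldProcess` are hypothetical names made up for illustration; they are not part of consumergroup or sarama.

```go
package main

import "fmt"

// partitionKey identifies one partition of one topic.
type partitionKey struct {
	topic     string
	partition int32
}

// offsetDeduper remembers the highest offset handled per partition.
// Hypothetical helper, not part of consumergroup or sarama.
type offsetDeduper struct {
	highest map[partitionKey]int64
}

func newOffsetDeduper() *offsetDeduper {
	return &offsetDeduper{highest: make(map[partitionKey]int64)}
}

// shouldProcess reports whether the offset is new for this partition
// and, if it is, records it as the new high-water mark.
func (d *offsetDeduper) shouldProcess(topic string, partition int32, offset int64) bool {
	key := partitionKey{topic: topic, partition: partition}
	if last, seen := d.highest[key]; seen && offset <= last {
		return false // already handled, skip the redelivered message
	}
	d.highest[key] = offset
	return true
}

func main() {
	d := newOffsetDeduper()
	// Offsets 9 to 12, then redelivery starting again at 4,
	// similar to partition 1 in the Pid 1 log.
	offsets := []int64{9, 10, 11, 12, 4, 5, 6}
	for _, off := range offsets {
		ok := d.shouldProcess("testproducercango", 1, off)
		fmt.Printf("offset %d process=%v\n", off, ok)
	}
}
```

This only covers redelivery inside a single process, such as offsets 4 to 12 of partition 1 arriving a second time in Pid 1. Duplicates across processes, like offset 2 of partition 0 being seen by both Pid 0 and Pid 1, would need the high-water mark stored somewhere shared, or processing that is idempotent.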
Can you try #72 by any chance? The way the rebalance works has been improved significantly.
Hi wvanbergen,
I tried to run #72 but found that it only works with Kafka 0.8.2, because of the Coordinator method in sarama's Client interface.
Is it necessary?
// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. You can call RefreshCoordinator to update the cached value.
// This function only works on Kafka 0.8.2 and higher.
Coordinator(consumerGroup string) (*Broker, error)
Yes; the new implementation uses Kafka to commit processed offsets, which requires 0.8.2. I may add Zookeeper support but for now that is not really on my list.
If you can't upgrade to Kafka 0.8.2, make sure you're using the latest master version of consumergroup, because a bunch of bugs were recently fixed in its implementation which may have been the cause of this panic.
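For reference, both traces in the report show markAsProcessed being called with a nil receiver (first argument 0x0) after the partition consumer had already been stopped. The following is a minimal sketch of that failure mode and one possible guard. The `manager` and `tracker` types are hypothetical stand-ins written for this example; this is not the actual consumergroup code, nor the fix that was applied to it.

```go
package main

import (
	"errors"
	"fmt"
)

// tracker is a hypothetical stand-in for a per-partition offset tracker.
type tracker struct {
	lastProcessed int64
}

// manager is a hypothetical stand-in for an offset manager that drops a
// partition's tracker once the partition has been released.
type manager struct {
	trackers map[int32]*tracker
}

var errPartitionReleased = errors.New("partition is no longer claimed by this consumer")

// markAsProcessed records the offset. It returns an error instead of
// dereferencing a nil tracker when the partition was already released.
func (m *manager) markAsProcessed(partition int32, offset int64) error {
	t := m.trackers[partition] // nil when the key is absent
	if t == nil {
		return errPartitionReleased
	}
	if offset > t.lastProcessed {
		t.lastProcessed = offset
	}
	return nil
}

// release forgets the partition, as would happen during a rebalance.
func (m *manager) release(partition int32) {
	delete(m.trackers, partition)
}

func main() {
	m := &manager{trackers: map[int32]*tracker{
		0: &tracker{lastProcessed: -1},
	}}
	fmt.Println(m.markAsProcessed(0, 2)) // partition still claimed
	m.release(0)
	fmt.Println(m.markAsProcessed(0, 3)) // released: an error, not a panic
}
```

With a guard like this the caller can decide what to do with a message that finished processing after its partition moved to another consumer, for example log it and accept that it will be redelivered.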
We cannot upgrade to 0.8.2, but thank you for your work all the same.
On 26 August 2015, at 17:14, Willem van Bergen notifications@github.com wrote:
Yes; the new implementation uses Kafka to commit processed offsets, which requires 0.8.2. I may add Zookeeper support but for now that is not really on my list.
If you can't upgrade to Kafka 0.8.2, make sure you're using the latest master version of consumergroup, because a bunch of bugs were recently fixed in its implementation which may have been the cause of this panic.