bsm/sarama-cluster

Delivering disordered messages during shutdown(rebalance), causing message loss

longquanzheng opened this issue · 19 comments

We are experiencing msg loss during restarting workers, it happens consistently.
But when I added these logs, the message loss disappear.

longquanzheng@0c61134

I think this is because the logging slow down CommitOffsets() function. There must be some race condition with it.
Do you have any idea?

dim commented

@longquanzheng I am currently working on implementing cluster functionality into sarama directly - IBM/sarama#1099. Only a few small bits are missing at this point and I aim to finish those next week.

dim commented

@longquanzheng I am nevertheless happy to fix the race if you can identify how it happens. We have quite a "fuzzy" test in our suite (see https://github.com/bsm/sarama-cluster/blob/master/consumer_test.go#L284) which seems to pass without losing any messages.

@dim I found that sarama-cluster would deliver messages out of order starting from "Close()" is called to shutdown the library.

Here is an example: worker A owns the partition, consuming 1,2,3,...,9,10, everything is good, and then we call Close() to shutdown, it starts to receive lots of misordered messages, for example it got 100 (jump over 11 ~~ 99).
If worker A commit any of these, then worker B will have to start from 101, which means we have to miss 11~99. However if worker A doesn't commit 100, then worker B can still continue from 11, then we won't miss any messages.

To mitigate this issue, I enforce our consuming process to sleep for 2 seconds before shutdown sarama-cluster, and it works.(uber/cadence@ec218d7 )

But this is not a final solution. We want to understand why sarama-cluster starts to deliver out of order when we call(Close)

Note that this issue only repro in our production when hosts are busy, where we run lots of processes concurrently. We are not able to reproduce it in laptop or some idle hardware.

dim commented

@longquanzheng from what I know, messages are only ordered per topic/partition. also, Close() doesn't do anything specific i.e. we are just closing the consumer and waiting for exit. finally, the test I have mentioned above is testing this case, i.e. it starts consumers/shuts them down/starts new ones and no messages are being missed, neither locally nor on the CI. I am not sure how to debug this as this doesn't seem to happen neither to us, nor has it been reported by someone else before.

@dim we finally found that the bug is here: georgeteo/sarama@9618a79

Actually, the issue is with the sarama/sarama-cluster contract. Specifically, in https://github.com/bsm/sarama-cluster/blob/master/partitions.go#L89, you use partitonConsumer.Close(), which will drain the Messages channel.

We use the PartitionConsumer abstraction from sarama-cluster, and on Close or rebalance, we continue reading from the Messages channel, but because Sarama is also reading from that channel, we end up with holes in our stream (e.g., we get msg 100, sarama drain takes msg 101, etc). Then due to a race condition in the shutdown procedure, we may commit msg 102 and lose messages.

The proposed fix would be to use AsyncClose in sarama-cluster partitions.go:89.

This is the PR with the fix for this: #258.

Thanks for accepting the PR. Can you tag a new release as well?

dim commented

@georgeteo looks like you didn't run the tests 😄

I've tried #258 fix today and reverted back.
Looks like something is wrong, consumer is not consuming messages
when listening to Notifications channel, I receive infinite rebalance error messages

By the way #258 worked for me with kafkaConfig.Group.Mode = cluster.ConsumerModePartitions it was only in the default mode that crashed for me

@danpmx, @jpiper, I'm unable to reproduce the crash. Can you post your consumer configuration?

When running the following non partition consumer code, I don't see either crashing or infinite rebalance:

func main() {
	config := cluster.NewConfig()
	config.Group.Return.Notifications = true
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "cg", []string{"test01"}, config)
	if err != nil {
		log.Fatal("unable to start consumer", err)
	}

	for {
		select {
		case msg, ok := <- consumer.Messages():
			if !ok {
				log.Println("ERROR: message channel is closed")
				continue
			}
			log.Printf("INFO: received offset %d from %s-%d\n", msg.Offset, msg.Topic, msg.Partition)
		case err, ok := <- consumer.Errors():
			if !ok {
				log.Println("ERROR: consumer channel is closed")
				continue
			}
			log.Printf("INFO: received error %s from sarama-cluster", err.Error())
		case ntf, ok := <- consumer.Notifications():
			if !ok {
				log.Println("ERROR: notification channel is closed")
				continue
			}
			log.Printf("INFO: received notification %s from sarama-cluster", ntf.Type.String())
		}
	}
}

Two consumer workers:

Worker 1

 ./sarama-test
2018/08/17 10:22:43 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:22:47 INFO: received notification rebalance OK from sarama-cluster
2018/08/17 10:22:48 INFO: received offset 19 from test01-0
2018/08/17 10:22:48 INFO: received offset 20 from test01-0
2018/08/17 10:22:48 INFO: received offset 21 from test01-0
# Worker 2 join here
2018/08/17 10:22:51 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:22:51 INFO: received notification rebalance OK from sarama-cluster
2018/08/17 10:23:00 INFO: received offset 22 from test01-0
2018/08/17 10:23:02 INFO: received offset 23 from test01-0
2018/08/17 10:23:03 INFO: received offset 24 from test01-0
2018/08/17 10:23:04 INFO: received offset 25 from test01-0
# Worker 2 leave here. 
2018/08/17 10:23:12 INFO: received notification rebalance start from sarama-cluster
# Worker 1 leaves. 

Worker 2

# worker 1 leaves
./sarama-test
2018/08/17 10:23:10 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:24:06 INFO: received notification rebalance error from sarama-cluster
2018/08/17 10:24:06 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:24:09 INFO: received notification rebalance OK from sarama-cluster
2018/08/17 10:24:09 INFO: received offset 26 from test01-0
2018/08/17 10:24:09 INFO: received offset 27 from test01-0
2018/08/17 10:24:09 INFO: received offset 28 from test01-0
2018/08/17 10:24:09 INFO: received offset 29 from test01-0
2018/08/17 10:24:27 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:24:27 INFO: received notification rebalance OK from sarama-cluster

@dim: do you have any clues why non partition consumer might be broken with my recent change?

@georgeteo I’m using this config

kafkaConfig := cluster.NewConfig()
sarama.MaxResponseSize = 104857600
sarama.MaxRequestSize = 104857600

kafkaConfig.Version = sarama.V1_1_0_0 // Sarama will default to 0.8
kafkaConfig.Group.PartitionStrategy = cluster.StrategyRoundRobin
kafkaConfig.Consumer.Return.Errors = true
kafkaConfig.Group.Return.Notifications = true
kafkaConfig.ChannelBufferSize = 1000

I am able to repro behavior identical to what @danpmx reported. I digged into this and the root cause appears to be a deadlock in the underlying sarama library (which existed even before the fix added by george). But the new fix caused this deadlock to manifest itself differently i.e. after the fix, its a bunch of rebalance errors; before the fix, the deadlock will lead to consumer not receiving any messages at all. Following is the potential bug I discovered:

  • Setup

    • Topic with two partitions
    • Each partition has about 10k messages
    • partition ChannelBufferSize=32, dwellTimer=1s and maxProcessingTime=250ms
    • Consumer started in multiplexed mode; sleeps 1millis after processing each message. Initially sleeps for a second before consuming the first message
    • I trigger rebalances every 3-4s or so
  • Deadlock

    • Both partition consumers are started by the sarama library
    • Partition consumer blocks for a while and abandons subscription link_1 link_2 because upstream is slow
    • While its in this abandoned state, a rebalance is triggered
    • As part of rebalance, saramacluster.consumer.nextTick() calls release() to release all subs
    • Prior to the fix added by george (current master), this release() call will block forever
    • After the fix added by george, Close() is replaced by AsyncClose(), so release() will finish, but then nextTick.subscribe() will fail with this error. This goes on in a loop and will result in infinite rebalance errors

So, I see two issues now:

  • the message loss originally reported by this issue
  • the deadlock which exist even before this fix

Whoever Owns Sarama: Verify if my analysis above is valid and update this ticket.

@georgeteo our configuration:

conf := cluster.NewConfig()
conf.ChannelBufferSize = 1000
conf.Consumer.Return.Errors = true
conf.Group.Return.Notifications = true
conf.Group.Mode = cluster.ConsumerModePartitions

@venkat1109 The release() call will not block, because sarama.PartitionConsumer.Close() starts a goroutine to drain the messages channel. This is why a rebalance will lead to data loss.

rebalance -> release -> PartitionConsumer.Close() -> drain messages -> data loss

@imjustfly - sarama.PartitionConsumer.Close() will block because of this for range loop (and not because of drain, which runs in its own goroutine):
https://github.com/Shopify/sarama/blob/master/consumer.go#L431

i.e. sarama.PartitionConsumer.Close() will return only after child.errors channel is closed.

@venkat1109 child.errors is closed:

Close() -> AyncClose() -> close(dying) -> close(trigger) -> close(feeder) -> close(errors)

@imjustfly please see the sequence of steps I described above.

  • You are right that AsyncClose() code path is supposed to close feeder
  • But when upstream is slow, responseFeeder routine will block here (because of expiryTicker) so, feeder will never get closed