bsm/sarama-cluster

Upon closed dead subscription, Resubscribe

steve-gray opened this issue · 1 comment

It appears that when a partition consumer dies, potentially as a result of slow consumption or some other cause, the following code in Sarama is hit:


func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			break
		}
	}
}

(From consumer.go in Sarama)

Once the dead subscription has been closed, there seems to be no consumer-group-level hook that periodically checks for this condition, so as long as the process remains alive it holds its own lease on the partition and stops any other process from claiming it.

dim commented

@steve-gray I am currently working on a PR to integrate consumer groups into sarama itself: IBM/sarama#1099. With the new API, you should be able to address these issues in your implementation, i.e. trigger a rebalance by exiting a handler when a partition is stuck/slow.