oleksiyk/kafka

GroupConsumer: lack of some internal functionality

Closed this issue · 5 comments

I have been working with the no-kafka library for a while and ran into a few difficulties caused by missing internal functionality in the GroupConsumer class.
From my perspective, it would be great to add the following functionality to GroupConsumer:

1) An isAvailable flag and events emitted when it changes:

I would like to suggest adding an "isAvailable" property to the GroupConsumer class and updating it in the _heartbeat method, in the then/catch blocks of the heartbeatRequest call (a sketch of the event part follows the code example).
Code example:

    _heartbeat () {
        var self = this;

        return self.client.heartbeatRequest(self.options.groupId, self.memberId, self.generationId)
            .then(() => {
                //=======================================//
                /**
                    In case of a successful heartbeat, set the isAvailable flag to true
                 */
                self.isAvailable = true;
                //=======================================//
            })
            .catch({ code: 'RebalanceInProgress' }, function () {
                //=======================================//
                /**
                    In case of a failed heartbeat, set the isAvailable flag to false
                 */
                self.isAvailable = false;
                //=======================================//
                // new group member has joined or existing member has left
                self.client.log('Rejoining group on RebalanceInProgress');
                return self._rejoin();
            })
            .tap(function () {
                self._heartbeatTimeout = setTimeout(function () {
                    self._heartbeatPromise = self._heartbeat();
                }, self.options.heartbeatTimeout);
            })
            .catch(function (err) {
                //=======================================//
                /**
                    In case of a failed heartbeat, set the isAvailable flag to false
                 */
                self.isAvailable = false;
                //=======================================//
                // some severe error, such as GroupCoordinatorNotAvailable or network error
                // in this case we should start trying to rejoin from scratch
                self.client.error('Sending heartbeat failed: ', err);
                return self._fullRejoin().catch(function (_err) {
                    self.client.error(_err);
                });
            });
    };
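The snippet above only covers the flag itself. For the "events" part of this item, one option would be to route every flag update through a small emitter so listeners are notified on each transition. This is just a sketch; the AvailabilityTracker class, the setAvailable method and the 'availability' event name are hypothetical and not part of no-kafka:

    'use strict';

    const EventEmitter = require('events');

    // Hypothetical helper (not part of no-kafka): holds the isAvailable flag
    // and emits an 'availability' event whenever the value actually changes.
    class AvailabilityTracker extends EventEmitter {
        constructor () {
            super();
            this.isAvailable = false;
        }

        setAvailable (value) {
            if (this.isAvailable !== value) {
                this.isAvailable = value;
                this.emit('availability', value);
            }
        }
    }

    // In _heartbeat the direct assignments would then become:
    //   self.availability.setAvailable(true);   // in the .then() branch
    //   self.availability.setAvailable(false);  // in both .catch() blocks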

2) Preserve all current subscriptions when rejoining the consumer group (currently, on rejoin, only the subscriptions specified when the consumer group was created are restored, while subscriptions added later via the "subscribe" method are lost)

This can be achieved by taking all of the GroupConsumer's current subscriptions into account in the "_syncGroup" method; a usage sketch follows the code example below.
Code example:

    _syncGroup() {
        var self = this;

        return Promise.try(function () {
            if (self.memberId === self.leaderId) { // leader should generate group assignments
                return self.client.updateMetadata().then(function () {
                    var r = [];
                    _.each(self.members, function (member) {
                        //=======================================//
                        /**
                            When rejoining the consumer group, we should also take the current subscriptions into account
                         */
                        _.each(_.union(member.subscriptions,
                            _.uniq(Object.keys(self.subscriptions).map(key => key.split(`:`)[0]))), function (topic) {
                            r.push([topic, member]);
                        });
                        //=======================================//
                    });
                    r = _(r).groupBy(0).map(function (val, key) {
                        if (!self.client.topicMetadata[key]) {
                            self.client.error('Sync group: unknown topic:', key);
                        }
                        return {
                            topic: key,
                            members: _.map(val, 1),
                            partitions: _.map(self.client.topicMetadata[key], 'partitionId')
                        };
                    }).value();

                    return self.strategies[self.strategyName].strategy.assignment(r);
                });
            }
            return [];
        })
            .then(function (result) {
                var assignments = _(result).groupBy('memberId').mapValues(function (mv, mk) {
                    return {
                        memberId: mk,
                        memberAssignment: {
                            version: 0,
                            metadata: null,
                            partitionAssignment: _(mv).groupBy('topic').map(function (tv, tk) {
                                return {
                                    topic: tk,
                                    partitions: _.map(tv, 'partition')
                                };
                            }).value()
                        }
                    };
                }).values().value();

                // console.log(require('util').inspect(assignments, true, 10, true));
                return self.client.syncConsumerGroupRequest(self.options.groupId, self.memberId, self.generationId, assignments);
            })
            .then(function (response) {
                return self._updateSubscriptions(_.get(response, 'memberAssignment.partitionAssignment', []));
            });
    };
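To make the intended behaviour concrete, here is a rough usage sketch. The constructor options, strategy shape and topic names are illustrative only (loosely modelled on the project README examples); the point is that the subscription added at runtime on the last line is currently dropped after a rebalance, while with the _syncGroup change above it would be kept.

    'use strict';

    const Kafka = require('no-kafka');

    const consumer = new Kafka.GroupConsumer({ groupId: 'my-group' }); // options illustrative

    const dataHandler = function (messageSet, topic, partition) {
        messageSet.forEach(function (m) {
            console.log(topic, partition, m.offset);
        });
    };

    // Subscriptions declared at creation time: these are restored on every rejoin
    const strategies = [{
        subscriptions: ['orders'], // illustrative topic name
        handler: dataHandler
    }];

    consumer.init(strategies).then(function () {
        // A subscription added at runtime: today it is lost on the next rebalance,
        // because _syncGroup only looks at member.subscriptions
        return consumer.subscribe('audit-log', 0, {}, dataHandler); // illustrative topic
    });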

3) Temporarily save the previous subscriptions in the "_updateSubscriptions" method before clearing the subscriptions object. This gives us the ability not to lose topic offsets when subscriptions are updated.

In the "_updateSubscriptions" method there is a clearing of subscriptions object (first of all). Then it requests metadata to fetch offsets. It's a bit danger in case if kafka server was not avaliable for a while. Offsets could not have been committed and after kafka returns to the avaliable state we can fetch old offsets.
So, to get around this we could just temporary save old subscription object before clearing it and use it on calculating topic offset.
Code example:

    _updateSubscriptions(partitionAssignment) {
        var self = this, offsetRequests = [],
            handler = self.strategies[self.strategyName].handler;
        //=======================================//
        /**
            Before clearing the current subscriptions, we should store them
         */
        var previousSubscriptions = Object.assign({}, self.subscriptions);
        //=======================================//
        self.subscriptions = {};

        if (_.isEmpty(partitionAssignment)) {
            return self.client.warn('No partition assignment received');
        }

        // should probably wait for current fetch/handlers to finish before fetching offsets and re-subscribing

        _.each(partitionAssignment, function (a) {
            _.each(a.partitions, function (p) {
                offsetRequests.push({
                    topic: a.topic,
                    partition: p
                });
            });
        });

        return self.client.updateMetadata().then(function () {
            return self.fetchOffset(offsetRequests).map(function (p) {
                //=======================================//
                /**
                    Use the stored previous subscriptions (which carry the in-memory offsets).
                    This way we do not lose messages and keep consuming new messages
                    from the latest offset
                 */
                var options = {
                    offset: previousSubscriptions[`${p.topic}:${p.partition}`] ?
                        previousSubscriptions[`${p.topic}:${p.partition}`].offset : p.offset
                };
               //=======================================//

                if (p.error || p.offset < 0) {
                    options = {
                        time: self.options.startingOffset
                    };
                }

                return self.subscribe(p.topic, p.partition, options, handler).catch(function (err) {
                    self.client.error('Failed to subscribe to', p.topic + ':' + p.partition, err);
                });
            });
        });
    };

I have tested these improvements locally and everything seems to work fine; behaviour during temporary Kafka outages is now much better. However, I believe there may be better solutions than mine.

Please, let me know your thoughts.

Thanks,
Ihor T.

Why would you use subscribe with GroupConsumer? GroupConsumer is not intended for manual subscriptions; only the group leader should assign subscriptions, based on the initialisation strategies. If you need manual subscriptions, use SimpleConsumer instead.
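For reference, manual subscriptions with SimpleConsumer look roughly like this (a sketch along the lines of the project README; topic name and partition list are illustrative):

    'use strict';

    const Kafka = require('no-kafka');
    const consumer = new Kafka.SimpleConsumer();

    const dataHandler = function (messageSet, topic, partition) {
        messageSet.forEach(function (m) {
            console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        });
    };

    consumer.init().then(function () {
        // the application decides which topic and partitions to consume
        return consumer.subscribe('kafka-test-topic', [0, 1], dataHandler);
    });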

I also don't think we should be saving offsets locally during rebalance as partitions can be re-assigned to other group consumers. Having two ways to restore offsets is definitely wrong.

So, if Kafka goes down and my application has no time to commit the last offset, then after Kafka comes back up the application will fetch the wrong (earlier) offset and receive some messages twice. That is how GroupConsumer currently behaves for me.

Yes, that's correct. It means an "at least once" guarantee.
I don't think storing offsets locally can help. What would you do with your stored offsets if partitions are re-assigned to different brokers when Kafka gets back online?
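Since delivery is at-least-once, the usual way to cope with redelivered messages is to make the handler idempotent, for example by skipping offsets that have already been processed. This is only a sketch: the in-memory Map stands in for whatever deduplication or idempotency mechanism the application actually uses, and handleMessage is a placeholder:

    'use strict';

    // key: 'topic:partition' -> highest offset already handled
    const processed = new Map();

    function handleMessage(m) {
        // application-specific processing goes here (placeholder)
    }

    function dedupingHandler(messageSet, topic, partition) {
        const key = topic + ':' + partition;
        messageSet.forEach(function (m) {
            if (processed.has(key) && m.offset <= processed.get(key)) {
                return; // duplicate redelivered after a rebalance or restart, skip it
            }
            handleMessage(m);
            processed.set(key, m.offset);
        });
    }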

Not relevant anymore.