SOHU-Co/kafka-node

Upgrading Kafka cluster triggers brokersChanged error and doesnt self-heal

bmxpiku opened this issue · 4 comments

Bug Report

We have upgraded our kafka from version from 2.4.1 to 2.4.1.1 and it triggered

Kafka event brokersChanged // my log message from config below L56
// kafka log below 
{
    "stack": "Error: NotLeaderForPartition\n    at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:952:17)\n    at Object.self.tap (/app/node_modules/binary/index.js:248:12)\n    at Object.decodePartitions (/app/node_modules/kafka-node/lib/protocol/protocol.js:950:95)\n    at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n    at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:165:8)\n    at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n    at decodeProduceV2Response (/app/node_modules/kafka-node/lib/protocol/protocol.js:944:6)\n    at KafkaClient.Client.invokeResponseCallback (/app/node_modules/kafka-node/lib/baseClient.js:589:18)\n    at KafkaClient.Client.handleReceivedData (/app/node_modules/kafka-node/lib/baseClient.js:569:10)\n    at KafkaClient.handleReceivedData (/app/node_modules/kafka-node/lib/kafkaClient.js:1378:48)",
    "message": "NotLeaderForPartition"
}
Kafka client ready // also my log message 

I was expecting this upgrade would self-heal with version 5.0.0 of kafka-node, but maybe I am missing something in config.

It "partially" did fixed itself but it still throws not for every but for some messages

{
    "stack": "Error\n    at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:928:17)\n    at Object.self.tap (/app/node_modules/binary/index.js:248:12)\n    at Object.decodePartitions (/app/node_modules/kafka-node/lib/protocol/protocol.js:926:73)\n    at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n    at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:165:8)\n    at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n    at decodeProduceV1Response (/app/node_modules/kafka-node/lib/protocol/protocol.js:920:6)\n    at KafkaClient.Client.invokeResponseCallback (/app/node_modules/kafka-node/lib/baseClient.js:589:18)\n    at KafkaClient.Client.handleReceivedData (/app/node_modules/kafka-node/lib/baseClient.js:569:10)\n    at KafkaClient.handleReceivedData (/app/node_modules/kafka-node/lib/kafkaClient.js:1378:48)"
}

I cannot run production upgrade without being certain of keeping services alive. Although - I have tested that rebooting container on my testing env is fixing - fully reconnecting kafka - the problem.
https://github.com/BerlingskeMedia/aria_datadispatcher/blob/master/server/kafka.js
I have hard time recreating this problem, setting up kafka container locally so I cannot provide example - beside above configuration.

Environment

  • Node version: 12.13.0
  • Kafka-node version: 5.0.0

For specific cases also provide

We have Kafka 2.2.1, 2.3.1 and 2.4.1.1 (this cluster being 2.4.1 => 2.4.1.1). The topic in question have 9 partitions across 3 brokers

After on("ready") when you send, try something like this.....

    try {
        m_ProducerClient.send(payloads, function (err, data){
            if (err) {
                logger.error("Error while sending messages: " + err.stack);
                m_ReadyFlag = false;
                m_ProducerClient.emit("error");
            }
            else {
               logger.debug(`message sent to kafka: ${m_ProducerClient.client.options.kafkaHost}`);
           }
        });
    } catch (err) {
        logger.error(`Connection Error while sending message to ${m_ProducerClient.client.options.kafkaHost}`);
        m_ReadyFlag = false;
        m_ProducerClient.client.close();
        m_ProducerClient.emit("error");
    }

@bmxpiku did you find any fix for that issue? I have the same issue.
I tried "client.refreshBrokerMetadata()" but it is not fix the error.

@shlomi-ozon I actually ended up switching to kafkajs, which works like a charm tbh

Thanks @bmxpiku, I think I will do that too.