Upgrading Kafka cluster triggers brokersChanged error and doesn't self-heal
bmxpiku opened this issue · 4 comments
Bug Report
We upgraded our Kafka cluster from version 2.4.1 to 2.4.1.1, which triggered:
Kafka event brokersChanged // my log message, from the config linked below (L56)
// error logged by kafka-node below
{
"stack": "Error: NotLeaderForPartition\n at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:952:17)\n at Object.self.tap (/app/node_modules/binary/index.js:248:12)\n at Object.decodePartitions (/app/node_modules/kafka-node/lib/protocol/protocol.js:950:95)\n at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:165:8)\n at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n at decodeProduceV2Response (/app/node_modules/kafka-node/lib/protocol/protocol.js:944:6)\n at KafkaClient.Client.invokeResponseCallback (/app/node_modules/kafka-node/lib/baseClient.js:589:18)\n at KafkaClient.Client.handleReceivedData (/app/node_modules/kafka-node/lib/baseClient.js:569:10)\n at KafkaClient.handleReceivedData (/app/node_modules/kafka-node/lib/kafkaClient.js:1378:48)",
"message": "NotLeaderForPartition"
}
Kafka client ready // also my log message
I was expecting kafka-node 5.0.0 to self-heal after this upgrade, but maybe I am missing something in my config.
It "partially" fixed itself, but it still throws the error for some (not all) messages:
{
"stack": "Error\n at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:928:17)\n at Object.self.tap (/app/node_modules/binary/index.js:248:12)\n at Object.decodePartitions (/app/node_modules/kafka-node/lib/protocol/protocol.js:926:73)\n at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n at Object.<anonymous> (/app/node_modules/kafka-node/lib/protocol/protocol.js:165:8)\n at Object.self.loop (/app/node_modules/binary/index.js:267:16)\n at decodeProduceV1Response (/app/node_modules/kafka-node/lib/protocol/protocol.js:920:6)\n at KafkaClient.Client.invokeResponseCallback (/app/node_modules/kafka-node/lib/baseClient.js:589:18)\n at KafkaClient.Client.handleReceivedData (/app/node_modules/kafka-node/lib/baseClient.js:569:10)\n at KafkaClient.handleReceivedData (/app/node_modules/kafka-node/lib/kafkaClient.js:1378:48)"
}
I cannot run the production upgrade without being certain the services stay alive. That said, I have verified on my testing env that rebooting the container, i.e. fully reconnecting to Kafka, fixes the problem.
https://github.com/BerlingskeMedia/aria_datadispatcher/blob/master/server/kafka.js
I have a hard time recreating this problem by setting up a Kafka container locally, so beyond the configuration above I cannot provide a reproducible example.
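For what it's worth, one pattern that may help while the client catches up with the new partition leader is to retry sends that fail with a leader-election error. This is a hedged sketch, not kafka-node API: `sendFn` stands for any callback-style send (e.g. `producer.send.bind(producer)`), and the retriable error names are assumptions based on the stack traces above.

```javascript
// Error names considered transient after a broker/leader change
// (assumption: taken from the NotLeaderForPartition trace above).
const RETRIABLE = ["NotLeaderForPartition", "LeaderNotAvailable"];

function isRetriable(err) {
  return Boolean(err) && RETRIABLE.some((name) => String(err.message || err).includes(name));
}

// Retries sendFn(payloads, cb) up to maxRetries times with a fixed delay,
// giving the client time to discover the new partition leader in between.
function sendWithRetry(sendFn, payloads, cb, maxRetries = 3, delayMs = 500) {
  sendFn(payloads, (err, data) => {
    if (err && isRetriable(err) && maxRetries > 0) {
      setTimeout(
        () => sendWithRetry(sendFn, payloads, cb, maxRetries - 1, delayMs),
        delayMs
      );
    } else {
      cb(err, data);
    }
  });
}
```

In kafka-node this could additionally be combined with `client.refreshMetadata([topic], cb)` before the retry, though I have not verified that it resolves this particular upgrade scenario.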
Environment
- Node version: 12.13.0
- Kafka-node version: 5.0.0
For specific cases also provide
We run Kafka 2.2.1, 2.3.1 and 2.4.1.1 (this cluster went 2.4.1 => 2.4.1.1). The topic in question has 9 partitions across 3 brokers.
After on("ready"), when you send, try something like this:
try {
  m_ProducerClient.send(payloads, function (err, data) {
    if (err) {
      logger.error("Error while sending messages: " + err.stack);
      m_ReadyFlag = false;
      // pass the error along so any "error" listener receives it
      m_ProducerClient.emit("error", err);
    } else {
      logger.debug(`message sent to kafka: ${m_ProducerClient.client.options.kafkaHost}`);
    }
  });
} catch (err) {
  logger.error(`Connection Error while sending message to ${m_ProducerClient.client.options.kafkaHost}`);
  m_ReadyFlag = false;
  m_ProducerClient.client.close();
  m_ProducerClient.emit("error", err);
}
@bmxpiku did you find any fix for this issue? I have the same problem.
I tried client.refreshBrokerMetadata(), but it does not fix the error.
@shlomi-ozon I actually ended up switching to kafkajs, which works like a charm tbh
Thanks @bmxpiku, I think I will do that too.