I call removeTopics on a consumer, and then add back that topic, I then receive duplicate messages
davesargrad opened this issue · 0 comments
My code is simple
Here I initialize client:
const init = (broker_address) => {
// console.log("stream_from_server conn: connecting stream ");
// console.log("stream_from_server init: connecting to broker ", broker_address);
const client = new kafka.KafkaClient(broker_address);
const offset = new kafka.Offset(client)
const consumer = new kafka.Consumer(client, [ ], { autoCommit: false, fromOffset: true });
return {client: client, offset: offset, consumer: consumer};
}
I use this to join a topic:
const join = (client_offset_consumer, topic, on_msg_cb) => {
console.log("stream_from_server join: ", topic);
client_offset_consumer.offset.fetchLatestOffsets([topic], (err, offsets) => {
if (err) {
console.log(`error fetching latest offsets ${err}`)
return
}
var latest = 1
Object.keys(offsets[topic]).forEach( o => {
latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
})
client_offset_consumer.consumer.addTopics([ { topic: topic, partition: 0, offset: latest } ], (err, added) => {
console.log("stream_from_server joined: ", added);
console.log("stream_from_server joined consumer payloads: ", client_offset_consumer.consumer.payloads);
client_offset_consumer.consumer.on('message', (message) => {
console.log("my message from kafka ", "offset - ", message.offset, "highwateroffset - ", message.highWaterOffset);
on_msg_cb(message);
});
}, true);
})
}
I use this to leave a topic:
const leave = (client_offset_consumer, topic) => {
console.log("stream_from_server leave: ", topic);
client_offset_consumer.consumer.removeTopics([ topic ], (err, removed) => {
console.log("stream_from_server - left: ", removed);
});
}
In the following experiment I have a topic named "avl-vam". I join it, leave it, join a second non-existing topic "avl-vama", leave that topic, then join the original topic again.
In the following diagnostic you see that I receive three messages (offsets: 865, 866, 867) on the topic before leaving it.
I then leave that first topic, and join then leave a second topic "avl-vama".
I then rejoin the original topic "avl-vam".
Then the very next message that arrives (offset 868) is seen in triplicate!
Each subsequent message is also seen in triplicate.
If I leave the topic again (call removeTopics), then join a second non-existing topic "avl-vamb", leave that, then rejoin the original topic, then I see five messages at the same offset.
This seems like a bug. Please advise.