specified generation id is not valid when committing offset during rebalance
niyue opened this issue · 4 comments
I used KafkaManualCommitConsumer with a customized RebalanceCallback, and commit offsets when partitions are revoked during a rebalance. Currently, when I have 3 consumers in the consumer group (joining at slightly different times), the program reports errors like the one below while committing offsets:
due to unexpected exception with message:
2021-07-08 12:59:33.898233: Broker: Specified group generation id is not
valid [22] (/.../vcpkg/installed/x64-osx/include/kafka/KafkaConsumer.h:588)
The program is the same as the sample program (https://github.com/morganstanley/modern-cpp-kafka/blob/main/doc/KafkaConsumerQuickStart.md#kafkamanualcommitconsumer) with an additional RebalanceCallback. In the RebalanceCallback, I call consumer.commitSync when partitions are revoked.
After some investigation, I found that in KafkaConsumer.h the customized rebalance callback is invoked after the partitions are assigned/unassigned. This seems incorrect based on librdkafka's documentation (https://docs.confluent.io/2.0.0/clients/librdkafka/classRdKafka_1_1RebalanceCb.html#a490a91c52724382a72380af621958741):
void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                  RdKafka::ErrorCode err,
                  std::vector<RdKafka::TopicPartition*> &partitions) {
  if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
    // application may load offsets from arbitrary external
    // storage here and update \p partitions
    consumer->assign(partitions);
  } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
    // Application may commit offsets manually here
    // if auto.commit.enable=false
    consumer->unassign();
  } else {
    std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
  }
}
It seems the application callback should be called before assign/unassign: a commit issued from the revocation callback is only valid while the consumer is still a member of the current group generation, and once the rebalance completes the commit carries a stale generation id, which matches the error above. Could you please confirm whether this is a bug?
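For reference, here is a minimal sketch of the ordering I would expect the wrapper to use (on_rebalance, UserRebalanceCallback, and RebalanceEvent are illustrative names, not the library's actual internals):

#include <librdkafka/rdkafkacpp.h>
#include <functional>
#include <vector>

// Illustrative only: the application-level callback the wrapper would invoke.
enum class RebalanceEvent { Assigned, Revoked };
using UserRebalanceCallback =
    std::function<void(RebalanceEvent, const std::vector<RdKafka::TopicPartition*>&)>;

// Hypothetical wrapper-side handler showing the expected ordering: the user
// callback runs BEFORE assign()/unassign(), so a commit made in the revoked
// branch is still sent under the current (valid) group generation id.
void on_rebalance(RdKafka::KafkaConsumer *consumer,
                  RdKafka::ErrorCode err,
                  std::vector<RdKafka::TopicPartition*> &partitions,
                  const UserRebalanceCallback &user_callback) {
  if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
    user_callback(RebalanceEvent::Assigned, partitions); // user code first
    consumer->assign(partitions);                        // then take ownership
  } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
    user_callback(RebalanceEvent::Revoked, partitions);  // commit offsets here, while valid
    consumer->unassign();                                // then give the partitions up
  }
}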
Here is the sample program I used to reproduce this issue:
#include <kafka/KafkaConsumer.h>

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>

using std::string;
using namespace kafka; // assumed; add whichever using-directives your library
                       // version needs for RebalanceCallback / RebalanceEventType

// Read an environment variable, falling back to a default when it is unset.
static string get_env(const string &env_var, const string &default_value) {
    auto value = std::getenv(env_var.c_str());
    return value ? string(value) : default_value;
}
// Build the consumer configuration from environment variables.
static kafka::Properties get_props() {
    auto brokers = get_env("BOOTSTRAP_SERVERS", "localhost:9092");
    auto group_id = get_env("GROUP_ID", "black_hole");
    kafka::Properties props({{"bootstrap.servers", brokers},
                             {"auto.offset.reset", "earliest"},
                             {"group.id", group_id}});
    return props;
}
// Synchronously commit the tracked offsets and log each committed position.
static void commit_sync(KafkaManualCommitConsumer &consumer, TopicPartitionOffsets &offsets,
                        bool is_revocation = false) {
    consumer.commitSync(offsets);
    auto event = is_revocation ? "revoked partition offset committed" : "offset committed";
    for (auto const &[tp, offset] : offsets) {
        std::cout << "[" << event << "] topic=" << tp.first << " partition=" << tp.second
                  << " offset=" << offset << " thread=" << std::this_thread::get_id() << std::endl;
    }
}
// Consume up to total_records records, committing every 100 records
// and whenever partitions are revoked.
static void start_pulling(uint32_t total_records) {
    auto props = get_props();
    KafkaManualCommitConsumer consumer(props);
    std::cout << "[starting black hole] props=" << props.toString() << std::endl;

    auto topic_pattern = get_env("TOPIC_PATTERN", "foo");
    std::cout << "[subscribed topics] topic_pattern=" << topic_pattern << std::endl;

    uint32_t record_count = 0;
    TopicPartitionOffsets offsets;

    RebalanceCallback rebalance_callback = [&consumer,
                                            &offsets](RebalanceEventType event_type,
                                                      const TopicPartitions &topic_partitions) {
        bool is_revocation = event_type == RebalanceEventType::PartitionsRevoked;
        for (auto const &tp : topic_partitions) {
            std::cout << "[" << (is_revocation ? "revoked" : "assigned")
                      << " partition] topic=" << tp.first << " partition=" << tp.second << std::endl;
        }
        if (is_revocation) {
            // Commit what has been consumed so far, before the partitions are taken away.
            commit_sync(consumer, offsets, true);
        }
    };

    bool use_callback = get_env("USE_CB", "true") == "true";
    if (use_callback) {
        consumer.subscribe({topic_pattern}, rebalance_callback);
        std::cout << "[subscribe with rebalance callback]" << std::endl;
    } else {
        consumer.subscribe({topic_pattern});
        std::cout << "[subscribe with no rebalance callback]" << std::endl;
    }

    while (record_count <= total_records) {
        auto records = consumer.poll(std::chrono::milliseconds(100));
        if (records.size() == 1 && records[0].error()) {
            std::cout << "error during consumption: " << records[0].error().message() << std::endl;
        } else {
            std::cout << "[records retrieved] records=" << records.size() << std::endl;
            if (records.empty()) {
                std::this_thread::sleep_for(std::chrono::seconds(1));
            } else {
                for (auto const &record : records) {
                    // Track the next offset to commit for this partition.
                    TopicPartition tp(record.topic(), record.partition());
                    offsets[tp] = record.offset() + 1;
                    record_count++;
                    // commit for every 100 records
                    if (record_count % 100 == 0) {
                        commit_sync(consumer, offsets);
                    }
                }
            }
        }
    }
    std::cout << "exit" << std::endl;
}
I submitted a PR trying to fix this issue, and I verified the fix locally with this change.
Please let me know if this helps or if there is anything I need to change. Thanks.
Thank you for submitting the PR. I've left a few comments; please update the PR -- it just needs a slight change before merging. Cheers!
Thanks for the comments. I've updated the PR; please check it out~