morganstanley/modern-cpp-kafka

specified generation id is not valid when committing offset during rebalance

niyue opened this issue · 4 comments

niyue commented

I use KafkaManualCommitConsumer with a customized RebalanceCallback and commit offsets when partitions are revoked during a rebalance. When I have 3 consumers in the consumer group (joining at slightly different times), the program reports errors like the one below while committing offsets:

due to unexpected exception with message:
  2021-07-08 12:59:33.898233: Broker: Specified group generation id is not
  valid [22] (/.../vcpkg/installed/x64-osx/include/kafka/KafkaConsumer.h:588)

The program is the same as the sample program (https://github.com/morganstanley/modern-cpp-kafka/blob/main/doc/KafkaConsumerQuickStart.md#kafkamanualcommitconsumer) with an additional RebalanceCallback. In the RebalanceCallback, I call consumer.commitSync on the partition-revocation event.

After some investigation, I found that in KafkaConsumer.h the customized rebalance callback is called after the partitions are assigned/unassigned. This seems incorrect according to librdkafka's documentation (https://docs.confluent.io/2.0.0/clients/librdkafka/classRdKafka_1_1RebalanceCb.html#a490a91c52724382a72380af621958741):

void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                   RdKafka::ErrorCode err,
                   std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
        // application may load offsets from arbitrary external
        // storage here and update \p partitions
        consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
        // Application may commit offsets manually here
        // if auto.commit.enable=false
        consumer->unassign();
    } else {
        std::cerr << "Rebalancing error: " <<
                     RdKafka::err2str(err) << std::endl;
    }
}

It seems the application callback should be called before assign/unassign; otherwise, by the time the callback runs on a revocation the consumer has already given up its assignment and the group has moved on to a new generation, so a commitSync issued there is rejected with the generation id error above. Could you please confirm whether this is a bug?
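
To make the ordering concrete, here is a rough sketch of what a library-internal rebalance trampoline would look like with the callback-first order the docs imply. This is written against librdkafka's C API and is not modern-cpp-kafka's actual code; ConsumerState, userCb, and rebalanceTrampoline are hypothetical names:

#include <librdkafka/rdkafka.h>

// Hypothetical stand-in for the user callback stored by the consumer; the
// real member inside KafkaConsumer.h is named differently.
struct ConsumerState {
    void (*userCb)(rd_kafka_resp_err_t err,
                   rd_kafka_topic_partition_list_t *partitions);
};

// Callback-first ordering: run the application's hook BEFORE changing the
// assignment, so a commitSync() issued on revocation still happens while the
// old assignment (and group generation) is in effect.
static void rebalanceTrampoline(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                rd_kafka_topic_partition_list_t *partitions,
                                void *opaque) {
    auto *state = static_cast<ConsumerState *>(opaque);
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        state->userCb(err, partitions);   // application hook first (may seek, etc.)
        rd_kafka_assign(rk, partitions);  // then take the new assignment
    } else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
        state->userCb(err, partitions);   // application may commit offsets here
        rd_kafka_assign(rk, nullptr);     // then drop the old assignment
    } else {
        rd_kafka_assign(rk, nullptr);     // unexpected error: reset assignment
    }
}

Such a trampoline would be registered with rd_kafka_conf_set_rebalance_cb, with the consumer state attached via rd_kafka_conf_set_opaque.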

niyue commented

Here is the sample program I used to reproduce this issue:

// Headers and using-declarations needed to make the sample self-contained
// (this assumes the 2021-era modern-cpp-kafka API, where the consumer types
// live directly in namespace kafka).
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>

#include "kafka/KafkaConsumer.h"

using std::string;
using namespace kafka;

static string get_env(const string &env_var, const string &default_value) {
  auto value = std::getenv(env_var.c_str());
  return value ? string(value) : default_value;
}

static kafka::Properties get_props() {
  auto brokers = get_env("BOOTSTRAP_SERVERS", "localhost:9092");
  auto group_id = get_env("GROUP_ID", "black_hole");
  kafka::Properties props({{"bootstrap.servers", brokers},
                           {"auto.offset.reset", "earliest"},
                           {"group.id", group_id}});
  return props;
}

static void commit_sync(KafkaManualCommitConsumer &consumer, TopicPartitionOffsets &offsets,
                        bool is_revocation = false) {
  consumer.commitSync(offsets);
  auto event = is_revocation ? "revoked partition offset committed" : "offset committed";
  for (auto const &[tp, offset] : offsets) {
    std::cout << "[" << event << "] topic=" << tp.first << " partition=" << tp.second
              << " offset=" << offset << " thread=" << std::this_thread::get_id() << std::endl;
  }
}

static void start_pulling(uint32_t total_records) {
  auto props = get_props();
  KafkaManualCommitConsumer consumer(props);
  std::cout << "[starting black hole] props=" << props.toString() << std::endl;
  auto topic_pattern = get_env("TOPIC_PATTERN", "foo");
  std::cout << "[subscribed topics] topic_pattern=" << topic_pattern << std::endl;

  uint32_t record_count = 0;

  TopicPartitionOffsets offsets;
  RebalanceCallback rebalance_callback = [&consumer,
                                          &offsets](RebalanceEventType event_type,
                                                    const TopicPartitions &topic_partitions) {
    bool is_revocation = event_type == RebalanceEventType::PartitionsRevoked;
    for (auto const &tp : topic_partitions) {
      std::cout << "[" << (is_revocation ? "revoked" : "assigned")
                << " partition] topic=" << tp.first << " partition=" << tp.second << std::endl;
    }
    if (is_revocation) {
      commit_sync(consumer, offsets, true);
    }
  };

  bool use_callback = get_env("USE_CB", "true") == "true";
  if (use_callback) {
    consumer.subscribe({topic_pattern}, rebalance_callback);
    std::cout << "[subscribe with rebalance callback]" << std::endl;
  } else {
    consumer.subscribe({topic_pattern});
    std::cout << "[subscribe with no rebalance callback]" << std::endl;
  }

  while (record_count <= total_records) {
    auto records = consumer.poll(std::chrono::milliseconds(100));
    if (records.size() == 1 && records[0].error()) {
      std::cout << "error during consumption: " << records[0].error().message() << std::endl;
    } else {
      std::cout << "[records retrieved] records=" << records.size() << std::endl;
      if (records.empty()) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
      } else {
        for (auto const &record : records) {
          TopicPartition tp(record.topic(), record.partition());
          offsets[tp] = record.offset() + 1;

          record_count++;
          // commit for every 100 records
          if (record_count % 100 == 0) {
            commit_sync(consumer, offsets);
          }
        }
      }
    }
  }
  std::cout << "exit" << std::endl;
}
niyue commented

I submitted a PR to fix this issue, and I verified the fix locally with this change.

Please let me know if this helps or if there is anything I need to change. Thanks.


Thank you for submitting the PR. I left a few comments; please help update the PR -- it just needs a slight change before merging. Cheers!

niyue commented

Thanks for the comments. I've updated the PR; please check it out~