reactor/reactor-kafka

Rebalancing always waits until maxDelayRebalance with AckMode.EXACTLY_ONCE

damn1kk opened this issue · 0 comments

Hello, I have a topic with multiple partitions and multiple app instances. I use AckMode.EXACTLY_ONCE with transactional sender.
Rebalance always waits until maxDelayRebalance.
I get a repeated message in the log: reactor.kafka.receiver.internals.ConsumerEventLoop - Rebalancing; waiting for N records in pipeline
Similar to the issue discussed here in the comments , but I have AckMode.EXACTLY_ONCE.

I think the problem is that the inPipeline variable changes in different objects.

isPipeline decreases in object created here:

CommittableBatch offsetBatch = new CommittableBatch();
for (ConsumerRecord<K, V> r : consumerRecords) {
offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());

but increases in another object here:

So variable inPipeline is always positive and the condition here is always waits until maxDelayRebalance:

int inPipeline = commitEvent.commitBatch.getInPipeline();
if (inPipeline > 0 || this.awaitingTransaction.get()) {
long end = maxDelayRebalance + System.currentTimeMillis();
do {
try {
log.debug("Rebalancing; waiting for {} records in pipeline", inPipeline);
Thread.sleep(interval);
commitEvent.runIfRequired(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
inPipeline = commitEvent.commitBatch.getInPipeline();
} while (isActive.get() && (inPipeline > 0 || this.awaitingTransaction.get())
&& System.currentTimeMillis() < end);

Expected Behavior

Rebalancing should complete after all read messages have been processed and the transaction has committed.

Actual Behavior

Rebalancing always waits until maxDelayRebalance.

Example to Reproduce

https://github.com/damn1kk/kafka-rebalance-issue/blob/master/src/test/java/KafkaRebalanceTest.java

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(
                        Map.of(
                                ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
                                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
                                ConsumerConfig.GROUP_ID_CONFIG, "consumer-group",
                                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10,
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
                                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
                        )
                )
                .addAssignListener(onAssign -> log.info("For {} assigned: {}", name, onAssign))
                .addRevokeListener(onRevoke -> log.info("For {} revoked: {}", name, onRevoke))
                .maxDeferredCommits(100)
                .maxDelayRebalance(REBALANCE_LIMIT)
                .subscription(List.of(TOPIC_NAME));


        KafkaReceiver.create(receiverOptions)
                .receiveExactlyOnce(innerSender.transactionManager())
                .concatMap(r -> r.groupBy(ConsumerRecord::partition, 1)
                                .flatMap(c -> innerSender.send(
                                        c.map(cr -> {
                                                    try {
                                                        log.info("Handle message by {} with key: {}", name, Integer.parseInt(cr.key()));
                                                        latch.countDown();
                                                        Thread.sleep(1000);
                                                    } catch (InterruptedException e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                    if (Integer.parseInt(cr.key()) % 2 == 0) {
                                                        return new ProducerRecord<>(TOPIC_RESULT1, cr.key(), cr.value());
                                                    } else {
                                                        return new ProducerRecord<>(TOPIC_RESULT2, cr.key(), cr.value());
                                                    }
                                                })
                                                .map(pr -> SenderRecord.create(pr, pr.key()))

                                ))
                                .then(innerSender.transactionManager().commit()),
                        0
                )
                .subscribe();

Environment

  • reactor-kafka 1.3.21
  • kafka-clients 2.8.2
  • reactor-core 3.5.7