Rebalancing always waits until maxDelayRebalance with AckMode.EXACTLY_ONCE
damn1kk opened this issue · 0 comments
Hello, I have a topic with multiple partitions and multiple app instances. I use AckMode.EXACTLY_ONCE with a transactional sender.
Rebalance always waits until maxDelayRebalance.
I get a repeated message in the log: reactor.kafka.receiver.internals.ConsumerEventLoop - Rebalancing; waiting for N records in pipeline
This is similar to the issue discussed here in the comments, but I use AckMode.EXACTLY_ONCE.
I think the problem is that the inPipeline counter is updated on different objects. inPipeline is decremented on the object created here, but incremented on another object here. As a result, inPipeline is always positive, and the condition here always waits until maxDelayRebalance.
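The suspected mismatch can be modelled in isolation. The sketch below is not reactor-kafka code: `Tracker`, `awaitDrained`, and the variable names are hypothetical stand-ins, and the polling loop only approximates a "wait until the counter is zero or the maximum delay elapses" check. It assumes the counter is incremented on one object and decremented on another, as described above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PipelineCounterSketch {

    /** Hypothetical stand-in for an object that tracks records in flight. */
    static final class Tracker {
        final AtomicInteger inPipeline = new AtomicInteger();
    }

    /**
     * Polls the counter until it drops to zero or maxDelayMs elapses.
     * Returns true if the counter drained before the deadline.
     */
    static boolean awaitDrained(Tracker tracker, long maxDelayMs) {
        long deadline = System.nanoTime() + maxDelayMs * 1_000_000L;
        while (tracker.inPipeline.get() > 0) {
            if (System.nanoTime() >= deadline) {
                return false; // gave up: the full delay was spent waiting
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Suspected situation: emitting increments one object,
        // committing decrements a different one.
        Tracker readByRebalanceCheck = new Tracker();
        Tracker updatedOnCommit = new Tracker();
        readByRebalanceCheck.inPipeline.addAndGet(3); // three records emitted
        updatedOnCommit.inPipeline.addAndGet(-3);     // commit lands on the wrong object
        System.out.println("mismatched: drained="
                + awaitDrained(readByRebalanceCheck, 200)); // prints "mismatched: drained=false"

        // Expected situation: both updates hit the same object.
        Tracker shared = new Tracker();
        shared.inPipeline.addAndGet(3);
        shared.inPipeline.addAndGet(-3);
        System.out.println("shared: drained="
                + awaitDrained(shared, 200)); // prints "shared: drained=true"
    }
}
```

In the mismatched case the wait can only end by timeout, which would match the observed behavior of every rebalance taking the full maxDelayRebalance.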
Expected Behavior
Rebalancing should complete after all read messages have been processed and the transaction has committed.
Actual Behavior
Rebalancing always waits until maxDelayRebalance.
Example to Reproduce
https://github.com/damn1kk/kafka-rebalance-issue/blob/master/src/test/java/KafkaRebalanceTest.java
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(
        Map.of(
            ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "consumer-group",
            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        )
    )
    .addAssignListener(onAssign -> log.info("For {} assigned: {}", name, onAssign))
    .addRevokeListener(onRevoke -> log.info("For {} revoked: {}", name, onRevoke))
    .maxDeferredCommits(100)
    .maxDelayRebalance(REBALANCE_LIMIT)
    .subscription(List.of(TOPIC_NAME));
KafkaReceiver.create(receiverOptions)
    .receiveExactlyOnce(innerSender.transactionManager())
    .concatMap(r -> r.groupBy(ConsumerRecord::partition, 1)
            .flatMap(c -> innerSender.send(
                c.map(cr -> {
                        try {
                            log.info("Handle message by {} with key: {}", name, Integer.parseInt(cr.key()));
                            latch.countDown();
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        if (Integer.parseInt(cr.key()) % 2 == 0) {
                            return new ProducerRecord<>(TOPIC_RESULT1, cr.key(), cr.value());
                        } else {
                            return new ProducerRecord<>(TOPIC_RESULT2, cr.key(), cr.value());
                        }
                    })
                    .map(pr -> SenderRecord.create(pr, pr.key()))
            ))
            .then(innerSender.transactionManager().commit()),
        0
    )
    .subscribe();
Environment
- reactor-kafka 1.3.21
- kafka-clients 2.8.2
- reactor-core 3.5.7