reactor-kafka 1.3.17 [reactor-kafka-sender] reactor.core.publisher.Operators : Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
patpatpat123 opened this issue · 5 comments
Expected Behavior
Not throwing reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
Actual Behavior
The setup is super simple. I am having a Reactor Kafka project, which business logic is:
1 Consume flux of messages
2 Apply transformation to the flux of messages to get a flux a transformed messages
3 Save the flux of transformed messages using a reactive repository (elastic/mongo/cassandra) using the saveAll(Publisher) method.
4 Send the transformed messages back to kafka
I tried ranging from gentle (10 messages per second) to aggressive (8000 messages per second)
And when it is aggressive, I am encountering 100% of the time, after the code being run for a while, this issue:
2023-03-22 15:27:52 13344 ERROR --- [reactor-kafka-sender-2050529121] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
Caused by: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Steps to Reproduce
@Override
public final void run(String... args) {
consume().subscribe();
}
public Flux<SenderResult<String>> consume() {
return kafkaSender.send(reactiveElasticRepository.saveAll(kafkaReceiver.receive().map(message -> message.value().toUpperCase()).map(one -> toSenderRecord(one)));
Possible Solution
I do not know how to fix this 100% reproducible issue
To emphasize, this code has been BlockHound tested NON blocking.
I tried adding .onBackpressureBuffer()
everywhere, issue still persists
Your Environment
- Reactor version(s) used:
- Other relevant libraries versions (eg.
netty
, ...): SpringBoot 3.0.4 + Reactor Kafka 1.3.17 - JVM version graalVm 17
- OS and version (eg
uname -a
): Issue reproducible in Linux Windows and MacOS
Have you tried increasing SenderOptions.maxInFlight
? (It defaults to Queues.SMALL_BUFFER_SIZE
).
Thank you @garyrussell for the quick answer.
We took your advice and ran tests lasting one hour+.
Here is the code using maxInFlight
, as per your suggestion:
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.MicrometerProducerListener;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MyProducerConfiguration {
private static final int MAGICNUMBER = 99999;
@Bean
public KafkaSender<String, String> kafkaSender(final MeterRegistry registry) {
final Map<String, Object> properties = new HashMap<>();
properties.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafka.com:9092");
properties.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final SenderOptions<String, String> senderOptions = SenderOptions.create(properties);
final SenderOptions<String, String> senderOptionsWithMaxInFlight = senderOptions.maxInFlight(MAGICNUMBER);
return KafkaSender.create(senderOptionsWithMaxInFlight.producerListener(new MicrometerProducerListener(registry)));
}
}
We are using tests on our side which messages rate is 10000 messages per second and variating the number .maxInFlight
between 999, 9999, 99999.
Unfortunately, the issue is still reproducible 100% of the time we run 11 tests in total since your answer and still running.
May I ask what is a possible root cause of this issue?
Could you kindly suggest the next step please?
Thank you
I don't have any other suggestions
...due to lack of requests
Implies the prefetch (maxInflight) has been reached.
Understood @garyrussell ,
We tried with Integer.MAX_VALUE and can still see the issue.
Is there a breakpoint I can put somewhere in order to further confirm/debug and provide more useful information on this issue please?
Closing as it seems it is not reproducible.
Many thanks @garyrussell for the help provided