reactor/reactor-kafka

reactor-kafka 1.3.17 [reactor-kafka-sender] reactor.core.publisher.Operators : Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

patpatpat123 opened this issue · 5 comments

Expected Behavior

Not throwing reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

Actual Behavior

The setup is very simple. I have a Reactor Kafka project whose business logic is:
1 Consume a flux of messages
2 Apply a transformation to the flux of messages to get a flux of transformed messages
3 Save the flux of transformed messages using a reactive repository (Elasticsearch/Mongo/Cassandra) via the saveAll(Publisher) method
4 Send the transformed messages back to Kafka

I tried rates ranging from gentle (10 messages per second) to aggressive (8000 messages per second).
At the aggressive rate, after the code has been running for a while, I encounter this issue 100% of the time:

2023-03-22 15:27:52 13344 ERROR --- [reactor-kafka-sender-2050529121] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
Caused by: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Steps to Reproduce

    @Override
    public final void run(String... args) {
        consume().subscribe();
    }

    public Flux<SenderResult<String>> consume() {
        return kafkaSender.send(
                reactiveElasticRepository.saveAll(
                        kafkaReceiver.receive()
                                .map(message -> message.value().toUpperCase()))
                        .map(one -> toSenderRecord(one)));
    }

Possible Solution

I do not know how to fix this 100% reproducible issue

To emphasize, this code has been verified as non-blocking with BlockHound.

I tried adding .onBackpressureBuffer() at various points in the pipeline, but the issue persists.
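For illustration, one such attempted placement might look like the sketch below (hypothetical; `reactiveElasticRepository` and `toSenderRecord` are the names from the report). Note that, judging from the stack trace above, the overflow is raised inside the sender's own `FluxBufferTimeout` on a `reactor-kafka-sender` thread, which an `onBackpressureBuffer()` placed on the receive side would not reach:

```java
// Sketch only: assumes a running Kafka broker and configured
// kafkaReceiver / kafkaSender / reactiveElasticRepository beans.
public Flux<SenderResult<String>> consume() {
    return kafkaSender.send(
            reactiveElasticRepository.saveAll(
                    kafkaReceiver.receive()
                            // Buffers on the consuming side; per the stack
                            // trace, the overflow happens downstream inside
                            // the sender, so this may have no effect.
                            .onBackpressureBuffer()
                            .map(message -> message.value().toUpperCase()))
                    .map(one -> toSenderRecord(one)));
}
```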

Your Environment

  • Reactor version(s) used:
  • Other relevant libraries versions (eg. netty, ...): SpringBoot 3.0.4 + Reactor Kafka 1.3.17
  • JVM version: GraalVM 17
  • OS and version (eg uname -a): issue reproducible on Linux, Windows, and macOS

Have you tried increasing SenderOptions.maxInFlight ? (It defaults to Queues.SMALL_BUFFER_SIZE).

Thank you @garyrussell for the quick answer.

We took your advice and ran tests lasting one hour+.

Here is the code using maxInFlight, as per your suggestion:

import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.MicrometerProducerListener;
import reactor.kafka.sender.SenderOptions;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MyProducerConfiguration {

    private static final int MAGICNUMBER = 99999;

    @Bean
    public KafkaSender<String, String> kafkaSender(final MeterRegistry registry) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafka.com:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        final SenderOptions<String, String> senderOptions = SenderOptions.create(properties);
        final SenderOptions<String, String> senderOptionsWithMaxInFlight = senderOptions.maxInFlight(MAGICNUMBER);
        return KafkaSender.create(senderOptionsWithMaxInFlight.producerListener(new MicrometerProducerListener(registry)));
    }

}

We are running tests on our side at a rate of 10000 messages per second, varying .maxInFlight between 999, 9999, and 99999.

Unfortunately, the issue is still reproducible 100% of the time. We have run 11 tests in total since your answer, with more still running.

May I ask what is a possible root cause of this issue?

Could you kindly suggest the next step please?

Thank you

I don't have any other suggestions.

...due to lack of requests

implies the prefetch (maxInFlight) has been reached.

Understood @garyrussell ,

We tried with Integer.MAX_VALUE and still see the issue.

Is there a breakpoint I can set somewhere to further confirm/debug and provide more useful information on this issue, please?
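Not from the thread, but one way to intercept such dropped errors for debugging is Reactor's global hook for errors that bypass `onError` (as `onErrorDropped` did here), giving a single place to set a breakpoint. A minimal sketch, assuming reactor-core is on the classpath:

```java
import reactor.core.publisher.Hooks;

public class DroppedErrorHook {

    // Call once at application startup, before subscribing.
    public static void install() {
        Hooks.onErrorDropped(throwable -> {
            // Breakpoint / logging site for errors dropped outside the
            // normal onError path, e.g. the OverflowException from the
            // sender thread in this issue.
            throwable.printStackTrace();
        });
    }
}
```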

Closing, as it seems it is not reproducible.

Many thanks @garyrussell for the help provided