reactor/reactor-kafka

DefaultKafkaReceiver does not correctly handle backpressure

deripas opened this issue · 14 comments

Hi!
I'm trying to use reactor-kafka for batch data processing. The kafkaReceiver is used in the following pipeline:

kafkaReceiver.receive()
                .bufferTimeout(100, Duration.ofMillis(100))
                .flatMap(receiverRecords ->
                        Flux.fromIterable(receiverRecords)
                                .parallel()
                                // processing 
                                .sequential())
                .subscribe()

And I get this error:

[ERROR] (parallel-1) Scheduler worker in group main failed with an uncaught exception - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
Caused by: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)

I think the problem is in DefaultKafkaReceiver:

                    pendingCount.decrementAndGet();
                    if (requestsPending.get() > 0 && !awaitingTransaction.get()) {
                        if (partitionsPaused.getAndSet(false))
                            consumer.resume(consumer.assignment());
                    } else {
                        if (!partitionsPaused.getAndSet(true))
                            consumer.pause(consumer.assignment());
                    }

                    ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
                    if (records.count() > 0) {
                        recordSubmission.next(records);
                    }
                    if (isActive.get()) {
                        int count = ((ackMode == AckMode.AUTO_ACK || ackMode == AckMode.EXACTLY_ONCE) && records.count() > 0) ? 1 : records.count();
                        if (requestsPending.get() == Long.MAX_VALUE || requestsPending.addAndGet(0 - count) > 0 || commitEvent.inProgress.get() > 0)
                            scheduleIfRequired();
                    }

Example case:

  • requestsPending = 0
  • bufferTimeout does request(100), so now requestsPending = 100
  • the consumer calls poll:
ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
  • records.count() is, for example, 200
  • requestsPending becomes -100 (requestsPending.addAndGet(0 - count))
  • bufferTimeout expected 100 records but received 200
  • the error "Could not emit buffer due to lack of requests" follows (see the standalone sketch below)

I have this error with reactor-kafka 1.2.2 and reactor-core 3.3.7:

Flux.just(receiverOptions)
                .map(KafkaReceiver::create)
                .flatMap(kafkaReceiver -> kafkaReceiver.receive()
                        .bufferTimeout(batchSize, Duration.ofMillis(50))
                        .concatMap(batch -> processBatch(kafkaReceiver, batch))
                ).doOnError(err -> LOGGER.error("Unexpected error in consuming Kafka events", err))

I read a batch of Kafka messages using kafkaReceiver.receive(), then I process them all and only after that ask for another batch. processBatch may take a long time (200 ms - 10 s).
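
For illustration only, here is a hedged sketch of what such a processBatch could look like (this is not the actual implementation from the comment above; indexRecords is a hypothetical placeholder for the long-running work, and String keys/values are assumed). The concatMap in the pipeline only requests the next buffer once the returned Mono completes:

import java.util.List;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

// Hypothetical sketch: process every record in the batch, then commit the offset
// of the last record before the next batch is requested. The receiver parameter is
// kept only to match the signature used in the pipeline above.
private Mono<Void> processBatch(KafkaReceiver<String, String> receiver,
                                List<ReceiverRecord<String, String>> batch) {
    return Mono.fromRunnable(() -> indexRecords(batch))        // long-running work (200 ms - 10 s)
            .subscribeOn(Schedulers.boundedElastic())          // keep blocking work off the poll thread
            .then(batch.get(batch.size() - 1)
                    .receiverOffset()
                    .commit());                                // commit the last processed offset
}

// Hypothetical blocking call standing in for the real batch processing.
private void indexRecords(List<ReceiverRecord<String, String>> batch) {
    // e.g. write the records to a downstream store
}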

The initial issue is reactor/reactor-core#1557.
@deripas did you find a way to fix it?

@max-grigoriev please try 1.3.0.M1

OK, I'll try it. Thanks

@max-grigoriev
No, unfortunately I didn't find a clean solution.
I had to add a workaround urgently.

@max-grigoriev please try 1.3.0.M1

@bsideup, I still have the same issue:

r.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:234)
	Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxLift] :
	reactor.core.publisher.Flux.bufferTimeout
	c.u.a.p.n.g.kafka.PNKafkaConsumer.lambda$listenForTopic$1(PNKafkaConsumer.java:142)
Error has been observed at the following site(s):
	|_ Flux.bufferTimeout ⇢ at c.u.a.p.n.g.kafka.PNKafkaConsumer.lambda$listenForTopic$1(PNKafkaConsumer.java:142)
	|_     Flux.concatMap ⇢ at c.u.a.p.n.g.kafka.PNKafkaConsumer.lambda$listenForTopic$1(PNKafkaConsumer.java:143)
	|_       Flux.flatMap ⇢ at c.u.a.p.n.g.kafka.PNKafkaConsumer.listenForTopic(PNKafkaConsumer.java:141)
Stack trace:
		at reactor.core.Exceptions.failWithOverflow(Exceptions.java:234)
		at r.c.p.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:219)
		at r.c.p.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:150)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
		at i.m.c.i.composite.CompositeTimer.recordCallable(CompositeTimer.java:68)
		at io.micrometer.core.instrument.Timer.lambda$wrap$1(Timer.java:155)
		at java.util.concurrent.FutureTask.run(FutureTask.java:264)
		at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at j.u.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
		at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
		at java.lang.Thread.run(Thread.java:834)

@max-grigoriev
No, unfortunately I didn't find a clean solution.
I had to add a workaround urgently.

How did you work around it, even without a clean solution?

@max-grigoriev See example: TestCase.java.zip

Not a solution, just a workaround!

Problem

After using .bufferTimeout(maxSize, maxTime) we have to add .onBackpressureBuffer() to avoid the exception
OverflowException: Could not emit buffer due to lack of requests, but .onBackpressureBuffer() issues request(unbounded) upstream.
To compensate, I added a periodic call to subscription.request(requests); see ManualRateLimitSubscriber.

Code

Compare methods originalPipeline() and uglyFix():

    public void originalPipeline() {
        flux.subscribeOn(parallel())
                .log()
                .bufferTimeout(10, ofMillis(200))
                .subscribe(batchSubscriber);
    }


    public void uglyFix() {
        flux.subscribeOn(parallel())
                .log()
                .compose(onBackpressureBufferTimeoutWithRateLimit(10, ofMillis(200)))
                .subscribe(batchSubscriber);
    }

The call to .bufferTimeout(10, ofMillis(200)) is replaced with .compose(onBackpressureBufferTimeoutWithRateLimit(10, ofMillis(200))), where:

    private static <T> Function<Flux<T>, Publisher<List<T>>> onBackpressureBufferTimeoutWithRateLimit(int maxSize, Duration maxTime) {
        return flux -> new Flux<List<T>>() {
            @Override
            public void subscribe(CoreSubscriber<? super List<T>> actual) {
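                // XS_BUFFER_SIZE is assumed to come from reactor.util.concurrent.Queues (defined elsewhere in the attached TestCase)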
                AtomicLong limit = new AtomicLong(maxSize * XS_BUFFER_SIZE);
                flux.compose(manualRateLimit(limit))
                        .bufferTimeout(maxSize, maxTime)
                        .onBackpressureBuffer()
                        .doOnNext(list -> limit.addAndGet(list.size()))
                        .subscribe(actual);
            }
        };
    }
    private static <T> Function<Flux<T>, Publisher<T>> manualRateLimit(AtomicLong requestsPending) {
        return flux -> new Flux<T>() {
            @Override
            public void subscribe(CoreSubscriber<? super T> actual) {
                flux.subscribe(new ManualRateLimitSubscriber<>(requestsPending, actual));
            }
        };
    }

    @Slf4j
    @RequiredArgsConstructor
    public static class ManualRateLimitSubscriber<T> extends BaseSubscriber<T> {

        private final AtomicLong requestsLimit;
        private final CoreSubscriber<? super T> actual;
        private final AtomicReference<Disposable> task = new AtomicReference<>();

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            actual.onSubscribe(new Subscription() {
                @Override
                public void request(long l) {
                    // ignore
                }

                @Override
                public void cancel() {
                    ManualRateLimitSubscriber.this.cancel();
                }
            });
            task.updateAndGet(current -> {
                if (current != null) current.dispose();
                return Schedulers.parallel()
                        .schedulePeriodically(() -> {
                            long requests = requestsLimit.getAndSet(0);
                            if (requests > 0) {
                                subscription.request(requests);
                            }
                        }, 0, 100, MILLISECONDS);
            });
        }

        @Override
        protected void hookOnNext(T value) {
            actual.onNext(value);
        }

        @Override
        protected void hookOnComplete() {
            actual.onComplete();
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            actual.onError(throwable);
        }

        @Override
        protected void hookFinally(SignalType type) {
            task.updateAndGet(current -> {
                if (current != null) current.dispose();
                return null;
            });
        }
    }

This is still an issue; we could not reproduce it with tests/load tests, but we got the exception in prod.

The issue is reproducible in v1.3.0

I have this issue with v1.3.3

This issue should have been closed by 1.3.3.

We just fixed an issue where the back pressure (pause) is canceled if a rebalance occurs.

#238

If that is not your problem, please provide code, configuration, and logs in a new issue.

According to our logs, the rebalance was caused by the error, not vice versa. These are the earliest logs from one pod; the other pods started to rebalance and got the same error:

2021-08-30 08:30:21.139  INFO 1 --- [     parallel-1] e.c.c.ElasticsearchSsmsEventsIndexerImpl : Save 20 records
2021-08-30 08:30:31.200  INFO 1 --- [ms_event_prod-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=elastic_public_ssms_event_prod, groupId=elastic_group_prod_public_ssms_event_prod] Revoke previously assigned partitions public_ssms_event_prod-0
2021-08-30 08:30:31.200  INFO 1 --- [ms_event_prod-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=elastic_public_ssms_event_prod, groupId=elastic_group_prod_public_ssms_event_prod] Member elastic_public_ssms_event_prod-27d217d7-7f70-4d9d-9f6d-8f869b80ba25 sending LeaveGroup request to coordinator kafka:9092 (id: 2147483646 rack: null) due to the consumer is being closed
2021-08-30 08:30:31.269  INFO 1 --- [ms_event_prod-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2021-08-30 08:30:31.269  INFO 1 --- [ms_event_prod-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-08-30 08:30:31.270  INFO 1 --- [ms_event_prod-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2021-08-30 08:30:31.278 ERROR 1 --- [     parallel-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
        Suppressed: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
                at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233) ~[reactor-core-3.4.5.jar!/:3.4.5]
                at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227) ~[reactor-core-3.4.5.jar!/:3.4.5]
                at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158) ~[reactor-core-3.4.5.jar!/:3.4.5]
                at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.4.5.jar!/:3.4.5]
                at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.4.5.jar!/:3.4.5]
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
                at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
                at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233) ~[reactor-core-3.4.5.jar!/:3.4.5]
        at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227) ~[reactor-core-3.4.5.jar!/:3.4.5]
        at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158) ~[reactor-core-3.4.5.jar!/:3.4.5]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.4.5.jar!/:3.4.5]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.4.5.jar!/:3.4.5]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-08-30 08:30:31.280  INFO 1 --- [ms_event_prod-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for elastic_public_ssms_event_prod unregistered
2021-08-30 08:30:51.031  INFO 1 --- [tor-tcp-epoll-3] e.c.c.ElasticsearchSsmsEventsIndexerImpl : consumed 20 messages from topic public_ssms_event_prod partition 0 last offset 3363115
2021-08-30 08:30:51.031  INFO 1 --- [tor-tcp-epoll-3] e.c.c.ElasticsearchSsmsEventsIndexerImpl : Finished indexing 20 records, request more

That appears to be far away from the reactor-kafka code, where back pressure is implemented by pausing the consumer; nothing is emitted from the receiver if there are no available requests in the ConsumerEventLoop.

I am not a reactor expert, but it appears that something else in your topology doesn't support back pressure - maybe a question for the general reactor community?
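
For context, here is a hedged sketch of the pause-based back pressure described above, expressed with the plain Kafka consumer API (this is not the actual ConsumerEventLoop code; pendingRequests and pollRespectingDemand are hypothetical stand-ins for the receiver's internal demand tracking):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Conceptual sketch only: stop fetching while the downstream has no demand,
// resume once demand returns, then poll and track the remaining demand.
void pollRespectingDemand(Consumer<String, String> consumer, AtomicLong pendingRequests) {
    if (pendingRequests.get() <= 0) {
        consumer.pause(consumer.assignment());
    } else {
        consumer.resume(consumer.assignment());
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // the records would be handed to the downstream subscriber here
    pendingRequests.addAndGet(-records.count());
}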

My code is similar to @max-grigoriev's in this issue: I request events in batches (request(1)), which should have been fixed by reactor/reactor-core#1557; I process the batch (index the records in Elasticsearch), commit the offset of the last successfully indexed event, and request the next portion.
The workaround from @deripas works for me, but a real solution would be nicer.