reactor/reactor-kafka

Topic has lag after the application has finished consuming

sogreatsg opened this issue · 7 comments

After I produced 3000 records and the application finished consuming, the application log shows that all 3000 records were received, but there is still a small lag on the topic (21 records). When I restart the application, the consumer resumes and reprocesses the 21 lagging records as duplicates. How can I fix this issue?

Thank you for your help.

 fun subscribe() {
        val props = configProperties()
        val receiverOptions = ReceiverOptions.create<String, String>(props)
        subscription = KafkaReceiver.create(receiverOptions.subscription(Collections.singleton(properties.listener.topic)))
                .receive()
                .flatMap { record ->
                    log.info(marker, "receive record at partition=${record.partition()}, offset=${record.offset()}, value=${record.value()}")
                    processEvent.execute(record.value()).map { record }
                }.flatMap {
                    it.receiverOffset().acknowledge().toMono()
                }
                .subscribe({
                    log.info(marker, "success")
                }, {
                    log.error(marker, "unexpected error! need to re-subscribe!")
                    subscribe()
                })
    }

I am using reactor-kafka v1.3.15.

You need to wait for at least the commitInterval (default 5 seconds) after processing the last record, before disposing the pipeline.
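For context, the periodic commit window mentioned above is configurable on `ReceiverOptions`. A minimal sketch, assuming the same setup as the snippet earlier (`commitInterval` and `commitBatchSize` are real reactor-kafka options; `props` and `topic` are placeholders):

```kotlin
// Sketch: shorten the asynchronous commit window so acknowledged
// offsets are committed sooner. props and topic are placeholders.
val receiverOptions = ReceiverOptions.create<String, String>(props)
        .commitInterval(Duration.ofSeconds(1))   // default is 5 seconds
        .commitBatchSize(10)                     // or commit after every N acks
        .subscription(Collections.singleton(topic))
```

Even with a shorter interval, commits remain asynchronous, so the pipeline still needs to stay alive past the last periodic commit before `dispose()` is called.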

Thank you, but after adding a waiting time in the code I still see lag.

 fun subscribe() {
        val props = configProperties()
        val receiverOptions = ReceiverOptions.create<String, String>(props)
        subscription = KafkaReceiver.create(receiverOptions.subscription(Collections.singleton(properties.listener.topic)))
                .receive()
                .flatMap { record ->
                    log.info(marker, "receive record at partition=${record.partition()}, offset=${record.offset()}, value=${record.value()}")
                    processNotificationEvent.execute(record.value()).map { record }
                }
                .doOnNext {
                    log.info(marker, "acknowledge")
                    it.receiverOffset().acknowledge()
                }
                .sample(Duration.ofSeconds(5L))
                .concatMap {
                    log.info(marker, "commit")
                    it.receiverOffset().commit()
                }
                .subscribe({
                    log.info(marker, "success")
                }, {
                    log.error(marker, "unexpected error! need to re-subscribe!")
                    subscribe()
                })
    }

The topic was created with --replication-factor 3 --partitions 3, and one application instance consumes from it.

In this version, you are not subscribing to the commit Mono; you need to wait for all the commits to complete.
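One way to satisfy that, assuming the same setup as the snippet above (`processNotificationEvent`, `receiverOptions`, `log`, and `marker` are carried over from it), is to return the commit Mono from the operator so it becomes part of the subscribed chain instead of a fire-and-forget side effect:

```kotlin
// Sketch: chain the commit into the pipeline. ReceiverOffset.commit()
// returns a Mono<Void>, so concatMap only requests the next record once
// the previous commit has completed.
subscription = KafkaReceiver.create(receiverOptions)
        .receive()
        .concatMap { record ->
            processNotificationEvent.execute(record.value())
                    .then(record.receiverOffset().commit())
        }
        .subscribe(
                { log.info(marker, "success") },
                { e -> log.error(marker, "pipeline failed, re-subscribing", e) })
```

This trades some throughput for per-record commit guarantees; batching commits is still possible, but then the pipeline must await the final commit before disposal.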

If you still can't figure it out, please provide an MCRE (minimal, complete, reproducible example) so I can see what's wrong.

Thank you. I will try to wait for all the commits to complete.

Hi @garyrussell, I'm curious about this case. I have a similar problem, but with commitInterval=0 and a concatMap after the record processing in a flatMap.

In the concatMap I use:

record.receiverOffset().commit().block()

Any idea why this might be happening? Thanks in advance.

@oaPiet You need to show your complete pipeline; perhaps that code is running on an async thread (likely) and you are not waiting for all commits to complete before disposing the subscription.
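A hedged sketch of one way to wait: rather than calling `dispose()` (which cancels in-flight work), complete the flux cooperatively and block until it terminates. `Sinks`, `takeUntilOther`, and `blockLast` are standard Reactor operators; `kafkaReceiver` and `process` are placeholder names:

```kotlin
// Sketch: cooperative shutdown so pending commits can finish.
val shutdown = Sinks.one<Unit>()

kafkaReceiver.receive()
        .takeUntilOther(shutdown.asMono())   // stop pulling records on shutdown
        .concatMap { record ->
            process(record.value())
                    .then(record.receiverOffset().commit())
        }
        .blockLast()   // returns only after the last in-flight commit completes

// elsewhere, to trigger shutdown:
// shutdown.tryEmitValue(Unit)
```

Because `takeUntilOther` completes the upstream rather than cancelling it, the current `concatMap` inner Mono (including its commit) is allowed to finish before the flux terminates.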

Thanks @garyrussell, you were right; it was my mistake in how I used parallel() as part of my pipeline. The threads were not handled correctly, and the subscription was disposed prematurely, just as you predicted.