fd4s/fs2-kafka

Kafka consumer polling interval is multiplied by 2

Yevhensh opened this issue · 0 comments

When specifying Kafka consumer settings (fs2.kafka.ConsumerSettings) I can see that poll interval is multiplied by 2,
so for now my workaround for consuming requests once per 10 seconds looks like that:

.withPollInterval(10 seconds / 2)

Full reproduce:

import cats.effect._
import cats.implicits._
import fs2.kafka._

import java.time.LocalTime
import scala.concurrent.duration._
import scala.util.Random

object Test extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val bootstrapServer: String      = "localhost:9092"
    val topic: String                = "test-topic"
    val pollInterval: FiniteDuration = 3.seconds

    for {
      _ <- produceRecords(bootstrapServer, topic)
      _ <- consumeRecords(bootstrapServer, topic, pollInterval)
    } yield ExitCode.Success
  }

  private def produceRecords(bootstrapServer: String, topic: String): IO[ProducerResult[String, String, Unit]] = {
    val producerSettings = ProducerSettings[IO, String, String].withBootstrapServers(bootstrapServer)

    KafkaProducer
      .resource(producerSettings)
      .use { producer =>
        List
          .fill(10)(IO(Random.alphanumeric.take(5).mkString))
          .sequence
          .map(values => values.map(ProducerRecord(topic, "key", _)))
          .flatMap { records =>
            producer
              .produce(ProducerRecords(records))
              .flatten
          }
      }
  }

  private def consumeRecords(bootstrapServer: String, topic: String, pollInterval: FiniteDuration): IO[Unit] = {
    val consumerSettings =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers(bootstrapServer)
        .withGroupId("test-group")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withPollInterval(pollInterval)
        .withMaxPollRecords(1)

    KafkaConsumer
      .resource(consumerSettings)
      .use { consumer =>
        consumer.subscribeTo(topic) >>
          consumer.stream
            .evalMap(msg => IO(println(s"${LocalTime.now()} - ${msg.record.value}")))
            .compile
            .drain
      }
  }

}

Output:

13:41:13.222 - F5Qdq
13:41:19.181 - FP5ol
13:41:25.184 - QMnmK
13:41:31.190 - ps4Ai
13:41:37.195 - LkzYg
13:41:43.207 - q5sA1
13:41:49.208 - PdK9W
13:41:55.217 - F28jw
13:42:01.223 - STNzB
13:42:07.227 - 1sKtc

Versions:

  "com.github.fd4s" %% "fs2-kafka"         % "1.8.0"
  "com.github.fd4s" %% "fs2-kafka-vulcan"  % "1.8.0"
  "com.github.fd4s" %% "vulcan"            % "1.7.1"