Kafka consumer polling interval is multiplied by 2
Yevhensh opened this issue · 0 comments
Yevhensh commented
When specifying Kafka consumer settings (fs2.kafka.ConsumerSettings), I can see that the effective poll interval is twice the configured value,
so for now my workaround for consuming records once every 10 seconds looks like this:
.withPollInterval(10 seconds / 2)
Full reproduction:
import cats.effect._
import cats.implicits._
import fs2.kafka._
import java.time.LocalTime
import scala.concurrent.duration._
import scala.util.Random
/** Minimal reproduction: writes ten random records to a Kafka topic, then consumes them
  * while printing the local time at which each record is handled.
  */
object Test extends IOApp {

  /** Produces the test records first and only then starts the consumer.
    *
    * `ExitCode.Success` is yielded only after the consumer effect completes.
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val broker: String = "localhost:9092"
    val topicName: String = "test-topic"
    val interval: FiniteDuration = 3.seconds

    produceRecords(broker, topicName)
      .flatMap(_ => consumeRecords(broker, topicName, interval))
      .map(_ => ExitCode.Success)
  }

  /** Sends ten records to `topic`, all under the key "key", each carrying a freshly
    * generated five-character alphanumeric value.
    *
    * @param bootstrapServer address passed to the producer's bootstrap-servers setting
    * @param topic           topic the records are written to
    * @return the result of the single `produce` call covering all ten records
    */
  private def produceRecords(bootstrapServer: String, topic: String): IO[ProducerResult[String, String, Unit]] = {
    val settings = ProducerSettings[IO, String, String].withBootstrapServers(bootstrapServer)
    // Wrapped in IO so that every one of the ten evaluations draws a new random value.
    val randomValue: IO[String] = IO(Random.alphanumeric.take(5).mkString)

    KafkaProducer.resource(settings).use { producer =>
      List.fill(10)(randomValue).sequence.flatMap { values =>
        val records = values.map(value => ProducerRecord(topic, "key", value))
        // `produce` yields a nested effect; flattening runs the inner one as well
        // before the producer resource is released.
        producer.produce(ProducerRecords(records)).flatten
      }
    }
  }

  /** Subscribes to `topic` and prints `<local time> - <record value>` for every record
    * received from the consumer stream.
    *
    * @param bootstrapServer address passed to the consumer's bootstrap-servers setting
    * @param topic           topic to subscribe to
    * @param pollInterval    value handed to `withPollInterval`; the spacing of the printed
    *                        timestamps is what this reproduction is meant to expose
    */
  private def consumeRecords(bootstrapServer: String, topic: String, pollInterval: FiniteDuration): IO[Unit] = {
    // NOTE(review): max poll records is 1, presumably so that each printed line
    // corresponds to a single poll - confirm against the fs2-kafka documentation.
    val settings =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers(bootstrapServer)
        .withGroupId("test-group")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withPollInterval(pollInterval)
        .withMaxPollRecords(1)

    KafkaConsumer.resource(settings).use { consumer =>
      for {
        _ <- consumer.subscribeTo(topic)
        _ <- consumer.stream
          .evalMap(committable => IO(println(s"${LocalTime.now()} - ${committable.record.value}")))
          .compile
          .drain
      } yield ()
    }
  }
}
Output (records are printed roughly 6 seconds apart, although the configured poll interval is 3 seconds):
13:41:13.222 - F5Qdq
13:41:19.181 - FP5ol
13:41:25.184 - QMnmK
13:41:31.190 - ps4Ai
13:41:37.195 - LkzYg
13:41:43.207 - q5sA1
13:41:49.208 - PdK9W
13:41:55.217 - F28jw
13:42:01.223 - STNzB
13:42:07.227 - 1sKtc
Versions:
"com.github.fd4s" %% "fs2-kafka" % "1.8.0"
"com.github.fd4s" %% "fs2-kafka-vulcan" % "1.8.0"
"com.github.fd4s" %% "vulcan" % "1.7.1"