monix/monix-kafka

[Question] How do you manually commit the offset after processing each record

atiqsayyed opened this issue · 3 comments

Hi,
I'm trying to achieve a manual commit for each record. These are my settings:

  val consumerCfg = KafkaConsumerConfig.default.copy(
    bootstrapServers = List("localhost:9092"),
    groupId = "monixGroup",
    enableAutoCommit = false,
    observableCommitType = ObservableCommitType.Sync,
    observableCommitOrder = ObservableCommitOrder.AfterAck
  )

The following is my consumer Observable:

val monixConsumer = KafkaConsumerObservable[String, String](consumerCfg, List("monix"))

val consumer = monixConsumer
  .bufferTimedWithPressure(1.second, 5)
  .map(deserialize(_))
  .map(callApi(_))
  .runAsyncGetLast

It only commits the offset once it's done processing all the elements, not after each batch of 5.
I want to commit every batch manually; is there a way to do this?

Hi,
Unfortunately there is no Observable like this (monix-kafka is a very light wrapper), but you could disable commits and do them manually with consumer.commitSync().

You can create your own consumer using KafkaConsumerObservable.createConsumer(), and there is a KafkaConsumerObservable.apply that accepts it as a constructor argument.
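A rough sketch of that approach, assuming createConsumer returns a Task[KafkaConsumer[K, V]] and that the apply overload accepts that Task (check the actual signatures in your version; processBatch below is a hypothetical processing function):

import monix.eval.Task
import monix.kafka._
import monix.reactive.Observable
import scala.concurrent.duration._

// memoize so the observable and the commit callback share one consumer instance
val consumerTask = KafkaConsumerObservable.createConsumer[String, String](consumerCfg, List("monix")).memoize

val committed: Observable[Unit] =
  Observable.fromTask(consumerTask).flatMap { consumer =>
    KafkaConsumerObservable[String, String](consumerCfg, consumerTask)
      .bufferTimedWithPressure(1.second, 5)
      .mapTask { records =>
        Task.eval(processBatch(records)).map { _ =>
          // commit synchronously once the whole batch has been processed
          consumer.synchronized(consumer.commitSync())
        }
      }
  }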

The easiest way to disable commits would be if @alexandru released a new version containing the merged PR #14; then you could use monix.observable.commit.order = "no-ack".
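Once such a release is out, the programmatic equivalent would presumably look something like this (hypothetical, assuming the PR adds a NoAck case to ObservableCommitOrder matching the "no-ack" key):

// hypothetical until the release containing PR #14 is available
val noCommitCfg = KafkaConsumerConfig.default.copy(
  enableAutoCommit = false,
  observableCommitOrder = ObservableCommitOrder.NoAck
)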

I had the same problem. I solved it this way (inspired by alpakka and fs2-kafka):

  1. Creating classes:
import monix.eval.Task
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

final case class CommittableOffset(
  topicPartition: TopicPartition,
  offset: Long,
  commit: Map[TopicPartition, Long] => Task[Unit]
)

final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset)

class CommittableOffsetBatch(
  val offsets: Map[TopicPartition, Long],
  internalCommit: Map[TopicPartition, Long] => Task[Unit]
) {
  def commit(): Task[Map[TopicPartition, Long]] = internalCommit(offsets).map(_ => offsets)

  def updated(committableOffset: CommittableOffset): CommittableOffsetBatch =
    new CommittableOffsetBatch(
      offsets.updated(committableOffset.topicPartition, committableOffset.offset),
      committableOffset.commit
    )
}

object CommittableOffsetBatch {
  def empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, _ => Task.pure(()))

  def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch =
    if (offsets.nonEmpty) {
      val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
        acc.updated(o.topicPartition, o.offset)
      }
      new CommittableOffsetBatch(aggregatedOffsets, offsets.head.commit)
    } else {
      empty
    }
}
  2. Creating my own KafkaConsumerObservableManual that emits CommittableMessage (a sketch of the consumerCommitBatch helper it relies on follows the last step below):
// in runLoop function
val next = blocking(consumer.poll(pollTimeoutMillis))
if (shouldCommitBefore) consumerCommit(consumer)
// Feeding the observer happens on the Subscriber's scheduler
// if any asynchronous boundaries happen
val messages = next.asScala.map { record =>
  CommittableMessage(
    record,
    CommittableOffset(
      new TopicPartition(record.topic(), record.partition()),
      record.offset(),
      batch => Task.eval(consumer.synchronized(consumerCommitBatch(consumer, batch)))
    )
  )
}
Observer.feed(out, messages)(out.scheduler)
  3. Committing it manually:
KafkaConsumerObservableManual[Array[Byte], Array[Byte]](consumerSettings, List(config.topic))
  // some processing
  .bufferTimedAndCounted(1.second, 1000)
  .map(messages => CommittableOffsetBatch(messages.map(_.committableOffset)))
  .mapTask(batch => batch.commit().map(_ => logger.debug(s"commit [${batch.offsets}]")))
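The consumerCommitBatch helper referenced in the runLoop snippet above isn't shown in the original comment; a minimal sketch of what it could look like, assuming it simply translates the aggregated offsets into OffsetAndMetadata and commits them synchronously:

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical helper: commits the highest processed offset per partition.
// Kafka expects the offset of the next record to consume, hence the + 1.
def consumerCommitBatch[K, V](consumer: KafkaConsumer[K, V], batch: Map[TopicPartition, Long]): Unit =
  consumer.commitSync(
    batch.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }.asJava
  )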

@Avasil
If this approach does not contradict the ideology of monix-kafka, I am ready to create a pull request with tests, docs, and other stuff.

@poslegm
An addition like this would be greatly appreciated! If you create a PR we can work out the implementation details. :)