[Question] How do you manually commit the offset after processing each record?
atiqsayyed opened this issue · 3 comments
Hi,
I'm trying to commit offsets manually after each record. These are my settings:
val consumerCfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("localhost:9092"),
  groupId = "monixGroup",
  enableAutoCommit = false,
  observableCommitType = ObservableCommitType.Sync,
  observableCommitOrder = ObservableCommitOrder.AfterAck
)
This is my consumer Observable:
val monixConsumer = KafkaConsumerObservable[String, String](consumerCfg, List("monix"))
val consumer = monixConsumer
  .bufferTimedWithPressure(1.second, 5)
  .map(deserialize(_))
  .map(callApi(_))
  .runAsyncGetLast
It only commits the offset once it has finished processing all the elements, not after each batch of 5.
I want to commit every batch manually. Is there a way to do this?
Hi,
Unfortunately there is no Observable like this (monix-kafka is a very light wrapper), but you could disable commits and do it manually with consumer.commitSync().
You can create your own consumer using KafkaConsumerObservable.createConsumer(), and there is a KafkaConsumerObservable.apply overload that accepts it.
The easiest way to disable commits would be for @alexandru to release a new version containing the merged PR #14; then you could use monix.observable.commit.order = "no-ack"
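To make the "commit manually with consumer.commitSync()" suggestion concrete, here is a minimal sketch that uses only the plain Kafka client API, not monix-kafka. It assumes Scala 2.13+ and kafka-clients 2.0 or newer (for poll(Duration)); the names ManualCommitSketch, offsetsToCommit and pollProcessCommit are made up for illustration. The point it shows is that the offset passed to commitSync should be the last processed offset plus one, per partition.

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Hypothetical helper object, not part of monix-kafka or kafka-clients.
object ManualCommitSketch {

  // Kafka expects the committed offset to be the position of the NEXT record
  // to read, so for each partition we take the highest processed offset + 1.
  def offsetsToCommit[K, V](
      records: Iterable[ConsumerRecord[K, V]]
  ): Map[TopicPartition, OffsetAndMetadata] =
    records
      .groupBy(r => new TopicPartition(r.topic(), r.partition()))
      .map { case (tp, rs) => tp -> new OffsetAndMetadata(rs.map(_.offset()).max + 1) }

  // One poll, processed in batches of `batchSize`, committing after each batch.
  // Assumes auto-commit is disabled in the consumer configuration.
  def pollProcessCommit[K, V](consumer: Consumer[K, V], batchSize: Int)(
      process: Seq[ConsumerRecord[K, V]] => Unit
  ): Unit = {
    val polled = consumer.poll(Duration.ofMillis(500)).asScala.toSeq
    polled.grouped(batchSize).foreach { batch =>
      process(batch)
      consumer.commitSync(offsetsToCommit(batch).asJava)
    }
  }
}
```

How the consumer is then handed to KafkaConsumerObservable depends on the monix-kafka version in use, so that part is left out here.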
I had the same problem. I solved it this way (inspired by alpakka and fs2-kafka):
- Creating classes:
final case class CommittableOffset(
  topicPartition: TopicPartition,
  offset: Long,
  commit: Map[TopicPartition, Long] => Task[Unit]
)
final case class CommitableMessage[K, V](record: ConsumerRecord[K, V], commitableOffset: CommittableOffset)
class CommittableOffsetBatch(
  val offsets: Map[TopicPartition, Long],
  internalCommit: Map[TopicPartition, Long] => Task[Unit]
) {
  def commit(): Task[Map[TopicPartition, Long]] = internalCommit(offsets).map(_ => offsets)
  def updated(committableOffset: CommittableOffset): CommittableOffsetBatch =
    new CommittableOffsetBatch(
      offsets.updated(committableOffset.topicPartition, committableOffset.offset),
      committableOffset.commit
    )
}
object CommittableOffsetBatch {
  def empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, _ => Task.pure(()))
  def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch =
    if (offsets.nonEmpty) {
      val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
        acc.updated(o.topicPartition, o.offset)
      }
      new CommittableOffsetBatch(aggregatedOffsets, offsets.head.commit)
    } else {
      empty
    }
}
- Creating my own KafkaConsumerObservableManual that emits CommitableMessage:
// in runLoop function
val next = blocking(consumer.poll(pollTimeoutMillis))
if (shouldCommitBefore) consumerCommit(consumer)
// Feeding the observer happens on the Subscriber's scheduler
// if any asynchronous boundaries happen
val messages = next.asScala.map { record =>
  CommitableMessage(
    record,
    CommittableOffset(
      new TopicPartition(record.topic(), record.partition()),
      record.offset(),
      batch => Task.eval(consumer.synchronized(consumerCommitBatch(consumer, batch)))
    )
  )
}
Observer.feed(out, messages)(out.scheduler)
- Committing it manually:
KafkaConsumerObservableManual[Array[Byte], Array[Byte]](consumerSettings, List(config.topic))
  // some processing
  .bufferTimedAndCounted(1.second, 1000)
  // build the batch from each message's offset, not from the message itself
  .map(messages => CommittableOffsetBatch(messages.map(_.commitableOffset)))
  .mapTask(batch => batch.commit().map(_ => logger.debug(s"commit [${batch.offsets}]")))
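One detail of the fold in CommittableOffsetBatch.apply is worth spelling out: a later offset for a partition simply replaces the earlier one, so the result is only the highest offset when the buffered messages arrive in offset order. The sketch below reproduces that fold with stand-in types (TopicPartition and Offset here are hypothetical case classes, not the Kafka classes) so it runs with the Scala standard library alone, and adds an order-independent variant as one possible alternative.

```scala
// Stand-in types (hypothetical) so the sketch needs neither Kafka nor Monix.
final case class TopicPartition(topic: String, partition: Int)
final case class Offset(topicPartition: TopicPartition, offset: Long)

object BatchFoldSketch {

  // Same fold as CommittableOffsetBatch.apply: last write wins per partition.
  def aggregate(offsets: Seq[Offset]): Map[TopicPartition, Long] =
    offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
      acc.updated(o.topicPartition, o.offset)
    }

  // Alternative that keeps the maximum offset per partition, regardless of order.
  def aggregateMax(offsets: Seq[Offset]): Map[TopicPartition, Long] =
    offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
      val current = acc.getOrElse(o.topicPartition, Long.MinValue)
      acc.updated(o.topicPartition, math.max(current, o.offset))
    }
}
```

If some processing step between the consumer and the buffer can reorder messages (for example a parallel map), the maximum-based variant may be the safer choice.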
@Avasil
If this approach does not conflict with the design philosophy of monix-kafka, I am ready to open a pull request with tests, docs, and anything else that is needed.