etspaceman/kinesis4cats

Docs for fs2 KCL should clarify call to `consumer.commitRecords`


First of all, thank you for a great library! I was able to use it to migrate an old KCL app still using v1 of the AWS KCL.

One hurdle I ran into, however, was that the docs for using `KCLConsumerFS2` show a call to `consumer.commitRecords`, so I added one to my code as well, assuming it was necessary. Only after testing and debugging did I find that the default config results in auto-committing. Ultimately this caused runtime errors when my manual commit ran, because I was trying to commit a record that had already been committed by the auto-commit:

```
java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: XXXXX,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: YYYYY,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: YYYYY,SubsequenceNumber: 0}
	software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.checkpoint(ShardRecordProcessorCheckpointer.java:124)
	kinesis4cats.kcl.CommittableRecord.$anonfun$checkpoint$1(CommittableRecord.scala:87)
	map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:103)
	map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:98)
	flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$2(package.scala:97)
	map2 @ fs2.Stream$.$anonfun$fromQueueUnterminated$1(Stream.scala:3540)
	map @ kinesis4cats.compat.retry.package$.applyPolicy(package.scala:116)
	flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$1(package.scala:96)
```
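The error message describes a range check: a checkpoint is accepted only if its sequence number lies between the last checkpoint and the greatest sequence number delivered to the record processor. In the trace above both bounds are YYYYY, so any other sequence number is rejected. The sketch below is a hypothetical, simplified model of that check, not the KCL or kinesis4cats implementation: `ToyCheckpointer`, `deliver`, and `checkpoint` are invented for illustration, and sequence numbers are plain `BigInt`s. It assumes auto-commit checkpoints a batch at its last record, then shows a later manual commit of an earlier record tripping the check.

```scala
// Hypothetical, simplified model of the range check behind the error above.
// NOT the KCL implementation; sequence numbers are plain BigInts here.
final class ToyCheckpointer(initial: BigInt) {
  private var lastCheckpoint: BigInt = initial   // last successfully checkpointed number
  private var largestPermitted: BigInt = initial // greatest number delivered so far

  // Called when a batch of records is handed to the record processor.
  def deliver(batchMax: BigInt): Unit =
    largestPermitted = largestPermitted.max(batchMax)

  // Accepts seq only if lastCheckpoint <= seq <= largestPermitted.
  def checkpoint(seq: BigInt): Either[String, BigInt] =
    if (seq >= lastCheckpoint && seq <= largestPermitted) {
      lastCheckpoint = seq
      Right(seq)
    } else
      Left(s"$seq is outside [$lastCheckpoint, $largestPermitted]")
}

object DoubleCommitDemo {
  def main(args: Array[String]): Unit = {
    val cp = new ToyCheckpointer(BigInt(0))
    cp.deliver(BigInt(200)) // one batch containing records 100 and 200

    // In this model, auto-commit checkpoints the batch at its last record...
    println(cp.checkpoint(BigInt(200))) // Right(200)
    // ...so a later manual commit of an earlier record is rejected.
    println(cp.checkpoint(BigInt(100))) // Left(100 is outside [200, 200])
  }
}
```

In this model, each record should be committed exactly once: either leave auto-commit on and drop the manual commit, or turn auto-commit off and commit explicitly.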

Do you think it would make sense to update the docs to clarify that calling `consumer.commitRecords` is not necessary unless you've explicitly changed the default `RecordProcessor.Config` behavior?

Sorry for the late response here. The default configuration for the FS2 consumer should have `autoCommit` set to `false`; see the override here. How are you constructing your FS2 consumer? Can you give me a quick reproducible example?

No worries, thanks for getting back to me! I searched the repo, and the `defaultProcessConfig` you linked to doesn't appear to be used anywhere.

I tested in a REPL and can confirm that `autoCommit` is set to `true` when using `KCLConsumerFS2.Builder.default`. I had to use a bunch of reflection and `asInstanceOf` calls to work around private class fields, so excuse that 😅

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import kinesis4cats.kcl.RecordProcessor
import kinesis4cats.kcl.fs2.KCLConsumerFS2
import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}
import software.amazon.kinesis.processor.SingleStreamTracker

// Build the consumer with the default FS2 builder, as a typical app would
val consumer = KCLConsumerFS2.Builder.default[IO](
  streamTracker = new SingleStreamTracker(
    "test",
    InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON),
  ),
  appName = "test",
).build

// `config` is private on KCLConsumerFS2, so read it via reflection
val fs2ConfField = classOf[KCLConsumerFS2[IO]].getDeclaredField("config")
fs2ConfField.setAccessible(true)

// Acquire the consumer just long enough to pull out the shard record processor
// factory, then downcast it to kinesis4cats' RecordProcessor.Factory
val processorFactory = consumer.use(c =>
  IO.pure(fs2ConfField.get(c).asInstanceOf[KCLConsumerFS2.Config[IO]].underlying.processorConfig.shardRecordProcessorFactory)
).unsafeRunSync().asInstanceOf[RecordProcessor.Factory[IO]]

// The factory's RecordProcessor.Config is private as well
val factoryConfField = classOf[RecordProcessor.Factory[IO]].getDeclaredField("config")
factoryConfField.setAccessible(true)

val conf = factoryConfField.get(processorFactory).asInstanceOf[RecordProcessor.Config]

conf.autoCommit // true
```

It looks like the override might have been lost in 8ea14b6.

Yep, that's exactly what happened. Good catch! I opened #265 to fix it.
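The regression confirmed here, an FS2-specific default that is defined but never referenced, can be illustrated with a hypothetical, simplified model. `ProcessConfig`, `Fs2ConsumerModel`, `defaultRegressed`, and `defaultFixed` are invented names; they echo `autoCommit`, `defaultProcessConfig`, and `Builder.default` from this thread but are not kinesis4cats code, and the sketch does not reproduce the actual changes in 8ea14b6 or #265. It only shows how the generic default (`autoCommit = true`) leaks through when the builder's default path skips the override.

```scala
// Hypothetical model of the regression discussed in this thread.
// Names echo the discussion, but none of this is kinesis4cats source code.
final case class ProcessConfig(autoCommit: Boolean = true) // generic default: auto-commit on

object Fs2ConsumerModel {
  // FS2-specific override: records are expected to be committed explicitly downstream.
  val defaultProcessConfig: ProcessConfig = ProcessConfig(autoCommit = false)

  final case class Builder(processConfig: ProcessConfig)

  object Builder {
    // Regressed behavior: the override exists but is never referenced,
    // so the generic default (autoCommit = true) leaks through.
    def defaultRegressed: Builder = Builder(ProcessConfig())

    // Intended behavior: the builder starts from the FS2-specific override.
    def defaultFixed: Builder = Builder(defaultProcessConfig)
  }
}

object RegressionDemo {
  def main(args: Array[String]): Unit = {
    println(Fs2ConsumerModel.Builder.defaultRegressed.processConfig.autoCommit) // true
    println(Fs2ConsumerModel.Builder.defaultFixed.processConfig.autoCommit)     // false
  }
}
```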

Resolved in #265