Docs for fs2 KCL should clarify call to `consumer.commitRecords`
First of all, thank you for a great library! I was able to use it to migrate an old KCL app still using v1 of the AWS KCL.
One hurdle I ran into, however, was that the docs for using the `KCLConsumerFS2` show a call to `consumer.commitRecords`, so I added one in my code as well, thinking it was necessary. Only after testing and debugging did I find that the default config results in auto-committing. Ultimately this resulted in runtime errors when my manual commit ran, because I was trying to commit a record that the auto-commit had already committed:
```
java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: XXXXX,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: YYYYY,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: YYYYY,SubsequenceNumber: 0}
software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer.checkpoint(ShardRecordProcessorCheckpointer.java:124)
kinesis4cats.kcl.CommittableRecord.$anonfun$checkpoint$1(CommittableRecord.scala:87)
map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:103)
map @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$3(package.scala:98)
flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$2(package.scala:97)
map2 @ fs2.Stream$.$anonfun$fromQueueUnterminated$1(Stream.scala:3540)
map @ kinesis4cats.compat.retry.package$.applyPolicy(package.scala:116)
flatMap @ kinesis4cats.compat.retry.package$.$anonfun$retryingOnSomeErrorsImpl$1(package.scala:96)
```
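The error message states the rule being enforced: a checkpoint is accepted only if it falls between the last checkpoint and the greatest sequence number passed to the record processor. Because the record at XXXXX was itself passed to the processor and is not YYYYY, it comes before YYYYY; once the auto-commit has checkpointed at YYYYY, a manual commit of XXXXX is out of range. The Scala sketch below is a simplified, hypothetical model of that rule, assuming the range is inclusive at both ends. `ExtendedSeq` and `CheckpointerModel` are illustrative names, not KCL or kinesis4cats types.

```scala
// Hypothetical, simplified model of the range check described in the error.
// This is NOT the KCL implementation; all names are illustrative.
final case class ExtendedSeq(sequence: BigInt, subsequence: Long) extends Ordered[ExtendedSeq] {
  // Order by sequence number first, then by subsequence number.
  def compare(that: ExtendedSeq): Int = {
    val bySeq = sequence.compare(that.sequence)
    if (bySeq != 0) bySeq else java.lang.Long.compare(subsequence, that.subsequence)
  }
}

final class CheckpointerModel(private var last: ExtendedSeq, greatestSeen: ExtendedSeq) {
  // Accept a checkpoint only when last <= target <= greatestSeen,
  // and advance the last checkpoint on success.
  def checkpoint(target: ExtendedSeq): Either[String, ExtendedSeq] =
    if (target >= last && target <= greatestSeen) {
      last = target
      Right(target)
    } else Left(s"$target is outside [$last, $greatestSeen]")
}

// Records 101..105 were delivered to the processor; the last checkpoint was 100.
val checkpointer =
  new CheckpointerModel(ExtendedSeq(BigInt(100), 0L), ExtendedSeq(BigInt(105), 0L))

// Auto-commit checkpoints at the end of the batch: accepted.
val autoCommit = checkpointer.checkpoint(ExtendedSeq(BigInt(105), 0L))

// A manual commit of an earlier record afterwards: rejected, as in the stack trace.
val manualCommit = checkpointer.checkpoint(ExtendedSeq(BigInt(103), 0L))
```

In this model the two commit paths are not independent: whichever one checkpoints the later record first makes the other one fail for any earlier record.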
Do you think it would make sense to update the docs to clarify that calling `consumer.commitRecords` is not necessary unless you've explicitly changed the default `RecordProcessor.Config` behavior?
Sorry for the late response here. The default configuration for the FS2 consumer should have `autoCommit` set to `false`; see the override here. How are you constructing your FS2 consumer? Can you give me a quick reproducible example?
No worries, thanks for getting back to me! I searched the repo, and the `defaultProcessConfig` you linked to doesn't appear to be used anywhere.
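To make this failure mode concrete: an FS2-specific default that disables auto-commit can exist in the code and still have no effect if the builder falls back to a different default. The sketch below is a hypothetical model of that situation. Apart from `defaultProcessConfig`, which is the name mentioned in this thread, every name is illustrative and not taken from the kinesis4cats source.

```scala
// Hypothetical model of the situation described in this thread.
// Only `defaultProcessConfig` comes from the discussion; the rest is illustrative.
final case class ProcessConfig(autoCommit: Boolean)

object Defaults {
  // Shared default: records are checkpointed automatically.
  val base: ProcessConfig = ProcessConfig(autoCommit = true)

  // FS2-specific override: auto-commit off, for callers who commit
  // through `consumer.commitRecords`.
  val defaultProcessConfig: ProcessConfig = base.copy(autoCommit = false)
}

// The override exists, but this builder never refers to it,
// so the shared default (autoCommit = true) wins.
def buggyBuilderDefault(processConfig: ProcessConfig = Defaults.base): ProcessConfig =
  processConfig

// Using the override as the default argument gives the intended behavior.
def fixedBuilderDefault(processConfig: ProcessConfig = Defaults.defaultProcessConfig): ProcessConfig =
  processConfig

val buggy = buggyBuilderDefault()
val fixed = fixedBuilderDefault()
```

In this model, `buggy.autoCommit` is `true` and `fixed.autoCommit` is `false`. The unused override compiles without complaint, which is why the mismatch only shows up at runtime.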
I tested in a REPL and can confirm that `autoCommit` is set to `true` when using `KCLConsumerFS2.Builder.default`. I had to do a bunch of reflection and `asInstanceOf`s to work around private class fields, so excuse that 😅
```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import kinesis4cats.kcl.RecordProcessor
import kinesis4cats.kcl.fs2.KCLConsumerFS2
import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}
import software.amazon.kinesis.processor.SingleStreamTracker

// Build a consumer with the default FS2 builder, as in the docs.
val consumer = KCLConsumerFS2.Builder.default[IO](
  streamTracker = new SingleStreamTracker(
    "test",
    InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON),
  ),
  appName = "test",
).build

// `config` is a private field of KCLConsumerFS2, so read it via reflection.
val fs2ConfField = classOf[KCLConsumerFS2[IO]].getDeclaredField("config")
fs2ConfField.setAccessible(true)

// Acquire the consumer just long enough to extract the shard record processor factory.
val processorFactory = consumer.use(c =>
  IO.pure(fs2ConfField.get(c).asInstanceOf[KCLConsumerFS2.Config[IO]].underlying.processorConfig.shardRecordProcessorFactory)
).unsafeRunSync().asInstanceOf[RecordProcessor.Factory[IO]]

// The factory's RecordProcessor.Config is also private; read it the same way.
val factoryConfField = classOf[RecordProcessor.Factory[IO]].getDeclaredField("config")
factoryConfField.setAccessible(true)
val conf = factoryConfField.get(processorFactory).asInstanceOf[RecordProcessor.Config]

conf.autoCommit // true
```
Yep, that's exactly what happened. Good catch! I opened #265 to resolve it.
Resolved in #265