StreamSourceRecordReader should support AckPolicy.MANUAL
ahawtho opened this issue · 8 comments
`StreamSourceRecordReader` uses the default `AckPolicy` on `RedisItemReader`, which is `AUTO`. If the connector fails to write to the topic after the reader has received a message, and the task is stopped/restarted, that message will be lost (at-most-once processing).
The connector should also support at-least-once processing by XACK'ing each message after it has been committed, indicated by `AckPolicy.MANUAL`. The connector should support a property such as:

`redis.stream.delivery.type: at-least-once | at-most-once`

and I'd argue the default should be at-least-once. I believe I can put together a PR for this.
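For illustration, here's a minimal sketch of how such a property could be declared with Kafka's `ConfigDef` (the class and constant names are mine, not the connector's actual ones):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.ValidString;

// Hypothetical config class, just to show the shape of the property.
public class RedisSourceConfigSketch {

    public static final String DELIVERY_TYPE_CONFIG = "redis.stream.delivery.type";

    public static ConfigDef configDef() {
        return new ConfigDef().define(
                DELIVERY_TYPE_CONFIG,
                Type.STRING,
                "at-least-once", // the default I'm arguing for
                ValidString.in("at-least-once", "at-most-once"),
                Importance.HIGH,
                "Delivery guarantee for stream messages: at-least-once XACKs each "
                        + "message only after it has been committed to Kafka; "
                        + "at-most-once acknowledges on read.");
    }
}
```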
Thank you. I added an option to set the ack policy (auto or explicit).
Hi @jruaux, thanks for putting together a change for this. I've been working on another PR for this, but there are some differences between our approaches:
I was hoping to define the behavior in terms of the semantics of the delivery policy (at-least-once vs. at-most-once) rather than the Redis behavior of XACK. To me as a connector user, this is a more meaningful guarantee. I also believe the current implementation after these changes may be missing part of that guarantee. Read on for details:
If the connector fails between reading messages from the stream and writing them to Kafka, the default behavior of the `StreamItemReader` will drop all of the messages that remain in Redis' "pending entries list", or PEL, described in the docs. This breaks the promise of at-least-once delivery. Even if the messages are written to Kafka, the dropped messages will remain in the PEL indefinitely, which essentially leaks memory in Redis.
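To make the recovery concrete, here's a minimal sketch (using Lettuce, with hypothetical stream/group/consumer names, and assuming the consumer group already exists) of how a consumer can re-read its own PEL after a restart by passing the offset `0` to XREADGROUP instead of `>`:

```java
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.sync.RedisCommands;

import java.util.List;

public class PendingEntriesSketch {

    public static void main(String[] args) {
        RedisCommands<String, String> redis =
                RedisClient.create("redis://localhost:6379").connect().sync();

        // Reading with offset "0" (instead of ">") returns this consumer's
        // pending entries list (PEL): messages delivered earlier but never XACKed.
        List<StreamMessage<String, String>> pending = redis.xreadgroup(
                Consumer.from("my-group", "my-consumer"),
                StreamOffset.from("my-stream", "0"));

        for (StreamMessage<String, String> message : pending) {
            // Re-deliver to Kafka (or verify it was already committed), then ack
            // so the entry leaves the PEL and its memory is reclaimed.
            redis.xack("my-stream", "my-group", message.getId());
        }
    }
}
```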
One last comment: the new changes refer to a method `StreamItemReader.ack(String[])` rather than `StreamItemReader.ack(StreamMessage)`. The new method doesn't seem to be available in the previous version of spring-batch-redis (3.0.4), so replacing the old one introduces a breaking API change in a patch version (3.0.5), which violates semantic versioning. I'd suggest restoring the old `ack(StreamMessage)` method to avoid the breaking change.
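A minimal sketch of what restoring compatibility could look like, assuming the old signature can simply delegate to the new one (the class name and generics here are illustrative):

```java
import io.lettuce.core.StreamMessage;

// Hypothetical fragment: keep the pre-3.0.5 signature as a delegating
// overload so existing callers keep compiling.
public abstract class StreamItemReaderCompatSketch<K, V> {

    /** New API introduced in 3.0.5: ack by message id. */
    public abstract void ack(String... ids);

    /** Restored 3.0.4-style API, delegating to the new method. */
    public void ack(StreamMessage<K, V> message) {
        ack(message.getId());
    }
}
```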
I'll push my changes and post my PR so you can review the spirit of my approach. The changes are complete, but I was writing more unit tests to try to verify the behavior. I hadn't noticed the availability of the integration tests, but I'd be happy to pivot to writing more of those instead to demonstrate some of the potential issues with recovery.
Edit: One other thing I forgot to mention is that by relying solely on the Kafka Connect source offset for stream recovery, this approach provides the framework necessary to support "exactly-once" processing for source connectors, just released in Kafka 3.3.
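As a sketch of what I mean (the partition and offset keys are hypothetical, not the connector's actual ones), a `SourceTask` can recover its resume position from Connect's offset store in `start()`:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTask;

// Hypothetical fragment: recover the resume position from Connect's offset
// store instead of Redis' PEL, so the framework's exactly-once support for
// source connectors (Kafka 3.3+) can apply.
public abstract class OffsetRecoverySketch extends SourceTask {

    private static final Map<String, String> SOURCE_PARTITION =
            Collections.singletonMap("stream", "my-stream"); // name is illustrative

    /** Stream entry id to resume after; null means start from scratch. */
    protected String lastCommittedId;

    @Override
    public void start(Map<String, String> props) {
        Map<String, Object> offset =
                context.offsetStorageReader().offset(SOURCE_PARTITION);
        // Resume reading after the last committed stream entry id, if any.
        lastCommittedId = offset == null ? null : (String) offset.get("id");
        // ... configure the stream reader to start after lastCommittedId ...
    }
}
```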
Here's a link to the commit with all the changes. The master branch here has diverged too much for now, but it's probably not too much work to incorporate the idea of the `StreamMessageRecovery` into the current master:
Thanks for your input and contribution. I agree with exposing the semantics to the user (vs. the Redis implementation). I will also incorporate that message recovery mechanism, although I'm thinking it might belong in Spring Batch Redis, as it could benefit other projects.
Good catch regarding the breaking API in Spring Batch Redis. I'll restore that method and push a new version shortly.
Thanks again!
@jruaux - I pushed another commit on that branch that fixes issues with `StreamMessageRecovery` and adds integration tests. I know you won't be incorporating these changes as-is, but I'd appreciate you taking a look. I'm not sure of the best way to provide the integration in Spring Batch Redis to ack the messages that were committed previously by Connect but not yet acked manually (e.g. we lost the connection to Redis between reading a batch and attempting to commit messages). This behavior is necessary to support "exactly-once" connector processing. The test `RedisSourceTaskIT.pollStreamAtLeastOnceRecoverFromOffset(RedisTestContext)` mocks restoring from a connector offset.
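For example (a hypothetical fragment, not the actual implementation, and the `"id"` offset key is my assumption), the task could override `commitRecord` and XACK only once Connect reports the record as committed to Kafka:

```java
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical fragment: ack a stream entry only after Connect reports the
// corresponding record as committed to Kafka.
public abstract class ManualAckSketch extends SourceTask {

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        String messageId = (String) record.sourceOffset().get("id");
        // XACK here (e.g. via the stream reader) so the entry leaves the PEL
        // only once the write to Kafka is durable.
        ackToRedis(messageId);
    }

    /** Placeholder for the actual XACK call against Redis. */
    protected abstract void ackToRedis(String messageId) throws InterruptedException;
}
```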
@ahawtho I included your message recovery logic in Spring Batch Redis v3.0.7, and this project's early-access release leverages it. Can you let me know what you think? If all looks good I'll cut a new version.
@jruaux I will try to take a look at this tomorrow or Friday. Thanks for putting it all together!
🎉 This issue has been resolved in v0.7.5 (Release Notes)