Draft time-ordered message processing using klaxit/redis-stream-rs
Terkwood opened this issue · 6 comments
Time-ordered message processing using klaxit/redis-stream-rs
BUGOUT's redis stream code consists of a wasteful amount of boilerplate, using repeated definitions of Unacknowledeged
, etc. It could be vastly simplified by moving to something like klaxit/redis-stream-rs
But that library can only read from one stream at a time. Draft an alternative implementation that can process all entries from multiple streams in time order, as required by BUGOUT.
Planning
Cargo feature
It should use our fork of redis-stream-rs, and declare the "interleaved" processing as a Cargo-level feature, so that we retain the option to contribute it back to the original repository.
Design scribble
We can basically reuse the pattern of sorting xids
found in BUGOUT. This Rc
is captured by all of the stream-handling closures.
let unsorted: Rc<Mutex<HashMap<XID, StreamInput>>> = Rc::new(Mutex::new(HashMap::new()));
This almost worked in #501:
- declaring the array of unsorted
xids
: https://github.com/Terkwood/BUGOUT/pull/501/files#diff-33b72f5057924df4752aff3c8439f2d69b1d0f642e387badffc55a55d71cd87fR22 - sorting it: https://github.com/Terkwood/BUGOUT/pull/501/files#diff-33b72f5057924df4752aff3c8439f2d69b1d0f642e387badffc55a55d71cd87fR113
changes from klaxit approach
In order to accommodate sorting, we need to split up the xread_options
call, the per-message self.consume
call, and the xack
call. See fn process_message(&mut self, id: &str, message: &Message) -> Result<()> {
I worry that for our use case, the strategy used in the klaxit lib could be problematic. In #501 we had to acknowledge all of the streams prior to actually processing their data. Could this cause different timings for our simple logic? My attempt at this may have also confused keys with ids 😄
Looking further at the redis-stream lib, their Consumer
implementation is centered around reading a single stream. This looks like a potential jumping off point for an alternative program that can read multiple streams, then sort them in time-order as needed by BUGOUT
contrast with the signature of XREADGROUP, which allows multiple streams (keys)
Scope is too broad compared to what we need. We'll just copy paste a bit of the klaxit project and add the sorted streams functionality