Terkwood/BUGOUT

Draft time-ordered message processing using klaxit/redis-stream-rs

Terkwood opened this issue · 6 comments

Time-ordered message processing using klaxit/redis-stream-rs

BUGOUT's redis stream code consists of a wasteful amount of boilerplate, using repeated definitions of Unacknowledged, etc. It could be vastly simplified by moving to something like klaxit/redis-stream-rs.

But that library can only read from one stream at a time. Draft an alternative implementation that can process all entries from multiple streams in time order, as required by BUGOUT.

Planning

Cargo feature

It should use our fork of redis-stream-rs, and declare the "interleaved" processing as a Cargo-level feature, so that we retain the option to contribute it back to the original repository.

Design scribble

We can basically reuse the pattern of sorting xids found in BUGOUT. This Rc is captured by all of the stream-handling closures.

let unsorted: Rc<Mutex<HashMap<XID, StreamInput>>> = Rc::new(Mutex::new(HashMap::new()));
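A minimal sketch of the sort step, assuming the XID is the usual Redis entry ID of the form `<millis>-<seq>`. The `Xid` struct, the `StreamInput` variants and `drain_sorted` are hypothetical stand-ins for illustration, not BUGOUT's actual definitions. The caller is assumed to have already locked the shared `Mutex` and to pass in the inner map.

```rust
use std::collections::HashMap;

/// Redis stream entry ID of the form "<millis>-<seq>" (hypothetical type).
/// Field order matters: the derived `Ord` compares `millis` first, then `seq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Xid {
    pub millis: u64,
    pub seq: u64,
}

impl Xid {
    /// Parses e.g. "1526919030474-55"; returns `None` on malformed input.
    pub fn parse(s: &str) -> Option<Xid> {
        let (millis, seq) = s.split_once('-')?;
        Some(Xid {
            millis: millis.parse().ok()?,
            seq: seq.parse().ok()?,
        })
    }
}

/// Hypothetical stand-in for BUGOUT's `StreamInput`; the real variants differ.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamInput {
    GameReady(String),
    MoveMade(String),
}

/// Empties the shared map and returns its entries ordered by XID.
pub fn drain_sorted(unsorted: &mut HashMap<Xid, StreamInput>) -> Vec<(Xid, StreamInput)> {
    let mut entries: Vec<(Xid, StreamInput)> = unsorted.drain().collect();
    entries.sort_by_key(|(xid, _)| *xid);
    entries
}
```

One caveat with keying the map by XID alone: Redis entry IDs are only unique within a single stream, so two streams can produce the same ID, and the later insert would overwrite the earlier one. Keying by (XID, stream name) would avoid that.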

This almost worked in #501:

changes from klaxit approach

In order to accommodate sorting, we need to split up the xread_options call, the per-message self.consume call, and the xack call. See fn process_message(&mut self, id: &str, message: &Message) -> Result<()> {
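The split into three phases could look roughly like the sketch below. Everything here is an assumption made for illustration: the `StreamClient` trait, `Entry`, `process_batch` and the in-memory `FakeClient` are hypothetical and are not part of redis-stream-rs. In a real client, `read_all` would wrap the xread_options call and `ack` would wrap xack.

```rust
use std::collections::BTreeMap;

/// Redis stream entry ID "<millis>-<seq>", ordered by time, then sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Xid {
    pub millis: u64,
    pub seq: u64,
}

/// One entry as read from a named stream (hypothetical shape).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    pub stream: String,
    pub xid: Xid,
    pub payload: String,
}

/// Hypothetical abstraction over the Redis calls, so the flow runs without a server.
pub trait StreamClient {
    /// Phase 1: a single read that covers every stream.
    fn read_all(&mut self) -> Vec<Entry>;
    /// Phase 3: acknowledge processed IDs on one stream.
    fn ack(&mut self, stream: &str, ids: &[Xid]);
}

/// Reads once, consumes entries in time order, then acks what was consumed.
/// Stops at the first handler error; entries after it are left unacknowledged.
pub fn process_batch<C, F>(client: &mut C, mut consume: F) -> Result<usize, String>
where
    C: StreamClient,
    F: FnMut(&Entry) -> Result<(), String>,
{
    let mut entries = client.read_all();
    // Order by XID; break ties across streams by stream name for determinism.
    entries.sort_by(|a, b| a.xid.cmp(&b.xid).then_with(|| a.stream.cmp(&b.stream)));

    // Phase 2: per-message consume, remembering which IDs succeeded per stream.
    let mut done: BTreeMap<String, Vec<Xid>> = BTreeMap::new();
    let mut result = Ok(());
    for entry in &entries {
        if let Err(err) = consume(entry) {
            result = Err(err);
            break;
        }
        done.entry(entry.stream.clone()).or_default().push(entry.xid);
    }

    // Phase 3: one ack call per stream, only for entries that were consumed.
    for (stream, ids) in &done {
        client.ack(stream, ids);
    }
    let count: usize = done.values().map(Vec::len).sum();
    result.map(|_| count)
}

/// In-memory fake, used only to demonstrate the flow.
#[derive(Default)]
pub struct FakeClient {
    pub pending: Vec<Entry>,
    pub acked: Vec<(String, Xid)>,
}

impl StreamClient for FakeClient {
    fn read_all(&mut self) -> Vec<Entry> {
        std::mem::take(&mut self.pending)
    }
    fn ack(&mut self, stream: &str, ids: &[Xid]) {
        for id in ids {
            self.acked.push((stream.to_string(), *id));
        }
    }
}
```

Acknowledging only after `consume` succeeds is one possible ordering, chosen here for the sketch. Whether it gives the timing BUGOUT needs is not something this example settles.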

Needs more investigation, see the note in #501.

I worry that for our use case, the strategy used in the klaxit lib could be problematic. In #501 we had to acknowledge all of the streams prior to actually processing their data. Could this cause different timings for our simple logic? My attempt at this may have also confused keys with ids 😄

Looking further at the redis-stream lib, their Consumer implementation is centered around reading a single stream. This looks like a potential jumping-off point for an alternative program that can read multiple streams, then sort them in time order as needed by BUGOUT.

https://github.com/klaxit/redis-stream-rs/blob/f2ba0f98045cf4ade2cb9e1ccc22bcfc5cde09ad/src/consumer.rs#L93

Contrast this with the signature of XREADGROUP, which allows multiple streams (keys).
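For reference, the command takes the form `XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]`: every key is listed first, followed by one ID per key in the same order. The helper below is a hypothetical sketch that only assembles that argument list as strings; it does not talk to Redis, and `xreadgroup_args` is not a function from any library. It passes the special ID `>` for each stream, which asks for entries never delivered to other consumers in the group.

```rust
/// Builds the argument list for one XREADGROUP call over several streams.
/// Hypothetical helper for illustration; it performs no I/O.
pub fn xreadgroup_args(
    group: &str,
    consumer: &str,
    count: Option<usize>,
    block_ms: Option<u64>,
    streams: &[&str],
) -> Vec<String> {
    let mut args = vec![
        "XREADGROUP".to_string(),
        "GROUP".to_string(),
        group.to_string(),
        consumer.to_string(),
    ];
    if let Some(n) = count {
        args.push("COUNT".to_string());
        args.push(n.to_string());
    }
    if let Some(ms) = block_ms {
        args.push("BLOCK".to_string());
        args.push(ms.to_string());
    }
    args.push("STREAMS".to_string());
    // All keys come first...
    args.extend(streams.iter().map(|s| s.to_string()));
    // ...followed by one ID per key, in the same order.
    args.extend(streams.iter().map(|_| ">".to_string()));
    args
}
```

Keeping keys and IDs as two parallel runs, rather than interleaved pairs, is the detail that is easy to get wrong when moving from one stream to many.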

The scope is too broad compared to what we need. We'll just copy-paste a bit of the klaxit project and add the sorted-streams functionality.