
A Futures-aware message router for dynamically routing values between Streams and Sinks.


Stream Router


This crate provides a StreamRouter struct that is capable of dynamically routing values between Streams and Sinks.

API Documentation

crates.io


When working with Streams and Sinks, it is common to accumulate boilerplate code composed of chained Stream combinators and bespoke business logic for safely routing between Streams and Sinks. This crate attempts to provide a generic implementation of a universal combinator and dynamic, future-aware router while keeping dependencies minimal and remaining executor-agnostic.

StreamRouter is the primary struct of this crate. At its core, a StreamRouter is a Stream that can take ownership of any number of other Streams and any number of Sinks, and that dynamically routes values yielded from the Streams to any one of the provided Sinks through user-defined routing rules.

Each Sink provided to the StreamRouter is tagged with a user-defined hashable value. The router uses this tag to identify and differentiate Sinks, and the user references a specific Sink by its tag when defining the routing logic.

Each Stream is provided with a matching closure that consumes the values yielded by the accompanying Stream and returns a Future that will resolve to one of the tags identifying a specific Sink that the yielded value will be forwarded to. If no Sink is found for the returned routing tag the value will be yielded from the StreamRouter itself.
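The dispatch rule described above can be sketched with a small synchronous model. This is only an illustration of the tag lookup and the fall-through behaviour, not the crate's implementation: the function `route_all` and the use of a `HashMap` of `Vec`s in place of real Sinks are assumptions made for this sketch, and no Futures are involved.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Toy, synchronous model of tag-based routing (hypothetical helper,
/// not part of the stream_router API).
/// Each value is passed to `route`, which returns the value and a tag.
/// Values whose tag matches a registered sink are pushed into that sink;
/// all other values are returned, mirroring values that the router
/// would yield itself.
fn route_all<A, T, F>(
    source: impl IntoIterator<Item = A>,
    mut route: F,
    sinks: &mut HashMap<T, Vec<A>>,
) -> Vec<A>
where
    T: Hash + Eq,
    F: FnMut(A) -> (A, T),
{
    let mut unrouted = Vec::new();
    for value in source {
        let (value, tag) = route(value);
        match sinks.get_mut(&tag) {
            Some(sink) => sink.push(value),
            None => unrouted.push(value),
        }
    }
    unrouted
}

fn main() {
    let mut sinks: HashMap<bool, Vec<u32>> = HashMap::new();
    sinks.insert(true, Vec::new()); // only the "even" tag has a sink

    let odd = route_all(0..6u32, |x| (x, x % 2 == 0), &mut sinks);

    println!("even sink: {:?}", sinks[&true]);
    println!("yielded by the router: {:?}", odd);
}
```

Because only the tag `true` is registered, every value tagged `false` falls through and is returned to the caller, just as unmatched values are yielded from the StreamRouter.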

The StreamRouter guarantees that order is preserved for values yielded from Stream "A" and sent to Sink "B": "A" will not attempt to sink any value into "B" until all previous values from "A" sent to "B" have been processed. There are no cross-Stream or cross-Sink timing or ordering guarantees.

Example

The following example is simple.rs from the examples folder. This simple example illustrates the StreamRouter forwarding all even values to the even_chan_tx while all odd numbers are yielded by the StreamRouter itself. A user could decide to provide a second Sink to explicitly consume odd values if desired, in which case the StreamRouter would never yield any values itself.

use futures::{channel::mpsc, future, stream, stream::StreamExt};
use stream_router::StreamRouter;

#[tokio::main]
async fn main() {
    let mut router = StreamRouter::new();
    let nums = stream::iter(0..1_000);
    let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);

    // Tag every value with `true` if it is even and `false` otherwise.
    router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
    // Only the `true` tag has a Sink; values tagged `false` are yielded by the router.
    router.add_sink(even_chan_tx, true);

    loop {
        tokio::select! {
            Some(v) = router.next() => {
                println!("odd number:  {:?}", v);
            }
            Some(v) = even_chan_rx.next() => {
                println!("even number: {:?}", v);
            }
            // Stop instead of panicking once neither stream yields a value.
            else => break,
        }
    }
}

Routing Logic

The StreamRouter's routing logic is provided by the user in the form of closures that map values yielded by a specific Stream into tags that identify specific Sinks. These closures follow the form Fn(A) -> Future<Output=(A, T)>, where A is a value yielded by the Stream and T is a tag that the user has assigned to one of their Sinks. Note that the closure takes ownership of each value yielded by the Stream and is responsible for returning it as part of the tuple that also contains the Sink tag. This avoids the need to clone() each value and also lets the user "map" the values if that benefits their use case.

Simple routing (such as shown above) has no real need for the flexibility of returning a Future, but that option allows for more complex stateful routing. An example of using stateful routing to dedup an incoming Stream can be found in the dedup.rs example.
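To make the idea of stateful routing concrete, here is a minimal synchronous sketch of a routing function that remembers which values it has already seen. It is not the code from dedup.rs and does not use the stream_router API: the `Tag` enum and the `dedup_router` helper are hypothetical names, and the closure returns the `(value, tag)` tuple directly rather than a Future. With the real crate the state would likely need to be shared in a way that fits the Fn and Future requirements.

```rust
use std::collections::HashSet;

/// Hypothetical tags for this sketch; any hashable type could serve as a tag.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
enum Tag {
    Fresh,
    Duplicate,
}

/// Builds a routing function that owns its own "seen" set.
/// The returned closure takes ownership of each value and hands it back
/// together with the tag that selects where the value should go.
fn dedup_router() -> impl FnMut(u32) -> (u32, Tag) {
    let mut seen = HashSet::new();
    move |x| {
        // `HashSet::insert` returns false when the value was already present.
        let tag = if seen.insert(x) { Tag::Fresh } else { Tag::Duplicate };
        (x, tag)
    }
}

fn main() {
    let mut route = dedup_router();
    for x in [1, 2, 1, 3, 2] {
        println!("{:?}", route(x));
    }
}
```

Registering a Sink only for the `Fresh` tag would then forward first occurrences and leave repeats to be yielded (or dropped) elsewhere.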

License

Licensed under the Apache License, Version 2.0.