zesterer/flume

MPMC Not working or no Broadcasting

lsabi opened this issue · 3 comments

lsabi commented

First of all, this is a nice library: it is high-level yet performs well.

Description of the problem

I'm planning to use an MPMC (or SPMC) approach where each message produced should reach every single consumer. Now, MPMC is quite a broad term: it does not imply broadcasting, but nowhere is it written that it excludes broadcasting either.

Thus, my question is the following:

Does this library support MPMC in the sense of message broadcasting? If yes, there's a counterexample below. I may be using it incorrectly, but the docs are not clear about how to use MPMC with broadcast functionality. If it is not supported, is there a plan to support it? Or is there a simple alternative way of reaching my goal?

Thanks in advance.

Cargo.toml

[package]
name = "my_test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
flume = "0.11.0"

main.rs

use std::fs::File;
use std::io::Write;
use std::thread;

fn main() {
    let (tx, rx) = flume::unbounded();

    // Both receivers are clones of the same channel.
    let rx_copy_1 = rx.clone();
    let t_1 = thread::spawn(move || {
        let mut file = File::create("thread_1.txt").unwrap();
        for msg in rx_copy_1.iter() {
            println!("Received 1: {}", msg);
            write!(file, "{}", msg).unwrap();
        }
    });

    let rx_copy_2 = rx.clone();
    let t_2 = thread::spawn(move || {
        let mut file = File::create("thread_2.txt").unwrap();
        for msg in rx_copy_2.iter() {
            println!("Received 2: {}", msg);
            write!(file, "{}", msg).unwrap();
        }
    });

    tx.send("Hello, world!").unwrap();
    tx.send("How are you today?").unwrap();

    // Dropping the only sender disconnects the channel and ends both loops.
    drop(tx);

    t_1.join().unwrap();
    t_2.join().unwrap();
}

Flume is an MPMC channel without broadcasting.

The implementations of a broadcast channel and a regular MPMC channel are quite different, so I don't think there's any plan to support broadcast. In my opinion it would end up being an entirely different crate.

That being said, there are a number of broadcast channels which might serve your use case if you look on crates.io :)
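As for a simple alternative, one common workaround is to fan out by hand: give every consumer its own channel and send a clone of each message down all of them. The sketch below is only an illustration under stated assumptions. `FanOut`, `subscribe`, and its `send` are hypothetical names, not part of flume, and it uses `std::sync::mpsc` purely to stay dependency-free. The same pattern should carry over to flume by creating each per-consumer channel with `flume::unbounded()` instead.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical helper (not part of flume): one private channel per consumer.
struct FanOut<T> {
    senders: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        FanOut { senders: Vec::new() }
    }

    /// Register a new consumer and hand back its private receiver.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.senders.push(tx);
        rx
    }

    /// Send a clone of `msg` to every consumer, forgetting consumers
    /// whose receiver has already been dropped.
    fn send(&mut self, msg: T) {
        self.senders.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

fn main() {
    let mut fan_out: FanOut<&'static str> = FanOut::new();

    // Spawn two consumers, each reading from its own receiver.
    let handles: Vec<_> = (1..=2)
        .map(|id| {
            let rx = fan_out.subscribe();
            thread::spawn(move || {
                for msg in rx.iter() {
                    println!("Received {}: {}", id, msg);
                }
            })
        })
        .collect();

    fan_out.send("Hello, world!");
    fan_out.send("How are you today?");

    // Dropping the helper drops every sender, which ends each consumer loop.
    drop(fan_out);

    for handle in handles {
        handle.join().unwrap();
    }
}
```

With this layout every consumer sees both messages, at the cost of one clone per consumer per message. A dedicated broadcast crate is likely the better fit when messages are large or consumers are many.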

Going to close this issue. Flume is explicitly not a broadcast queue and although broadcast queues look superficially similar when in use, their implementation is radically different.

If you want to see what a broadcast channel that also avoids unsafe code might look like, I wrote one a while ago after collaborating on flume: https://github.com/Restioson/barrage/blob/master/src/lib.rs. As you can see, it's very, very different from flume.

That's not to say it is the broadcast channel you should pick; I am sure there are many high-performance options out there to choose from!