Can't work with flume mpmc channel
TheWaWaR opened this issue · 0 comments
TheWaWaR commented
If there are multiple flume receivers, only the first cloned receiver can receive message from the senders.
Below is a simple test project code copied from my repo.
If I ran mpmc channel test in glommio
fixed runtime.
$ cargo run -- glommio
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/flume-multi-consumer-demo glommio`
Starting glommio runtime...
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#1] received value: 1 from 1
[receiver#1] received value: 1 from 2
[receiver#1] received value: 1 from 3
[receiver#1] received value: 1 from 4
[receiver#1] received value: 1 from 5
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 3
[receiver#1] received value: 2 from 4
[receiver#1] received value: 2 from 5
As you can see only the first cloned receiver can receive message.
Run mpmc channel test in glommio
pooled runtime:
$ cargo run -- glommio-pool
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/flume-multi-consumer-demo glommio-pool`
Starting glommio runtime...
Starting glommio runtime...
[receiver#3] received value: 0 from 1
[receiver#3] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 5
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#3] received value: 0 from 5
Starting glommio runtime...
Starting glommio runtime...
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#1] received value: 1 from 1
[receiver#1] received value: 1 from 2
[receiver#1] received value: 1 from 3
[receiver#1] received value: 1 from 4
[receiver#1] received value: 1 from 5
[receiver#2] received value: 1 from 1
[receiver#2] received value: 1 from 2
[receiver#2] received value: 1 from 3
[receiver#2] received value: 1 from 4
[receiver#2] received value: 1 from 5
[receiver#3] received value: 1 from 1
[receiver#3] received value: 1 from 2
[receiver#3] received value: 1 from 4
[receiver#3] received value: 1 from 5
[receiver#3] received value: 1 from 3
[receiver#1] received value: 1 from 1
[receiver#1] received value: 1 from 2
[receiver#1] received value: 1 from 3
[receiver#1] received value: 1 from 4
[receiver#1] received value: 1 from 5
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 4
[receiver#1] received value: 2 from 3
[receiver#1] received value: 2 from 5
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 4
[receiver#1] received value: 2 from 5
[receiver#2] received value: 2 from 3
[receiver#2] received value: 2 from 4
[receiver#3] received value: 2 from 5
[receiver#3] received value: 2 from 3
[receiver#3] received value: 2 from 5
[receiver#3] received value: 2 from 3
[receiver#1] received value: 2 from 4
[receiver#3] received value: 2 from 2
As you can see the value received multiple times, this is bad :-(
Run mpmc channel test in tokio
$ cargo run -- tokio
Compiling flume-multi-consumer-demo v0.1.0 (/home/weet/projects/thewawar/flume-multi-consumer-demo)
Finished dev [unoptimized + debuginfo] target(s) in 1.37s
Running `target/debug/flume-multi-consumer-demo tokio`
Starting tokio runtime...
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#2] received value: 0 from 1
[receiver#3] received value: 0 from 3
[receiver#1] received value: 1 from 5
[receiver#2] received value: 1 from 2
[receiver#3] received value: 1 from 1
[receiver#1] received value: 1 from 3
[receiver#2] received value: 1 from 4
[receiver#3] received value: 2 from 5
[receiver#1] received value: 2 from 4
[receiver#2] received value: 2 from 2
[receiver#3] received value: 2 from 3
[receiver#1] received value: 2 from 1
It just work as expected.
The dependencies:
[dependencies]
flume = { version = "0.10", features = ["async"] }
glommio = { version = "0.7.0" }
tokio = { version = "1.23.0", features = ["full"] }
The Rust code:
use std::env;
use std::time::Duration;
use flume::{bounded, Receiver, Sender};
use glommio::{CpuSet, LocalExecutorBuilder, LocalExecutorPoolBuilder, Placement, PoolPlacement};
fn main() {
let mut args = env::args();
match args.nth(1).unwrap().as_str() {
"glommio-pool" => run_glommio_pool(),
"glommio" => run_glommio(),
"tokio" => run_tokio(),
action => panic!("invalid action: {}", action),
}
}
fn run_glommio_pool() {
let cpu_set = CpuSet::online().expect("online cpus");
let cpu_num = num_cpus::get();
let placement = PoolPlacement::MaxSpread(cpu_num, Some(cpu_set));
let (sender, receiver) = bounded::<(usize, usize)>(2);
LocalExecutorPoolBuilder::new(placement)
.on_all_shards(|| async move {
println!("Starting glommio runtime...");
for id in [1, 2, 3] {
let receiver = receiver.clone();
glommio::spawn_local(flume_recv(id, receiver)).detach();
}
for id in 1..6 {
let sender = sender.clone();
glommio::spawn_local(flume_send_glommio(id, sender)).detach();
}
glommio::timer::sleep(Duration::from_secs(100000)).await;
})
.unwrap()
.join_all();
}
fn run_glommio() {
LocalExecutorBuilder::new(Placement::Fixed(0))
.spawn(|| async move {
println!("Starting glommio runtime...");
let (sender, receiver) = bounded::<(usize, usize)>(2);
for id in [1, 2, 3] {
let receiver = receiver.clone();
glommio::spawn_local(flume_recv(id, receiver)).detach();
}
for id in 1..6 {
let sender = sender.clone();
glommio::spawn_local(flume_send_glommio(id, sender)).detach();
}
glommio::timer::sleep(Duration::from_secs(100000)).await;
})
.unwrap()
.join()
.unwrap();
}
fn run_tokio() {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
println!("Starting tokio runtime...");
let (sender, receiver) = bounded::<(usize, usize)>(2);
for id in [1, 2, 3] {
let receiver = receiver.clone();
tokio::spawn(flume_recv(id, receiver));
}
for id in 1..6 {
let sender = sender.clone();
tokio::spawn(flume_send_tokio(id, sender));
}
tokio::time::sleep(Duration::from_secs(1000000)).await;
});
}
async fn flume_send_glommio(id: usize, sender: Sender<(usize, usize)>) {
loop {
for n in 0usize.. {
if let Err(err) = sender.send_async((id, n)).await {
panic!("send error: {:?}", err);
}
glommio::timer::sleep(Duration::from_secs(1)).await;
}
}
}
async fn flume_send_tokio(id: usize, sender: Sender<(usize, usize)>) {
loop {
for n in 0usize.. {
if let Err(err) = sender.send_async((id, n)).await {
panic!("send error: {:?}", err);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
async fn flume_recv(id: usize, receiver: Receiver<(usize, usize)>) {
loop {
let (sender_id, value) = match receiver.recv_async().await {
Ok(value) => value,
Err(err) => panic!("[id:{}] recv error: {:?}", id, err),
};
println!(
"[receiver#{}] received value: {} from {}",
id, value, sender_id
);
}
}