DataDog/glommio

Can't work with flume mpmc channel

TheWaWaR opened this issue · 0 comments

If there are multiple flume receivers, only the first cloned receiver can receive message from the senders.

Below is a simple test project code copied from my repo.

If I ran mpmc channel test in glommio fixed runtime.

$ cargo run -- glommio
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/flume-multi-consumer-demo glommio`
Starting glommio runtime...
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#1] received value: 1 from 1
[receiver#1] received value: 1 from 2
[receiver#1] received value: 1 from 3
[receiver#1] received value: 1 from 4
[receiver#1] received value: 1 from 5
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 3
[receiver#1] received value: 2 from 4
[receiver#1] received value: 2 from 5

As you can see only the first cloned receiver can receive message.

Run mpmc channel test in glommio pooled runtime:

$ cargo run -- glommio-pool
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/flume-multi-consumer-demo glommio-pool`
Starting glommio runtime...
Starting glommio runtime...
[receiver#3] received value: 0 from 1
[receiver#3] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 5
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#3] received value: 0 from 5
Starting glommio runtime...
Starting glommio runtime...
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#1] received value: 0 from 1
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 3
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#1] received value: 1 from 1
[receiver#1] received value: 1 from 2
[receiver#1] received value: 1 from 3
[receiver#1] received value: 1 from 4
[receiver#1] received value: 1 from 5
[receiver#2] received value: 1 from 1
[receiver#2] received value: 1 from 2
[receiver#2] received value: 1 from 3
[receiver#2] received value: 1 from 4
[receiver#2] received value: 1 from 5
[receiver#3] received value: 1 from 1
[receiver#3] received value: 1 from 2
[receiver#3] received value: 1 from 4
[receiver#3] received value: 1 from 5
[receiver#3] received value: 1 from 3
[receiver#1] received value: 1 from 1
[receiver#1] received value: 1 from 2
[receiver#1] received value: 1 from 3
[receiver#1] received value: 1 from 4
[receiver#1] received value: 1 from 5
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 4
[receiver#1] received value: 2 from 3
[receiver#1] received value: 2 from 5
[receiver#1] received value: 2 from 1
[receiver#1] received value: 2 from 2
[receiver#1] received value: 2 from 4
[receiver#1] received value: 2 from 5
[receiver#2] received value: 2 from 3
[receiver#2] received value: 2 from 4
[receiver#3] received value: 2 from 5
[receiver#3] received value: 2 from 3
[receiver#3] received value: 2 from 5
[receiver#3] received value: 2 from 3
[receiver#1] received value: 2 from 4
[receiver#3] received value: 2 from 2

As you can see the value received multiple times, this is bad :-(

Run mpmc channel test in tokio

$ cargo run -- tokio
   Compiling flume-multi-consumer-demo v0.1.0 (/home/weet/projects/thewawar/flume-multi-consumer-demo)
    Finished dev [unoptimized + debuginfo] target(s) in 1.37s
     Running `target/debug/flume-multi-consumer-demo tokio`
Starting tokio runtime...
[receiver#1] received value: 0 from 2
[receiver#1] received value: 0 from 4
[receiver#1] received value: 0 from 5
[receiver#2] received value: 0 from 1
[receiver#3] received value: 0 from 3
[receiver#1] received value: 1 from 5
[receiver#2] received value: 1 from 2
[receiver#3] received value: 1 from 1
[receiver#1] received value: 1 from 3
[receiver#2] received value: 1 from 4
[receiver#3] received value: 2 from 5
[receiver#1] received value: 2 from 4
[receiver#2] received value: 2 from 2
[receiver#3] received value: 2 from 3
[receiver#1] received value: 2 from 1

It just work as expected.

The dependencies:

[dependencies]
flume = { version = "0.10", features = ["async"] }
glommio = { version = "0.7.0" }
tokio = { version = "1.23.0", features = ["full"] }

The Rust code:

use std::env;
use std::time::Duration;

use flume::{bounded, Receiver, Sender};
use glommio::{CpuSet, LocalExecutorBuilder, LocalExecutorPoolBuilder, Placement, PoolPlacement};

fn main() {
    let mut args = env::args();
    match args.nth(1).unwrap().as_str() {
        "glommio-pool" => run_glommio_pool(),
        "glommio" => run_glommio(),
        "tokio" => run_tokio(),
        action => panic!("invalid action: {}", action),
    }
}

fn run_glommio_pool() {
    let cpu_set = CpuSet::online().expect("online cpus");
    let cpu_num = num_cpus::get();
    let placement = PoolPlacement::MaxSpread(cpu_num, Some(cpu_set));
    let (sender, receiver) = bounded::<(usize, usize)>(2);
    LocalExecutorPoolBuilder::new(placement)
        .on_all_shards(|| async move {
            println!("Starting glommio runtime...");
            for id in [1, 2, 3] {
                let receiver = receiver.clone();
                glommio::spawn_local(flume_recv(id, receiver)).detach();
            }
            for id in 1..6 {
                let sender = sender.clone();
                glommio::spawn_local(flume_send_glommio(id, sender)).detach();
            }
            glommio::timer::sleep(Duration::from_secs(100000)).await;
        })
        .unwrap()
        .join_all();
}

fn run_glommio() {
    LocalExecutorBuilder::new(Placement::Fixed(0))
        .spawn(|| async move {
            println!("Starting glommio runtime...");
            let (sender, receiver) = bounded::<(usize, usize)>(2);
            for id in [1, 2, 3] {
                let receiver = receiver.clone();
                glommio::spawn_local(flume_recv(id, receiver)).detach();
            }
            for id in 1..6 {
                let sender = sender.clone();
                glommio::spawn_local(flume_send_glommio(id, sender)).detach();
            }
            glommio::timer::sleep(Duration::from_secs(100000)).await;
        })
        .unwrap()
        .join()
        .unwrap();
}

fn run_tokio() {
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            println!("Starting tokio runtime...");
            let (sender, receiver) = bounded::<(usize, usize)>(2);
            for id in [1, 2, 3] {
                let receiver = receiver.clone();
                tokio::spawn(flume_recv(id, receiver));
            }
            for id in 1..6 {
                let sender = sender.clone();
                tokio::spawn(flume_send_tokio(id, sender));
            }
            tokio::time::sleep(Duration::from_secs(1000000)).await;
        });
}

async fn flume_send_glommio(id: usize, sender: Sender<(usize, usize)>) {
    loop {
        for n in 0usize.. {
            if let Err(err) = sender.send_async((id, n)).await {
                panic!("send error: {:?}", err);
            }
            glommio::timer::sleep(Duration::from_secs(1)).await;
        }
    }
}
async fn flume_send_tokio(id: usize, sender: Sender<(usize, usize)>) {
    loop {
        for n in 0usize.. {
            if let Err(err) = sender.send_async((id, n)).await {
                panic!("send error: {:?}", err);
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}

async fn flume_recv(id: usize, receiver: Receiver<(usize, usize)>) {
    loop {
        let (sender_id, value) = match receiver.recv_async().await {
            Ok(value) => value,
            Err(err) => panic!("[id:{}] recv error: {:?}", id, err),
        };
        println!(
            "[receiver#{}] received value: {} from {}",
            id, value, sender_id
        );
    }
}