/kv_mpsc

A mpsc implement that support key conflict detecting

Primary LanguageRustMIT LicenseMIT

kv_mpsc

This is a code project from DatenLord

A kv_mpsc is a bounded channel like mpsc in rust std, but it support message with key.

All messages in kv_mpsc have single/multiple key(s), once a message is consumed by a receiver, it's key(s) will be active, and other messages that have key(s) conflict with active keys could not be consumed by receivers; when the message is droped, it's key(s) will be removed from the active keyset.

Design

Shared Queue

The core data structure of kv_mpsc is Shared.

  • state is the state of a share queue, use mutext to protect it.
  • When the queue is empty, receiver will wait on condvar fill.
  • when the queue is full, sender will wait on condvar empty.
/// shared state between senders and receiver
#[derive(Debug)]
pub(crate) struct Shared<K: Key, V> {
    /// the queue state
    pub(crate) state: Mutex<State<K, V>>,
    /// cond var that representes fill a new message into queue
    pub(crate) fill: Condvar,
    /// cond var that representes consume a message from queue
    pub(crate) empty: Condvar,
}

The queue State is as follows:

  • Buff is a fixed size vec to temporarily store messages.
  • n_senders is the number of senders.
  • disconnected is a flag to indicate whether the channel is disconnected(all sender gone or receiver closed).
/// the state of queue
#[derive(Debug)]
pub(crate) struct State<K: Key, V> {
    /// queue buffer
    pub(crate) buff: Buff<K, V>,
    /// n senders of the queue
    pub(crate) n_senders: u32,
    /// is the queue disconnected
    /// all sender gone or receiver closed
    pub(crate) disconnected: bool,
}

The Buff is as follows:

  • buff is the actual buffer.
  • cap is the capacity of the buffer.
  • activate_keys is the set of current active keys.
/// A fixed size buff
#[derive(Debug)]
pub(crate) struct Buff<K: Key, V> {
    /// FIFO queue buff
    buff: BuffType<Message<K, V>>,
    /// capacity of buff
    cap: usize,
    /// all current active keys
    activate_keys: HashSet<K>,
}

Use a feature list to control the based buff type. I think LinkedList maybe better than VecDeque if there are frequent conflicts, because VecDeque need to move all elements after the removed one;

#[cfg(feature = "list")]
use std::collections::LinkedList;
#[cfg(feature = "list")]
/// actual buffer type
type BuffType<T> = LinkedList<T>;
#[cfg(not(feature = "list"))]
use std::collections::VecDeque;
#[cfg(not(feature = "list"))]
/// actual buffer type
type BuffType<T> = VecDeque<T>;

Send/Recv

The process of send is as follows:

  • acquire a empty buffer slot
  • check whether the channel is disconnected
  • push message
  • notify receiver
    pub(crate) fn send(
        &self, message: Message<K, V>,
    ) -> Result<(), SendError<Message<K, V>>> {
        let mut state = self.acquire_send_slot();
        if state.disconnected {
            return Err(SendError(message));
        }
        state.buff.push_back(message);
        drop(state);
        self.fill.notify_one();
        Ok(())
    }

The process of receive is as follows:

  • wait until buffer is not empty
  • check whether the channel is disconnected
  • consume a message that is not conflict with active keys
  • notify a sender

the pop_unconflict_front method scan messages from front to back, and pop the first message that is not conflict with current active keys.

pub(crate) fn recv(&self) -> Result<Message<K, V>, RecvError> {
    let mut state = unwrap_ok_or!(self.state.lock(), err, panic!("{:?}", err));
    if state.buff.is_empty() && !state.disconnected {
        state = unwrap_ok_or!(self.fill.wait(state), err, panic!("{:?}", err));
    }
    if state.buff.is_empty() && state.disconnected {
        return Err(RecvError::Disconnected);
    }
    let value = state.buff.pop_unconflict_front();
    // notify the blocked sender corrospend to this message
    drop(state);
    // notify other blocked sender
    self.empty.notify_one();
    value
}

A message also contains a Ref to the shared queye state Shared, when it drops, it will remove it's key(s) from the current active keyset, then other messages conflict with it could be consumed.

impl<K: Key, V> Drop for Message<K, V> {
    #[inline]
    fn drop(&mut self) {
        if let Some(shared) = self.shared.take() {
            let mut state = unwrap_ok_or!(shared.state.lock(), err, panic!("{:?}", err));
            match self.key {
                KeySet::Single(ref k) => state.buff.remove_active_key(k),
                KeySet::Multiple(ref keys) => {
                    for k in keys.iter() {
                        state.buff.remove_active_key(k);
                    }
                }
            }
        }
    }
}

The SyncSender is just a wrapper of Shared, and it's send method is just a wrapper of Shared::send.

The Receiver contains a marker to make it !Sync so that only one thread could receive message from it. and it's send method is responsible for setting Shared for msg returned by Shared::recv.

/// A sync sender that will block when there no empty buff slot
#[derive(Debug)]
pub struct SyncSender<K: Key, V> {
    /// inner shared queue
    inner: Arc<Shared<K, V>>,
}

/// A sync receiver will block when buff is empty
#[derive(Debug)]
pub struct Receiver<K: Key, V> {
    /// shared FIFO queue
    inner: Arc<Shared<K, V>>,
    /// remove the auto `Sync` implentation, so only one
    /// thread can access the receiver
    _marker: std::marker::PhantomData<RefCell<()>>,
}

Async

The logic of asynchronous and synchronous is basically the same, the main thing is using tokio semaphore and notify to replace conditional variables.

Bench

send_recv is a simple bench program containes 3 bench functions send on 10 threads and recv on 1 thread, the three functions are std mpsc, kv_mpsc without key conflict and kv_mpsc with key conflict respectively.

bench env:

  • cpu: 11700H@4.8GHz with 8 cores and 16 threads
  • OS: Ubuntu 22.04
  • tokio runtime: 8 workers

From the result, we can see that:

  • The kv_mpsc channel based on LinkedList is about 13% slower than the one based on VecDeque when no conflicts happens, and even worse when conflicts happens. Maybe list operations are even more expensive than move value in vec. The propgram is naive, if I have more time, maybe I could find the exact reason.
  • without conflicts, the performance of kv_mpsc is similar to std mpsc with 16 threads(senders), but with conflicts, the performance of kv_mpsc is much worse than std mpsc.
  • The tokio async mpsc is the fastest one, and the async kv_mpsc is also much faster than the sync version.

The VecDeque result:

MultiThread Send and Recv/std mpsc
                        time:   [62.295 ms 72.933 ms 83.896 ms]
Benchmarking MultiThread Send and Recv/kv_mpsc no conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 21.7s, or reduce sample count to 20.
MultiThread Send and Recv/kv_mpsc no conflict
                        time:   [220.10 ms 233.60 ms 248.91 ms]
Found 18 outliers among 100 measurements (18.00%)
  2 (2.00%) low severe
  7 (7.00%) low mild
  3 (3.00%) high mild
  6 (6.00%) high severe
Benchmarking MultiThread Send and Recv/kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 58.0s, or reduce sample count to 10.
MultiThread Send and Recv/kv_mpsc with conflict
                        time:   [565.39 ms 570.14 ms 575.05 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Benchmarking MultiThread Send and Recv/tokio mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.2s, or reduce sample count to 90.
MultiThread Send and Recv/tokio mpsc
                        time:   [50.516 ms 50.738 ms 50.954 ms]
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) low mild
Benchmarking MultiThread Send and Recv/async kv_mpsc no conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 14.2s, or reduce sample count to 30.
MultiThread Send and Recv/async kv_mpsc no conflict
                        time:   [141.56 ms 142.62 ms 143.59 ms]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low severe
  4 (4.00%) low mild
Benchmarking MultiThread Send and Recv/async kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 56.0s, or reduce sample count to 10.
MultiThread Send and Recv/async kv_mpsc with conflict
                        time:   [529.49 ms 538.47 ms 546.28 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low severe
  1 (1.00%) low mild
  2 (2.00%) high mild

The LinkedList result:

MultiThread Send and Recv/std mpsc
                        time:   [189.98 ms 192.11 ms 193.45 ms]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  1 (1.00%) high mild
  1 (1.00%) high severe
Benchmarking MultiThread Send and Recv/kv_mpsc no conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 25.0s, or reduce sample count to 10.
MultiThread Send and Recv/kv_mpsc no conflict
                        time:   [248.44 ms 249.08 ms 249.71 ms]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  2 (2.00%) high mild
Benchmarking MultiThread Send and Recv/kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 59.8s, or reduce sample count to 10.
MultiThread Send and Recv/kv_mpsc with conflict
                        time:   [587.36 ms 592.99 ms 598.48 ms]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild

Benchmarking MultiThread Send and Recv/tokio mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.3s, or reduce sample count to 90.
MultiThread Send and Recv/tokio mpsc
                        time:   [50.757 ms 50.920 ms 51.079 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) low mild
Benchmarking MultiThread Send and Recv/async kv_mpsc no conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 15.1s, or reduce sample count to 30.
MultiThread Send and Recv/async kv_mpsc no conflict
                        time:   [149.49 ms 151.01 ms 152.42 ms]
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) low severe
  5 (5.00%) low mild
Benchmarking MultiThread Send and Recv/async kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 52.4s, or reduce sample count to 10.
MultiThread Send and Recv/async kv_mpsc with conflict
                        time:   [541.56 ms 546.82 ms 552.06 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) low mild

optimization

pop_unconflict_front will scan the buff from curr instead of 0. activate_keys record index of the first msg that conflict with the active key in it, when that key become deactive, curr will be set to that index. After doing that, with cap = 1000, send = 10000, threads = 16, the time consumption of async send_recv/async kv_mpsc with conflict has reduced 70%.

/// A fixed size buff
#[derive(Debug)]
pub(crate) struct KeyedBuff<T: BuffMessage> {
    /// FIFO queue buff
    buff: BuffType<T>,
    /// capacity of buff
    cap: usize,
    /// keys is current active key, value point to first msg
    /// in buff that conflict with that key, cap means None
    activate_keys: HashMap<<T as BuffMessage>::Key, usize>,
    /// curr scan start position
    curr: usize,
}

Research & Study

event_listener. core structure:

/// Inner state of [`Event`].
struct Inner {
    /// The number of notified entries, or `usize::MAX` if all of them have been notified.
    ///
    /// If there are no entries, this value is set to `usize::MAX`.
    notified: AtomicUsize,

    /// A linked list holding registered listeners.
    list: Mutex<List>,

    /// A single cached list entry to avoid allocations on the fast path of the insertion.
    cache: UnsafeCell<Entry>,
}

pub struct Event {
    /// A pointer to heap-allocated inner state.
    ///
    /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
    /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
    /// reference count.
    inner: AtomicPtr<Inner>,
}

pub struct EventListener {
    /// A reference to [`Event`]'s inner state.
    inner: Arc<Inner>,

    /// A pointer to this listener's entry in the linked list.
    entry: Option<NonNull<Entry>>,
}

/// The state of a listener.
enum State {
    /// It has just been created.
    Created,

    /// It has received a notification.
    ///
    /// The `bool` is `true` if this was an "additional" notification.
    Notified(bool),

    /// An async task is polling it.
    Polling(Waker),

    /// A thread is blocked on it.
    Waiting(Unparker),
}

use mutex to project inner.list, atomic op and fence to synchronize inner ptr and inner.notified.

when listener call listen on Event, the inner is cloned, and a new waiter entry is insert to inner.list.

in sync impl, when listener task call EventListener.wait, listener's state will be set to Waiting, then call park or park_timeout to wait until notification is received or the timeout is reached.

in async impl, when poll is called on EventListener, pass the waker to listener state.

when call notify on Event, it traversals the list, then unpark or wake some blocking threads/tasks.

tokio Notify

core structure:

#[derive(Debug)]
pub struct Notify {
    // This uses 2 bits to store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    state: AtomicUsize,
    waiters: Mutex<WaitList>,
}
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
pub struct Notified<'a> {
    /// The `Notify` being received on.
    notify: &'a Notify,

    /// The current state of the receiving process.
    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker.
    waker: Option<Waker>,

    /// `true` if the notification has been assigned to this waiter.
    notified: Option<NotificationType>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}

tokio Notify is very similar to event_listener.

when call notified on Notify, a Notified is returned, when poll on Notified, the task may get a permit from Notify.state immediately, or set Notify.state to waiting, and push new waiter into Notify.waiters, following call to poll will check wether the task is notified.

when call notify_one on Notify, if there is no task wait, just store 1 permit in the state, or notify one task in waiters.

Main differences between them:

  • Notify can store a permit in state, so that if call notify_one before any task call notified().await, then the first task await will get a permit immediately, without lock the waiter list, due to this, Notify didn't push new waiter into list until await. event_listener push new entry into list when call listen, but event_listener did the optimization of Entry allocation.
  • event_listener use more relaxed memory order while Notify only use SeqCst
  • Notify only support notifying one task(store a permit if no task waiting) or notifying all waiting task(do nothing if no task waiting), event_listener support notifying arbitrary number of tasks.

Write a simple bench in mock_mpsc, result is as follow. Both take almost the same amount of time, but the wait counts of tokio Notify is about twice as much as event_listener. The reason is that notify_one will store a permit in it's state and cause "false positive".The first call of await will return immediately and found there is still no data in buff, and wait again. In kv_mpsc, that will cause twice try_recv invocations.

130 % cargo run --release --bin mock_mpsc
   Compiling kv_mpsc v0.1.0 (/home/waruto/repos/kv_mpsc)
    Finished release [optimized] target(s) in 0.52s
     Running `target/release/mock_mpsc`
wait 1023 times
notify cost 21.210985742s
wait 512 times
envent listener cost 21.202538365s

Then introduce event_listener to my kv_mpsc, and run the previous send&recv bench.

Notify result is:

async send_recv/tokio mpsc
                        time:   [69.548 ms 70.039 ms 70.481 ms]
Found 17 outliers among 100 measurements (17.00%)
  3 (3.00%) low severe
  9 (9.00%) low mild
  4 (4.00%) high mild
  1 (1.00%) high severe
Benchmarking async send_recv/async kv_mpsc no conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 11.9s, or reduce sample count to 40.
async send_recv/async kv_mpsc no conflict
                        time:   [118.81 ms 118.94 ms 119.08 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  1 (1.00%) high severe
Benchmarking async send_recv/async kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 29.9s, or reduce sample count to 10.
async send_recv/async kv_mpsc with conflict
                        time:   [205.67 ms 206.49 ms 207.30 ms]

event_listener result is:

async send_recv/tokio mpsc
                        time:   [68.907 ms 69.229 ms 69.492 ms]
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) low severe
  4 (4.00%) low mild
Benchmarking async send_recv/async kv_mpsc no conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 11.1s, or reduce sample count to 40.
async send_recv/async kv_mpsc no conflict
                        time:   [110.16 ms 110.30 ms 110.43 ms]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) low mild
  1 (1.00%) high mild
Benchmarking async send_recv/async kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 26.9s, or reduce sample count to 10.
async send_recv/async kv_mpsc with conflict
                        time:   [186.56 ms 187.02 ms 187.49 ms]
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) low severe
  2 (2.00%) high mild

The kv_mpsc based on event_listener is faster than the one based on Notify, but does this situation really due to the number of try_recv invocations? So I add some more code for profiling, write a bench profile.

The result is:

➜  kv_mpsc git:(dev) ✗ cargo run --release -F=profile --bin profile
    Finished release [optimized] target(s) in 0.01s
     Running `target/release/profile`
total time cost 6.407398222s
wait count 5, try_recv cost time 1.291725157s

➜  kv_mpsc git:(dev) ✗ cargo run --release -F=profile,event_listener --bin profile
    Finished release [optimized] target(s) in 0.01s
     Running `target/release/profile`
total time cost 6.233926911s
wait count 5, try_recv cost time 1.1262019s

The result shows that the wait count is just 5 both, while the number of times recv has been called is 8000000. When comparing the two, the former is not worth mentioning. But there is still an obvious difference in the time consumption of try_recv between them, what's more, the difference between runtime of try_recv is almost the same with difference between total runtime. I suspect this may be due to the SeqCst memory order used by Notify, which affects the efficiency of shared state locking.

Final Optimization

/// A fixed size buff
#[derive(Debug)]
pub(crate) struct KeyedBuff<T: BuffMessage> {
    /// FIFO queue buff, store msgs that without conflitc
    ready: BuffType<T>,
    /// msgs that conflict with that key
    pending_on_key: HashMap<<T as BuffMessage>::Key, Vec<Rc<T>>>,
    /// capacity of buff
    cap: usize,
    /// size of buff now
    size: usize,
}

When push_back new msg, if the msg's keys are conflict with a key in pending_on_key, then push the msg into Vec<Rc<T>> corrosponding to that key, else just record it's keys in pending_on_key, then push it into ready. When recv, just pop_front ready. When drop a msg, for each key of it, remove the fisrt Rc pending on it, if the Rc's ref count is 1, push it to ready.

constructing a bench(async_with_conflict) with extremely large number of conflicts.

before doing optimiztion(on dev branch), the result is:

Benchmarking async send_recv/async kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 303.1s, or reduce sample count to 10.
async send_recv/async kv_mpsc with conflict
                        time:   [2.9468 s 2.9993 s 3.0562 s]
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) low mild
  1 (1.00%) high mild
  3 (3.00%) high severe

after doing optimiztion, the result is:

Benchmarking async send_recv/async kv_mpsc with conflict: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 20.4s, or reduce sample count to 20.
async send_recv/async kv_mpsc with conflict
                        time:   [201.71 ms 202.84 ms 204.11 ms]
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe

There is a 14 times difference between the two.