Add blocking send/recv
akhilles opened this issue · 6 comments
I believe this can be done without polluting the public API of this crate. Something like this would be very convenient:
```rust
// async
s.send(msg).await;
r.recv().await;

// blocking
s.send(msg).wait();
r.recv().wait();
```
Should also be quite easy to implement:
```rust
// Sketch of an implementation inside async-channel itself; `Sender`,
// `SendError`, and the internal `channel.send_ops` event are crate-private.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use concurrent_queue::PushError;
use event_listener::EventListener;

#[must_use]
pub struct Send<'a, T> {
    sender: &'a Sender<T>,
    listener: Option<EventListener>,
    msg: Option<T>,
}

impl<'a, T: Unpin> Future for Send<'a, T> {
    type Output = Result<(), SendError<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // Attempt the send; on failure, put the message back for the next try.
            let msg = self.msg.take().unwrap();
            match self.sender.try_send(msg) {
                Ok(_) => return Poll::Ready(Ok(())),
                Err(PushError::Closed(m)) => return Poll::Ready(Err(SendError(m))),
                Err(PushError::Full(m)) => self.msg = Some(m),
            }

            // Wait for a send operation to complete, then retry.
            match self.listener.as_mut() {
                Some(listener) => match Pin::new(listener).poll(cx) {
                    Poll::Ready(()) => self.listener = None,
                    Poll::Pending => return Poll::Pending,
                },
                None => self.listener = Some(self.sender.channel.send_ops.listen()),
            }
        }
    }
}

impl<'a, T> Send<'a, T> {
    /// Blocking counterpart of `.await`: parks the thread instead of yielding.
    pub fn wait(mut self) -> Result<(), SendError<T>> {
        loop {
            let msg = self.msg.take().unwrap();
            match self.sender.try_send(msg) {
                Ok(_) => return Ok(()),
                Err(PushError::Closed(m)) => return Err(SendError(m)),
                Err(PushError::Full(m)) => self.msg = Some(m),
            }
            match self.listener.take() {
                Some(listener) => listener.wait(),
                None => self.listener = Some(self.sender.channel.send_ops.listen()),
            }
        }
    }
}
```
From some super basic benchmarking (spsc), this implementation is ~30% faster than doing `block_on(s.send(msg))` and `block_on(r.recv())`. It was also slightly faster than `crossbeam_channel::bounded` for my use case.
Would also be great if `Receiver` could implement `Iterator`.
@akhilles When you say it's 30% faster than `block_on(s.send(msg))`, what kind of `block_on()` are you using? If you're using `smol::block_on()`/`async_io::block_on()`, can you also try `smol::future::block_on()`/`futures_lite::future::block_on()`?
> Would also be great if `Receiver` could implement `Iterator`.
For that, you can do `stream::block_on(r)`: https://docs.rs/smol/1.0.0/smol/stream/fn.block_on.html
`futures_lite::future::block_on()` was faster (379.19 us) than `smol::block_on()` (403.14 us), but the `.wait()` version was still faster at 322.77 us:
```text
futures-mpsc-blocking   time:   [3.8368 ms 3.8493 ms 3.8621 ms]
                        change: [-3.0426% -2.1633% -1.3784%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

async-channel-blocking  time:   [319.70 us 322.77 us 326.41 us]
                        change: [-7.0724% -5.5294% -3.9943%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  2 (2.00%) high mild
  2 (2.00%) high severe

async-channel-lite-block-on
                        time:   [373.44 us 379.19 us 384.94 us]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

async-channel-smol-block-on
                        time:   [394.20 us 403.14 us 411.93 us]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

crossbeam               time:   [400.94 us 405.53 us 410.99 us]
                        change: [-2.9895% -1.5899% -0.1656%] (p = 0.03 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
```
This is my benchmark code: https://gist.github.com/akhilles/0223464983706c97ea5117c3ca305180.
Just noticed I was using older versions of `smol`/`futures-lite`. With the latest versions, the difference is a lot smaller:
```text
futures-mpsc-blocking   time:   [3.8724 ms 3.8841 ms 3.8962 ms]
                        change: [-0.3589% +0.0611% +0.5171%] (p = 0.79 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

async-channel-blocking  time:   [331.72 us 340.38 us 349.75 us]
                        change: [-4.6662% -2.8133% -0.7524%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 8 outliers among 100 measurements (8.00%)
  4 (4.00%) high mild
  4 (4.00%) high severe

async-channel-lite-block-on
                        time:   [363.41 us 369.80 us 377.26 us]
                        change: [-8.4948% -6.8854% -5.1680%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

async-channel-smol-block-on
                        time:   [6.9663 ms 7.0117 ms 7.0590 ms]
                        change: [+1.5372% +2.4164% +3.2627%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

crossbeam               time:   [408.03 us 409.98 us 411.95 us]
                        change: [-0.9665% +0.0213% +1.0723%] (p = 0.97 > 0.05)
                        No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild
```
Since the difference is only 8%, I'd prefer to focus on making `block_on()` even more efficient and closing the gap, rather than adding new API :)

This is not the first time a blocking API has been proposed, and I'm sympathetic to that use case. Admittedly, typing `block_on(s.send(msg))` is a bit of a chore, but probably not that bad.
Interestingly, in futures v0.1, the `FutureExt` trait had a `wait()` method that would block on a future, and it was really convenient. In the end, it was removed and replaced with `block_on()` to make it less likely to mistype `.await` as `.wait()`, or naively type `.wait()` without understanding its consequences.
> Since the difference is only 8%, I'd prefer to focus on making `block_on()` even more efficient and closing the gap, rather than adding new API :) This is not the first time a blocking API has been proposed and I'm sympathetic to that use case. Admittedly, typing `block_on(s.send(msg))` is a bit of a chore but probably not that bad.
Makes sense :). I'm pretty happy with `futures_lite::future::block_on()` + `send`/`recv` performance.
> Interestingly, in futures v0.1, the `FutureExt` trait had a `wait()` method that would block on a future and it was really convenient. In the end, it was removed and replaced with `block_on()` to make it less likely to mistype `.await` as `.wait()`, or naively type `.wait()` without understanding its consequences.
I do like the `.await`/`.wait()` syntax; I think I'll just roll my own extension trait with `wait()`.