Using `nng` together with `futures-rs`
Closed this issue · 1 comments
dbischof90 commented
Hi! I was tinkering yesterday evening with how to use `nng` together with a `tokio` project. However, I am not really sure whether I understand your `Context` and `Aio` implementations correctly; after all, both speak about a "context".
Here is a (basic) sample implementation that I thought I could use to experiment with the PUB protocol.

    use futures::{task::Poll, Sink};
    use nng::{Aio, AioState, Error, Message, Socket};
    use std::{pin::Pin, sync::atomic::Ordering::Acquire};

    struct Pub0Coupler {
        aio: Aio,
        socket: Socket,
    }

    impl Pub0Coupler {
        fn connect_to_address(addr: &str) -> Result<Pub0Coupler, nng::Error> {
            let socket = Socket::new(nng::Protocol::Pub0)?;
            let aio = Aio::new(|_, _| ())?;
            socket.dial_async(addr)?;
            Ok(Self { aio, socket })
        }
    }

    impl<T> Sink<T> for Pub0Coupler {
        type Error = Error;

        fn poll_ready(
            self: Pin<&mut Self>,
            _: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), <Self as futures::Sink<T>>::Error>> {
            if let AioState::Inactive = self.aio.state(Acquire) {
                Poll::Ready(Ok(()))
            } else {
                Poll::Pending
            }
        }

        fn start_send(self: Pin<&mut Self>, data: T) -> Result<(), <Self as futures::Sink<T>>::Error> {
            let msg = Message::new();
            // ... put data here to the message... //
            match self.socket.send_async(&self.aio, msg) {
                Ok(()) => Ok(()),
                Err((_, e)) => Err(e),
            }
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), <Self as futures::Sink<T>>::Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(
            self: Pin<&mut Self>,
            _: &mut std::task::Context<'_>,
        ) -> Poll<Result<(), <Self as futures::Sink<T>>::Error>> {
            Poll::Ready(Ok(()))
        }
    }

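One thing I am unsure about in the `Poll::Pending` branch of `poll_ready` above: as far as I understand the `Future`/`Sink` contract, returning `Pending` is expected to come with a stored waker that is woken later, otherwise the task may never be polled again. Below is a minimal, std-only sketch of that pattern. The names `Shared`, `SendDone`, `complete`, `block_on` and `run_waker_demo` are hypothetical, and a plain thread stands in for whatever would signal completion; with `nng` I assume that would be the closure passed to `Aio::new`, which I have not verified.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical state shared between the polling task and the completion callback.
#[derive(Default)]
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that resolves once the simulated send has completed.
struct SendDone(Arc<Mutex<Shared>>);

impl Future for SendDone {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.0.lock().unwrap();
        if shared.done {
            Poll::Ready(())
        } else {
            // Store the waker before returning Pending so the callback can wake this task.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Completion side: marks the operation as done and wakes the stored waker, if any.
fn complete(shared: &Arc<Mutex<Shared>>) {
    let waker = {
        let mut s = shared.lock().unwrap();
        s.done = true;
        s.waker.take()
    };
    if let Some(w) = waker {
        w.wake();
    }
}

/// Tiny single-future executor so the sketch runs without tokio.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until `wake` unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Runs the simulated send and reports whether it was marked as done.
fn run_waker_demo() -> bool {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let callback_side = Arc::clone(&shared);
    // A thread plays the role of the asynchronous completion callback.
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        complete(&callback_side);
    });
    block_on(SendDone(Arc::clone(&shared)));
    handle.join().unwrap();
    let done = shared.lock().unwrap().done;
    done
}

fn main() {
    assert!(run_waker_demo());
}
```

If this reading is right, the same idea would apply to `poll_flush` and `poll_close`: return `Pending` only after storing the waker, and let the completion side call `wake()`.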
I have a few questions now.
- By using a single `Aio` context, is this essentially a sequential interface? E.g., ignoring context switching, would I be about as well off if I spawned the `socket.send()` call into a blocking pool? I'm not sure I fully get the reason for `Context`'s existence from the `nng` documentation. I see that you have this "worker" approach in your example, but the comment only says that it is NOT representing workers, and I'm not sure what it does represent. So besides convenience, does using a single `Aio` instance here actually add something?
- I'm not sure whether I need to do something specific in the `poll_flush` and `poll_close` calls. I found somewhere that flushing is not really exposed by `nng` and that it's best to simply wait. Would it make sense here to also explicitly sleep like this to make sure we are actually done sending?

      if let AioState::Inactive = self.aio.state(Acquire) {
          Poll::Ready(Ok(()))
      } else {
          tokio::time::sleep(Duration::from_millis(50)).await;
          Poll::Pending
      }

- For `poll_close`, I'd naively expect to call `socket.close()`. However, since that is already called when the socket is `drop`ped and I also risk losing messages here, is it better to use the same implementation as above to "give it a bit of time"?
- In this use case (where I essentially let the Rust side take main control of scheduling), am I overlooking any further interaction with the `nng` runtime? As you see, I'm essentially just checking whether it's idle or not. Is that correct here, or can I do a bit better?
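For comparison, the blocking alternative from the first question could look roughly like the std-only sketch below: one worker thread owns the socket and sends queued messages strictly in order. `blocking_send` is a hypothetical stand-in for a blocking `socket.send()` call (it only records the payload), and `spawn_sender` and `run_blocking_demo` are names made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a blocking `socket.send(msg)`; it only records the payload.
fn blocking_send(log: &mut Vec<Vec<u8>>, msg: Vec<u8>) -> Result<(), String> {
    log.push(msg);
    Ok(())
}

/// Spawns one worker thread that sends queued messages one at a time, in order.
/// The join handle yields everything that was "sent".
fn spawn_sender() -> (mpsc::Sender<Vec<u8>>, thread::JoinHandle<Vec<Vec<u8>>>) {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let handle = thread::spawn(move || {
        let mut sent = Vec::new();
        // The loop ends once every `Sender` has been dropped, which acts as "close".
        for msg in rx {
            if blocking_send(&mut sent, msg).is_err() {
                break;
            }
        }
        sent
    });
    (tx, handle)
}

/// Queues two messages, closes the channel, and returns what the worker sent.
fn run_blocking_demo() -> Vec<Vec<u8>> {
    let (tx, handle) = spawn_sender();
    tx.send(b"one".to_vec()).unwrap();
    tx.send(b"two".to_vec()).unwrap();
    drop(tx); // closing the channel lets the worker drain the queue and exit
    handle.join().unwrap()
}

fn main() {
    let sent = run_blocking_demo();
    println!("sent {} messages", sent.len());
}
```

In this variant, "flush" and "close" reduce to dropping the sender and joining the thread, which is part of why I wonder what the `Aio`-based version buys me beyond not occupying a thread.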
Would love to hear your feedback!
dbischof90 commented
This is a cross-post corresponding with https://gitlab.com/neachdainn/nng-rs/-/issues/58. Feel free to close what you consider to be a duplicate.