neachdainn/nng-rs

Using `nng` together with `futures-rs`


Hi! I was tinkering yesterday evening with how to use nng together with a tokio project. However, I am not really sure whether I understand your Context and Aio implementations correctly. After all, both are described in terms of a "context".

Here is a (basic) sample implementation that I thought I could use to experiment with the PUB protocol.

use futures::{task::Poll, Sink};
use nng::{Aio, AioState, Error, Message, Socket};
use std::{pin::Pin, sync::atomic::Ordering::Acquire};

struct Pub0Coupler {
    aio: Aio,
    socket: Socket,
}

impl Pub0Coupler {
    fn connect_to_address(addr: &str) -> Result<Pub0Coupler, nng::Error> {
        let socket = Socket::new(nng::Protocol::Pub0)?;
        let aio = Aio::new(|_, _| ())?;
        socket.dial_async(addr)?;
        Ok(Self { aio, socket })
    }
}

impl<T> Sink<T> for Pub0Coupler {
    type Error = Error;
    fn poll_ready(
        self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), <Self as futures::Sink<T>>::Error>> {
        if let AioState::Inactive = self.aio.state(Acquire) {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }
    fn start_send(
        self: Pin<&mut Self>,
        data: T,
    ) -> Result<(), <Self as futures::Sink<T>>::Error> {
        let msg = Message::new();
        // ... put data here to the message... //
        match self.socket.send_async(&self.aio, msg) {
            Ok(()) => Ok(()),
            Err((_, e)) => Err(e),
        }
    }
    fn poll_flush(
        self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), <Self as futures::Sink<T>>::Error>> {
        Poll::Ready(Ok(()))
    }
    fn poll_close(
        self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), <Self as futures::Sink<T>>::Error>> {
        Poll::Ready(Ok(()))
    }
}

I have a few questions now.

  1. By using a single Aio context, is this essentially a sequential interface? E.g., ignoring context switching, would I be about as well off if I spawned the socket.send() call onto a blocking pool? From the nng documentation, I'm not sure I fully understand why Contexts exist. I see that your example uses a "worker" approach, but the comment only says that it is NOT representing workers, and I'm not sure what it represents instead. So besides convenience, does using a single Aio instance here actually add anything?
  2. I'm not sure whether I need to do something specific on the poll_flush and poll_close calls - I found somewhere that flushing is not really exposed by nng and it's best to simply wait. Would it make sense here to also explicitly sleep like this to make sure we are actually done sending?
        if let AioState::Inactive = self.aio.state(Acquire) {
            Poll::Ready(Ok(()))
        } else {
            tokio::time::sleep(Duration::from_millis(50)).await;
            Poll::Pending
        }
  3. For poll_close, I'd naively expect to call socket.close(). However, since that is already called when the socket is dropped, and I also risk losing messages here, would it be better to use the same implementation as above to "give it a bit of time"?
  4. In this use case (where I essentially let the Rust side take the main control of scheduling), am I overlooking any further interaction with the nng runtime? As you can see, I'm essentially just checking whether it's idle or not. Is that correct here, or can I do a bit better?
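To make the last question about the idle check more concrete: the `futures` poll contract expects a method that returns `Poll::Pending` to arrange for the task to be woken later, which a bare state check does not do. Below is a minimal, std-only sketch of the usual alternative, where the poll side stores the task's waker and a completion callback clears a flag and wakes it. It makes no nng calls at all. `SendSlot`, `start`, `on_complete`, `poll_idle` and `CountingWaker` are hypothetical names for illustration, and whether the Aio callback can be wired up this way is an assumption, not something confirmed by the nng-rs documentation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical state shared between the poll side and a completion callback.
#[derive(Default)]
struct SendState {
    in_flight: bool,
    waker: Option<Waker>,
}

/// Cheaply cloneable handle, so one clone can be moved into a callback.
#[derive(Clone, Default)]
struct SendSlot(Arc<Mutex<SendState>>);

impl SendSlot {
    /// Stand-in for handing a message to the library (what start_send would do).
    fn start(&self) {
        self.0.lock().unwrap().in_flight = true;
    }

    /// What a completion callback would do (assumption: called once per send).
    fn on_complete(&self) {
        let waker = {
            let mut state = self.0.lock().unwrap();
            state.in_flight = false;
            state.waker.take()
        };
        // Wake outside the lock so the woken task can poll immediately.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Same shape as poll_ready / poll_flush: Pending only after storing the waker.
    fn poll_idle(&self, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.lock().unwrap();
        if state.in_flight {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Helper waker that only counts wake-ups, to observe the behaviour without an executor.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

With this shape no sleep is needed in the pending branch: the executor re-polls the task only after `on_complete` has run, and `poll_flush` or `poll_close` could reuse `poll_idle` to wait for the in-flight message.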

Would love to hear your feedback!

This is a cross-post of https://gitlab.com/neachdainn/nng-rs/-/issues/58. Feel free to close whichever one you consider to be the duplicate.