google/tarpc

Sending from multiple sources to a memory server

dignifiedquire opened this issue · 0 comments

Unfortunately the current design of the inmemory channels doesn't allow to send from multiple sources, at least asfaict. I tried implementing a custom transport using flume channels, but failed to actually make them work, there seems to be sth strange happening once they are cloned.

Do you have any thoughts on what could be the issue or if this could be easily added to the existing implementation?

Current implementation

/// Returns two channel peers with buffer equal to `capacity`. Each [`Stream`] yields items sent
/// through the other's [`Sink`].
pub fn bounded<SinkItem, Item>(
    capacity: usize,
) -> (Channel<SinkItem, Item>, Channel<Item, SinkItem>)
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    let (tx1, rx2) = flume::bounded(capacity);
    let (tx2, rx1) = flume::bounded(capacity);
    (
        Channel {
            tx: tx1.into_sink(),
            rx: rx1.into_stream(),
        },
        Channel {
            tx: tx2.into_sink(),
            rx: rx2.into_stream(),
        },
    )
}

/// A bi-directional channel backed by a [`Sender`](flume::Sender) and [`Receiver`](flume::Receiver).
pub struct Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    rx: flume::r#async::RecvStream<'static, Item>,
    tx: flume::r#async::SendSink<'static, SinkItem>,
}

impl<Item, SinkItem> Clone for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        Self {
            rx: self.rx.clone(),
            tx: self.tx.clone(),
        }
    }
}

impl<Item, SinkItem> Stream for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    type Item = Result<Item, ChannelError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Item, ChannelError>>> {
        Pin::new(&mut self.rx)
            .poll_next(cx)
            .map(|option| option.map(Ok))
    }
}

/// Errors that occur in the sending or receiving of messages over a channel.
#[derive(thiserror::Error, Debug)]
pub enum ChannelError {
    /// An error occurred sending over the channel.
    #[error("an error occurred sending over the channel")]
    Send(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl<Item, SinkItem> Sink<SinkItem> for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    type Error = ChannelError;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_ready(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        Pin::new(&mut self.tx)
            .start_send(item)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_flush(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_close(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }
}