Sending from multiple sources to a memory server
dignifiedquire opened this issue · 0 comments
dignifiedquire commented
Unfortunately, as far as I can tell, the current design of the in-memory channels doesn't allow sending from multiple sources. I tried implementing a custom transport using flume channels, but couldn't get it to work; something strange seems to happen once the channels are cloned.
Do you have any thoughts on what the issue could be, or whether this could easily be added to the existing implementation?
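One possible explanation, offered as a guess rather than a diagnosis: flume channels are multi-producer, multi-consumer, so each message is delivered to exactly one receiver. Clones of the receiving half then compete for messages instead of each seeing all of them. The sketch below models that behaviour with the standard library only; `SharedReceiver` and `split_between_clones` are hypothetical names for illustration and are not part of flume or of this crate.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a cloneable MPMC receiver:
/// every clone pulls from the same underlying queue.
struct SharedReceiver<T> {
    inner: Arc<Mutex<Receiver<T>>>,
}

impl<T> Clone for SharedReceiver<T> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<T> SharedReceiver<T> {
    /// Takes the next queued message, if any, without blocking.
    fn try_recv(&self) -> Option<T> {
        self.inner.lock().unwrap().try_recv().ok()
    }
}

/// Sends 0..4 into one queue and lets two receiver clones take turns.
/// Returns what each clone observed.
fn split_between_clones() -> (Vec<u32>, Vec<u32>) {
    let (tx, rx) = channel();
    let a = SharedReceiver {
        inner: Arc::new(Mutex::new(rx)),
    };
    let b = a.clone();

    for i in 0..4 {
        tx.send(i).unwrap();
    }

    let mut got_a = Vec::new();
    let mut got_b = Vec::new();
    // Alternate between the two clones until the queue is empty.
    while let Some(x) = a.try_recv() {
        got_a.push(x);
        if let Some(y) = b.try_recv() {
            got_b.push(y);
        }
    }
    (got_a, got_b)
}
```

Here `split_between_clones()` returns `([0, 2], [1, 3])`: neither clone sees the full sequence. If a cloned channel behaves the same way, a response meant for one client could be picked up by another.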
Current implementation
// Imports this excerpt appears to rely on (assumed; not shown in the original snippet).
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Sink, Stream};

/// Returns two channel peers with buffer equal to `capacity`. Each [`Stream`] yields items sent
/// through the other's [`Sink`].
pub fn bounded<SinkItem, Item>(
    capacity: usize,
) -> (Channel<SinkItem, Item>, Channel<Item, SinkItem>)
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    let (tx1, rx2) = flume::bounded(capacity);
    let (tx2, rx1) = flume::bounded(capacity);
    (
        Channel {
            tx: tx1.into_sink(),
            rx: rx1.into_stream(),
        },
        Channel {
            tx: tx2.into_sink(),
            rx: rx2.into_stream(),
        },
    )
}

/// A bi-directional channel backed by a [`Sender`](flume::Sender) and [`Receiver`](flume::Receiver).
pub struct Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    rx: flume::r#async::RecvStream<'static, Item>,
    tx: flume::r#async::SendSink<'static, SinkItem>,
}

impl<Item, SinkItem> Clone for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        Self {
            rx: self.rx.clone(),
            tx: self.tx.clone(),
        }
    }
}

impl<Item, SinkItem> Stream for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    type Item = Result<Item, ChannelError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Item, ChannelError>>> {
        Pin::new(&mut self.rx)
            .poll_next(cx)
            .map(|option| option.map(Ok))
    }
}

/// Errors that occur in the sending or receiving of messages over a channel.
#[derive(thiserror::Error, Debug)]
pub enum ChannelError {
    /// An error occurred sending over the channel.
    #[error("an error occurred sending over the channel")]
    Send(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl<Item, SinkItem> Sink<SinkItem> for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    type Error = ChannelError;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_ready(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        Pin::new(&mut self.tx)
            .start_send(item)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_flush(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_close(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }
}
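
For comparison, here is a minimal sketch of one way multiple sources could talk to a single in-memory server: every client shares a clone of the request sender, and each request carries its own reply channel so responses cannot be picked up by the wrong client. This uses only `std::sync::mpsc` and threads rather than flume or this crate's `Channel`; `Request`, `spawn_server`, `call`, and `run_clients` are hypothetical names, and the "double the payload" handler is a placeholder.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical request type: the payload plus a private reply channel.
struct Request {
    payload: u32,
    reply_to: Sender<u32>,
}

/// Starts a server thread and returns the sender that clients can clone.
/// The thread exits once every sender clone has been dropped.
fn spawn_server() -> Sender<Request> {
    let (tx, rx) = channel::<Request>();
    thread::spawn(move || {
        for req in rx {
            // Placeholder handler: reply with the doubled payload.
            // A send error only means the client went away; ignore it.
            let _ = req.reply_to.send(req.payload * 2);
        }
    });
    tx
}

/// Sends one request and blocks until its own reply arrives.
fn call(server: &Sender<Request>, payload: u32) -> u32 {
    let (reply_tx, reply_rx) = channel();
    server
        .send(Request {
            payload,
            reply_to: reply_tx,
        })
        .unwrap();
    reply_rx.recv().unwrap()
}

/// Runs `n` clients on separate threads against one server and
/// collects their replies in client order.
fn run_clients(n: u32) -> Vec<u32> {
    let server = spawn_server();
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let server = server.clone();
            thread::spawn(move || call(&server, i))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

With this shape, `run_clients(3)` returns `[0, 2, 4]` regardless of how the threads interleave, because each reply travels over a channel owned by the client that asked. Whether the same idea fits the existing `Channel` type is an open question.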