smol-rs/async-channel

Implement Sink for Sender

korrat opened this issue · 18 comments

The futures crate contains a Sink trait. This trait allows for a convenient interaction between its implementors and streams using the forward method on the StreamExt trait.

So I think implementing Sink for Sender would improve the ergonomics of the library.

This could be easily implemented. I think my main hesitation is that the Sink trait is not very popular and my impression is that its future is uncertain.

But maybe it doesn’t matter. Adding a Sink impl probably doesn’t hurt - it can only make things better.

cc @yoshuawuyts do you perhaps have an opinion on this?

I don't particularly like the Sink trait in both shape and functionality, but am sympathetic to those who do. My main concern with adding the trait is that it pulls in everything and the kitchen sink (ha) from the futures-util crate. If I'm not mistaken this includes the futures-rs channel impl too.

I would like to recommend that if we add Sink to async-channel we do so exclusive behind a feature flag to ensure async-channel continues to compile quickly.

edit (2020-08-24): A good post on sinks vs iterators in C++ was posted by a WG21 member; I think the issues raised translate to Future Sinks vs async Iteration in Rust as well (link).

There is a futures-sink crate, that only includes the trait and has no dependencies. That should keep compile times down.

I had a look at the Sink impl on piper, and probably that can be ported quite easily. However, I don't really understand the reasoning behind the event-listeners, like

  • why is there both send_ops, sink_ops and recv_ops on the inner as well as a listener on the Sender/Receiver?
  • why distinguish between send_ops and sink_ops at all, since they both just try to send?
  • why does try_recv call both send_ops.notify_one() and sink_ops.notify_all()?
  • where is the link being made between the sink_ops on the Inner and the one on the Sender?

So I don't really feel confident proposing a PR on code I don't quite understand. Also the VecDeque is an understandable choice to avoid a panic in Sink::start_send, but it does have the downside of making the buffer size unbounded. Maybe at least initialize with VecDeque::with_capacity(1)?

I would propose to add an adapter to consume a Stream, like:
Sender::drain(self, s: Stream) -> DrainFuture

Would that help?

@stsydow I don't think that helps when you need an interface. That is if you want to pass the channel to something that takes a Sink. For me being able to use it as an interface is all the reason. It allows to write crates that use channels, but that can transparently support different channel implementations.

I proposed it as Sink is quite controversial and I don't expect it to become a Standard soon.
Tokio for example recently dropped Sink support.
I know the pain and am searching for a solution for my own library.

I personally don't like how Sink works either, but until we propose something better, it's the interface we have and most async projects will have the futures lib in their deps anyway, so it doesn't add to much strain. Some discussion was started here, but it hasn't seem much action.

Is there any movement here? I have a project that takes Streams as view input and I would love to take Sink as view output to abstract out the channel implementation and keep the API balanced. Sink provides Sink::with which is the contravariant of Stream::map - this keeps things nice and balanced. Instead my API must take a impl Stream as input and then take async_channel::Sender<Event> as output, mapping output events using the Receiver<Event>. It seems clunky and quite a leaky abstraction.

Hi from the future. In the future we still have Futures, and Sink is alive and well.

futures::channel::mpsc::Sender implements Sink and it's no big deal.