Implement Sink for Sender
korrat opened this issue · 18 comments
This could be easily implemented. I think my main hesitation is that the Sink trait is not very popular and my impression is that its future is uncertain.
But maybe it doesn’t matter. Adding a Sink impl probably doesn’t hurt - it can only make things better.
cc @yoshuawuyts do you perhaps have an opinion on this?
I don't particularly like the Sink
trait in both shape and functionality, but am sympathetic to those who do. My main concern with adding the trait is that it pulls in everything and the kitchen sink (ha) from the futures-util
crate. If I'm not mistaken this includes the futures-rs channel impl too.
I would like to recommend that if we add Sink
to async-channel
we do so exclusive behind a feature flag to ensure async-channel
continues to compile quickly.
edit (2020-08-24): A good post on sinks vs iterators in C++ was posted by a WG21 member; I think the issues raised translate to Future Sinks vs async Iteration in Rust as well (link).
There is a futures-sink
crate, that only includes the trait and has no dependencies. That should keep compile times down.
I had a look at the Sink
impl on piper, and probably that can be ported quite easily. However, I don't really understand the reasoning behind the event-listeners, like
- why is there both
send_ops
,sink_ops
andrecv_ops
on the inner as well as a listener on theSender
/Receiver
? - why distinguish between
send_ops
andsink_ops
at all, since they both just try to send? - why does
try_recv
call bothsend_ops.notify_one()
andsink_ops.notify_all()
? - where is the link being made between the
sink_ops
on theInner
and the one on theSender
?
So I don't really feel confident proposing a PR on code I don't quite understand. Also the VecDeque
is an understandable choice to avoid a panic in Sink::start_send
, but it does have the downside of making the buffer size unbounded. Maybe at least initialize with VecDeque::with_capacity(1)
?
I would propose to add an adapter to consume a Stream, like:
Sender::drain(self, s: Stream) -> DrainFuture
Would that help?
@stsydow I don't think that helps when you need an interface. That is if you want to pass the channel to something that takes a Sink
. For me being able to use it as an interface is all the reason. It allows to write crates that use channels, but that can transparently support different channel implementations.
I proposed it as Sink
is quite controversial and I don't expect it to become a Standard soon.
Tokio for example recently dropped Sink
support.
I know the pain and am searching for a solution for my own library.
I personally don't like how Sink works either, but until we propose something better, it's the interface we have and most async projects will have the futures lib in their deps anyway, so it doesn't add to much strain. Some discussion was started here, but it hasn't seem much action.
Is there any movement here? I have a project that takes Stream
s as view input and I would love to take Sink
as view output to abstract out the channel implementation and keep the API balanced. Sink
provides Sink::with
which is the contravariant of Stream::map
- this keeps things nice and balanced. Instead my API must take a impl Stream
as input and then take async_channel::Sender<Event>
as output, mapping output events using the Receiver<Event>
. It seems clunky and quite a leaky abstraction.
Hi from the future. In the future we still have Futures, and Sink is alive and well.
futures::channel::mpsc::Sender
implements Sink and it's no big deal.