rust-lang/futures-rs

Stream branch / fork combinator (includes implementation)

Opened this issue · 8 comments

Hey,

in one of my applications I was in need of branching / forking a stream that was carrying messages because I wanted to use them in multiple, independent parts of the application. I wanted to avoid the complexity of having to use channels (because I'd need a separate driver task for that) so I implemented a combinator which branches a stream in two, given the items are Clone.

I have come across this use case multiple times writing networked applications, and every time I was wishing futures-rs had an implementation ready-to-use. It also seems to be a feature that has been requested from time to time, as seen by the issues on this repo. Hence, I propose to add the combinator to StreamExt.

The implementation ensures one branch can never outpace the others by more than a fixed amount (which is 1 item, but it probably makes more sense not to disclose this as part of the public API), and uses just one allocation. Currently, it only supports branching into two children, but it could be changed to support branching to n children at the cost of another allocation. Branching can already be done recursively to increase the number of children.
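To make the lockstep idea concrete, here is a minimal synchronous sketch using only std. It is not the crate's actual code. The definitions of `Inner`, `Branch`, `branch` and `try_next` are hypothetical, an `Iterator` stands in for the `Stream`, and `Rc<RefCell<_>>` stands in for the single shared allocation. `Poll::Pending` is returned without registering a waker, which a real async version would have to do.

```rust
use std::cell::RefCell;
use std::iter::Fuse;
use std::rc::Rc;
use std::task::Poll;

// Hypothetical shared state: the source plus a one-item slot per branch.
struct Inner<I: Iterator> {
    source: Fuse<I>,
    slots: [Option<I::Item>; 2],
    alive: [bool; 2],
}

// One half of the fork; `id` is 0 or 1.
struct Branch<I: Iterator> {
    inner: Rc<RefCell<Inner<I>>>,
    id: usize,
}

fn branch<I>(source: I) -> (Branch<I>, Branch<I>)
where
    I: Iterator,
    I::Item: Clone,
{
    let inner = Rc::new(RefCell::new(Inner {
        source: source.fuse(),
        slots: [None, None],
        alive: [true, true],
    }));
    (Branch { inner: inner.clone(), id: 0 }, Branch { inner, id: 1 })
}

impl<I> Branch<I>
where
    I: Iterator,
    I::Item: Clone,
{
    fn try_next(&mut self) -> Poll<Option<I::Item>> {
        let mut inner = self.inner.borrow_mut();
        let other = 1 - self.id;
        // An item the sibling already pulled on our behalf.
        if let Some(item) = inner.slots[self.id].take() {
            return Poll::Ready(Some(item));
        }
        // The sibling has not consumed its copy yet: wait instead of buffering more.
        if inner.slots[other].is_some() {
            return Poll::Pending;
        }
        let next = inner.source.next();
        if inner.alive[other] {
            // Leave a clone for the sibling (None at end of source leaves the slot empty).
            inner.slots[other] = next.clone();
        }
        Poll::Ready(next)
    }
}

impl<I: Iterator> Drop for Branch<I> {
    fn drop(&mut self) {
        // Hand full responsibility for the source to the surviving half.
        let mut inner = self.inner.borrow_mut();
        inner.alive[self.id] = false;
        inner.slots[self.id] = None;
    }
}
```

With this sketch, a branch that has pulled one item sees `Poll::Pending` until its sibling has taken its copy, and dropping one branch lets the other run freely.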

If you want this in futures-rs, I'll open a PR to do the integration and yank the library from crates.io afterwards. I think this would fit best as the stream analogue of the Shared combinator that already exists for futures.

Looking forward to your responses.

Any feedback?

@NeoLegends I am no expert in pinning tbh, but I don't think it's a good idea to leave a generic type unpinned when it might be unsafe to move, even if it sits behind a smart pointer such as Box, Rc or Arc.

As I understand it, it is safe to move anything that implements Unpin, and those pointer types do implement it. That doesn't change the fact that the types they contain might not be Unpin, and if you have a &mut reference to the pointer, you may be able to get a mutable reference to the T it points to.

For example for Arc there is a method:
pub fn get_mut(this: &mut Arc<T>) -> Option<&mut T>.
Of course it requires that only one copy of the Arc exists, but it is still a potential issue in the future, if this functionality gets rewritten or modified, or maybe the language changes...
For the practical example of get_mut case go here.
This is a really good practical resource as well.
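As a small std-only illustration of the get_mut point (the helper name `steal_from_unique_arc` is made up for this example): safe code can move the value out of an Arc as long as the Arc is unique, and gets None as soon as a second handle exists.

```rust
use std::sync::Arc;

// Moves the String out of a uniquely owned Arc using only safe code,
// leaving an empty String behind.
fn steal_from_unique_arc(arc: &mut Arc<String>) -> Option<String> {
    // get_mut returns Some only while no other Arc or Weak points at the value.
    Arc::get_mut(arc).map(std::mem::take)
}
```

If the String were a `!Unpin` value that some other code assumed to be pinned, this move would go unnoticed by the compiler.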

Anyway, as far as I know, Pin's whole purpose is to guarantee that a value that is !Unpin can never be moved once it is pinned. Without Pin you don't have that guarantee: the code may work correctly now, but some day it might not, and the compiler won't warn you or protect you from that.

Please someone correct me if I'm wrong, as I said before I'm no expert :)

Anyway, as far as I know, Pin's whole purpose is to guarantee that a value that is !Unpin can never be moved once it is pinned. Without Pin you don't have that guarantee: the code may work correctly now, but some day it might not, and the compiler won't warn you or protect you from that.

I think this is exactly where the confusion lies. To construct a Pin around a value, one of the following must hold:

  • the value is Unpin, which just makes everything work since nothing cares about not being moved
  • you are sure the value never moves, which requires you to use the unsafe constructor (which I do in the library)

With that unsafe constructor you "prove" to the compiler that you have done your research and that you are sure the value never moves, and then you formalize those guarantees by constructing the Pin. I don't get the argument about "language changes" at all, that's what backwards compatibility is for.
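The two cases can be sketched with std alone. This is an illustration, not code from the library: `NotUnpin`, `pin_unpin` and `pin_not_unpin` are hypothetical names, and a Box stands in for whatever allocation actually owns the value.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical type that opts out of Unpin, as a self-referential stream would.
struct NotUnpin {
    value: u32,
    _pin: PhantomPinned,
}

// Case 1: the target is Unpin, so the safe constructor is available.
fn pin_unpin(value: &mut u32) -> Pin<&mut u32> {
    Pin::new(value)
}

// Case 2: the target is !Unpin, so the author has to vouch for it.
fn pin_not_unpin(value: u32) -> Pin<Box<NotUnpin>> {
    let boxed = Box::new(NotUnpin { value, _pin: PhantomPinned });
    // SAFETY: the value lives on the heap and the Box is wrapped in a Pin
    // right away, so safe code cannot move the value out afterwards.
    unsafe { Pin::new_unchecked(boxed) }
}
```

For a plain Box, `Box::pin` does the same thing without unsafe; the unsafe constructor only becomes necessary when no safe pinning API covers the pointer type in use.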

I don't get the argument about "language changes" at all, that's what backwards compatibility is for.

New changes will be backwards compatible unless the major version is bumped, but that doesn't mean new features won't be added. Before Rust 1.4.0, Arc didn't have a get_mut method, even though Arc has existed since 1.0.0. So before 1.4.0, as far as I can tell, there was no way to move the T contained inside an Arc<T> in safe code, but now there is. That's the kind of change I am talking about.

  • you are sure the value never moves, which requires you to use the unsafe constructor (which I do in the library)

What I explained above is why you can't guarantee that the value inside the Arc won't be moved. You are promising that you won't move the value, but the Rust compiler doesn't enforce it, so the promise can be violated. I was partly wrong, though: this issue doesn't apply to you exactly, since you wrap Inner in a Mutex, which I missed. But that actually makes it easier to violate Pin's guarantees, because we no longer need a &mut reference to the contents of the Arc (which was only possible while a single copy of the Arc existed). A Mutex only needs a shared reference to hand out &mut Inner through its lock guard.

https://github.com/NeoLegends/gabelung/blob/92876d38a6e6d06d30370d68494578bb65dd3f35/src/lib.rs#L140-L149

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Option<Self::Item>> {
        let mut inner = self.inner.lock();
        let Inner {
            left,
            right,
            stream,
        } = &mut *inner;
...

Above you are able to get a &mut reference to the stream, which means the compiler won't warn or error if you move the stream accidentally. So nothing is stopping me from doing:

impl<S, I> Branch<S, I> {
    fn evil_method(self: Pin<&mut Self>, mut other_stream: S) {
        let mut inner = self.inner.lock();
        let Inner {
            left,
            right,
            stream,
        } = &mut *inner;
        
        std::mem::swap(stream, &mut other_stream);
    }
}

This is exactly why Pin was created - it's very hard to memorize all the promises we made to the compiler, yet very easy to break things with one change.
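A self-contained variant of the same hazard, using only std, might look like the sketch below. It assumes std's `Mutex` (hence the `unwrap()` on `lock()`), a `Vec<u8>` in place of the stream, and a made-up helper name; none of this is taken from the library.

```rust
use std::sync::{Arc, Mutex};

// Swaps out the value behind a shared handle. Only `&Arc<Mutex<_>>` is needed,
// so any clone of the Arc can do this, however many handles exist.
fn swap_through_shared_handle(
    shared: &Arc<Mutex<Vec<u8>>>,
    mut replacement: Vec<u8>,
) -> Vec<u8> {
    let mut guard = shared.lock().unwrap();
    // The guard derefs to `&mut Vec<u8>`, which is enough to move the old value out.
    std::mem::swap(&mut *guard, &mut replacement);
    replacement
}
```

Every handle observes the swapped-in value afterwards, and the compiler accepts this without any unsafe.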

I'm not trying to prove anything... I'm just excited about Rust and am trying to be helpful and potentially learn a thing or two :)

Above you are able to get a &mut reference to the stream, which means the compiler won't warn or error if you move the stream accidentally. So nothing is stopping me from doing:

Sure. That's perfectly right. The point, though, is that I'm not moving the value, and I'm sure enough about that to get the compiler to accept my code using unsafe. And that's exactly what unsafe is for: telling the compiler you have thought it through and know what you're doing.

That's also why I don't get your argument about language changes, you don't need Arc::get_mut (or any other "new" language feature) to break the code as is, as you proved in your example.

This is exactly why Pin was created - it's very hard to memorize all the promises we made to the compiler, yet very easy to break things with one change.

That's also right. However, as you can see here, in this case it's impossible to use Pin inside the data structure itself (without moving the inner stream into another allocation, which I considered an unacceptable performance hit), because there is currently no way to use Pin with Arc<Mutex<T>> the way I need it.

I get your point that this is a potential future maintenance hazard, but it's (sadly!) not one we can avoid by using types differently.

I would argue differently if there was a way to accomplish what I'm doing without unsafe, but there currently isn't any I'm aware of. I am very happy to be proven wrong eventually!

I get your point that this is a potential future maintenance hazard, but it's (sadly!) not one we can avoid by using types differently.

I would argue differently if there was a way to accomplish what I'm doing without unsafe, but there currently isn't any I'm aware of. I am very happy to be proven wrong eventually!

Yeah I get you on that part... I've been banging my head against almost the same thing: how to branch a stream into two streams. In my case I was trying to implement an unzip operator that splits a stream of tuples into two streams. I tried many different ways... The one that worked for me in the end was to poll the base stream from only one of the children and send the items to the other one over mpsc channels. You can see more details in the PR.

I don't think it has the issue we've talked about here, but it's not well tested so it might have other unforeseen issues... Idk, can't know for sure without more testing.

This unzip operator could be used for forking as well, maybe with something like: stream.map(|x| (x.clone(), x)).unzip() (the clone has to come first, since the tuple moves x).

A similar way of structuring the stream would probably work in your case too.
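The same shape can be tried out with a plain Iterator, whose `unzip` is part of std. This is only an analogue of the stream operator from the PR, and `fork_by_unzip` is a made-up name. The closure clones before moving x into the tuple, because `(x, x.clone())` would use x after it has been moved.

```rust
// Iterator analogue of forking via map + unzip: every item ends up in both halves.
fn fork_by_unzip<T: Clone>(items: impl Iterator<Item = T>) -> (Vec<T>, Vec<T>) {
    // Clone first, then move the original into the second position.
    items.map(|x| (x.clone(), x)).unzip()
}
```

Unlike a stream fork, this collects everything eagerly, so it says nothing about how the two halves are driven.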

How do you, in your implementation, ensure that the other half that's just receiving the mpsc values is making progress after the first one is dropped? Would that not stop the polling to the source stream? Do you pass over the stream to the other half?

I think this is a very valid use case that should be handled by any library implementation that does not want to make assumptions about the system it's being run in.

gabelung handles this properly, polling the source in lockstep and completely handing over the responsibility to poll when one of the halves is dropped. I very much invite you to use it and see if it fits your use case, or if you can discover any lingering bugs!

How do you, in your implementation, ensure that the other half that's just receiving the mpsc values is making progress after the first one is dropped? Would that not stop the polling to the source stream? Do you pass over the stream to the other half?

This is explained in the PR. Both halves store a Pin<Arc<Stream>> of the base stream, but only the first one accesses it; if the first gets dropped, the second one starts polling that stream. When the first half is dropped, it drops its mpsc::Sender and wakes up the other half. The other half then calls try_recv on the mpsc channel, which fails with a Disconnected error; from that it knows the first half is gone and starts polling the base stream itself.
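The handover described above can be modelled synchronously with std's mpsc channel. This is a rough sketch under stated assumptions, not the PR's code: `First`, `Second`, `Base` and `fork` are hypothetical names, a `Vec` iterator behind `Arc<Mutex<_>>` stands in for the pinned base stream, and there is no waker, so "nothing available yet" is simply reported as `Poll::Pending`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::task::Poll;

// Stand-in for the shared base stream.
type Base = Arc<Mutex<std::vec::IntoIter<u32>>>;

// First half: polls the base and forwards a copy of every item.
struct First {
    base: Base,
    tx: Sender<u32>,
}

// Second half: normally only reads the channel.
struct Second {
    base: Base,
    rx: Receiver<u32>,
}

fn fork(items: Vec<u32>) -> (First, Second) {
    let base = Arc::new(Mutex::new(items.into_iter()));
    let (tx, rx) = channel();
    (First { base: base.clone(), tx }, Second { base, rx })
}

impl First {
    fn next(&mut self) -> Option<u32> {
        let item = self.base.lock().unwrap().next()?;
        // A send error only means the second half is already gone.
        let _ = self.tx.send(item);
        Some(item)
    }
}

impl Second {
    fn try_next(&mut self) -> Poll<Option<u32>> {
        match self.rx.try_recv() {
            Ok(item) => Poll::Ready(Some(item)),
            Err(TryRecvError::Empty) => Poll::Pending,
            // Sender dropped and channel drained: take over polling the base.
            Err(TryRecvError::Disconnected) => {
                Poll::Ready(self.base.lock().unwrap().next())
            }
        }
    }
}
```

Note that std's `try_recv` keeps returning buffered items after the sender is dropped and reports `Disconnected` only once the channel is empty, so no forwarded item is lost during the handover.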