ReactiveX/RxJava

Passing composite data through flatMap and similar operators without creating container objects

jondwillis opened this issue · 22 comments

Take the following contrived example:

        Observable
                .just("yo")
                .flatMap(s -> Observable.range(0, 100))
                .subscribe(integer -> Ln.d("Here's an Integer(%s), but how do I access that juicy String?", integer));

I often come up against this problem, where I'd like to act upon multiple chained streams. I can easily imagine a couple of ways to access both the String Observable and the Integer Observable within a downstream subscribe() or some other operator. One is pulling the String Observable into a variable and then accessing it within a lambda. Another is to do something like:

        Observable
                .just("yo")
                .flatMap(s -> 
                        Observable.combineLatest(
                                Observable.just(s),
                                Observable.range(0, 100),
                                Pair::new))
                .subscribe(pair -> Ln.d("Here's an Integer(%s), and here's a juicy String(%s), but isn't this a little hard to follow and annoying?", pair.second, pair.first));

But this necessitates repackaging the original Observable with the dependent Observable into some more complex data structure.

There's gotta be a better way. Help?

There is an overload to flatMap that will pair up the original source value and each value returned for it through the Observable returned by the function:

    public final <U, R> Observable<R> flatMap(
            final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
            final Func2<? super T, ? super U, ? extends R> resultSelector)
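
To make the pairing behaviour concrete, here is a minimal sketch in plain Java. It uses java.util.stream as a stand-in for rx.Observable so that it runs without RxJava on the classpath; `flatMapWith` is a hypothetical helper written for illustration, not part of RxJava or the JDK. It only shows the shape of the idea: each source value is handed to the result selector together with every inner value produced for it.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ResultSelectorSketch {

    // Hypothetical helper (not an RxJava or JDK method): mimics the
    // two-argument flatMap by pairing each source value t with every
    // inner value u produced for it.
    static <T, U, R> Stream<R> flatMapWith(
            Stream<T> source,
            Function<T, Stream<U>> collectionSelector,
            BiFunction<T, U, R> resultSelector) {
        return source.flatMap(t ->
                collectionSelector.apply(t).map(u -> resultSelector.apply(t, u)));
    }

    // The issue's example, shortened to three integers.
    static List<String> demo() {
        return flatMapWith(
                Stream.of("yo"),
                s -> IntStream.range(0, 3).boxed(),
                (s, i) -> s + ":" + i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The result selector can return any type, so a String, a domain object, or a Pair all work. One difference to keep in mind: a sequential stream preserves order, whereas Observable.flatMap may interleave values from different inner Observables.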

Wow, that is definitely cleaner. But it still looks like you must create some sort of container object.

It also appears that switchMap and concatMap do not have the same capability. Perhaps that is by design.

You might want to check out this proposal from @stealthcode and me: #2452.

The proposal @abersnaze mentioned is something like a BiObservable, where each emission carries 2 elements of data. In principle there would also be a Tri, Quad, etc. Observable. I would love it if this were more of a first-class citizen in RxJava, since this use case seems to crop up again and again. Here is a unit test that shows the principles of how this would work.

Currently I am working on an RxJava Extensions proposal to make it easier to transform from rx.Observable into other kinds of Observables, for instance from an rx.Observable to an rx.extensions.BiObservable.

@akarnokd can you shed any light on why operators like concatMap and switchMap do not have the same resultSelector overloads? I think I'll try my hand at a pull request.

You do not actually need that overload, you can easily achieve the effect using a nested map.

Try this:

import rx.Observable;

public class ComposeViaMap {
    public static void main(String... args) {
        Observable
                .just("a", "b")
                .flatMap(s ->
                        Observable.range(0, 100)
                                .map(integer -> String.format("Here's an Integer(%s), with String(%s)", integer, s)))
                .subscribe(System.out::println);
    }
}

@headinthebox @benjchristensen Yup, those two approaches work, but they still produce subjectively ugly code that is annoying to compose, since I find myself unpacking and repacking the combined objects in each operator:
(contrived example, again)

    .flatMap(session -> service.getVideoPlayerStateSubject(),
                 (Func2<StubSessionImpl, Enums.VideoPlayerState, Pair<StubSessionImpl, Enums.VideoPlayerState>>) Pair::new)
    .distinctUntilChanged()
    .takeUntil(otherThing.skip(1))
    .doOnNext(pair -> {
        StubSessionImpl session = pair.first;
        Enums.VideoPlayerState videoPlayerState = pair.second;
        Ln.d("Here's a session %s and here's a videoPlayerState %s", session, videoPlayerState);
    })
    .subscribe(pair -> {
        StubSessionImpl session = pair.first;
        Enums.VideoPlayerState videoPlayerState = pair.second;
        // do some things with both session and videoPlayerState
    }, Ln::e);

I suppose at this point I'm just looking for some syntactic sugar or container objects to use.

Yup. It has drawbacks. You've been given a handful of answers. What do you suggest if the answers provided are insufficient?

I'm appreciative! I'm still trying to find a good solution, and I'll post here if I ever find anything that isn't kludgy ;)

It would be great to hear what you find.

Also, if you haven't looked at the solution by @stealthcode above, do so. It hurt my head at first but is powerful when passing multiple values.

Closing this due to staleness.

Thanks @jondwillis for the combineLatest workaround...I found that easier to read than the flatMap overload...

I hate having to pass a tuple down the chain. There should be a stash operator or skipWithValue or something of that essence that fixes this.

Did you ever find any solution for this?
I have the same situation, and I think I have to store the values in a state store, which I'd rather not do.

I also don't like creating a tuple for functionality that should be inside the framework.

We had this concept that didn't get any traction.
https://github.com/stealthcode/RxJavaMulti

I am new to Rx so please help me to determine if the problem I am facing is a similar one:

    someObservable
               .flatMapIterable(items -> items)
               .flatMap(item -> check(item))
               .flatMap(item1 -> update(item1))
               ...

The first flatMap() returns an item1 corresponding to the item sent to it, and for the next flatMap() I need both the previous item and its corresponding return value, item1.

Is this the same problem and is there a good solution?

Please help!

@amanshuraikwar There is a library that may be what you are looking for: https://github.com/pakoito/Komprehensions
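
The nested-map approach from earlier in the thread also seems to apply to the check/update chain above: nest the second step inside the first flatMap, so the original item is still in scope when its result arrives. Below is a minimal sketch of that idea, again using java.util.stream as a stand-in for rx.Observable so it runs without RxJava. The `check` and `update` bodies are hypothetical placeholders, since the real ones were not posted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CarryItemSketch {

    // Hypothetical stand-in for check(item): emits one result per item.
    static Stream<Integer> check(String item) {
        return Stream.of(item.length());
    }

    // Hypothetical stand-in for update(...): receives both the original
    // item and the value that check produced for it.
    static Stream<String> update(String item, Integer checked) {
        return Stream.of(item + " -> " + checked);
    }

    static List<String> run(List<String> items) {
        return items.stream()
                .flatMap(item -> check(item)
                        // item is captured by the enclosing lambda, so the
                        // next step sees both values without a tuple.
                        .flatMap(checked -> update(item, checked)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "bcd")).forEach(System.out::println);
    }
}
```

The trade-off is deeper nesting for each additional value that has to be carried forward, which is the readability cost that the container-object and BiObservable suggestions in this thread try to avoid.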