ReactiveX/RxScala

RxJava 2+ support

sbarlabanov opened this issue · 28 comments

Hi,
Is there any plan for that?
I saw David had created a fork and started porting to RxJava 2.0.
Do you need any help? Is there any list of tasks to do?
I would like to help a little bit as far as I can ;).

I have privately done a tiny amount of exploration, but am not aware of any consolidated effort to provide a scala RxJava2 wrapper.

I know very little about Scala but I managed to use RxJava 2 from it (Scala should be good at interoperation). The annoyances I've run into were the inability to use nice lambdas (in 2.11) and for many operators I had to specify the type parameters explicitly (map[String](mapToString)) otherwise weird inference errors pop up. What are the pain points for you people?

@akarnokd You can definitely make RxJava2 work directly, but it requires additional boilerplate and isn't very Scala-idiomatic, which makes it awkward overall. That is a big minus, especially considering there are some idiomatic alternatives (e.g. Akka Streams, Swave, Scalaz FS2/Stream).

Some pain points/arguments for a wrapper would include:

  • A wrapper can provide some scala-idiomatic methods RxJava lacks, e.g. obs1 ++ obs2 is impossible to provide in a Java API but trivially implementable in a scala wrapper by defining a ++ method as an alias of concat
  • Scala lambdas and Java lambdas/SAMs aren't compatible (pre Scala 2.12)
  • Scala co- and contravariance differs from Java's resulting in types like Observable[_ <: T] instead of Observable[T] when used directly
  • Scala type inference cannot use type information from within the same argument list. This is worked around by currying some parts of the API, e.g. def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R], because def scan[R](initialValue: R, accumulator: (R, T) => R): Observable[R] (like RxJava's) would always require an explicit type argument.

...and also methods to create Observable from Scala Future, Try, Seq, etc.
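To make the currying and alias points above concrete, here is a minimal sketch. `Obs` is a hypothetical stand-in backed by a plain `List`, not RxScala's or RxJava's `Observable`; it only illustrates how a curried `scan` lets the compiler infer `R` from the first argument list, and how `++` can be a simple alias of `concat`.

```scala
object ObsSketch {
  // Hypothetical toy type: holds its items in a List instead of emitting them.
  final case class Obs[+T](items: List[T]) {
    def concat[U >: T](that: Obs[U]): Obs[U] = Obs(items ++ that.items)

    // Scala-only alias; Java cannot name a method "++".
    def ++[U >: T](that: Obs[U]): Obs[U] = concat(that)

    // Curried: R is fixed by the first argument list, so the lambda needs no annotations.
    def scan[R](initialValue: R)(accumulator: (R, T) => R): Obs[R] =
      Obs(items.scanLeft(initialValue)(accumulator))

    // Single argument list, shaped like the Java signature.
    def scanUncurried[R](initialValue: R, accumulator: (R, T) => R): Obs[R] =
      scan(initialValue)(accumulator)
  }
}

object ObsSketchDemo {
  import ObsSketch._

  // No type annotations needed with the curried form.
  val running: Obs[Int] = Obs(List(1, 2, 3)).scan(0)((acc, x) => acc + x)

  // With a single argument list, Scala 2 needs the explicit [Int] here.
  val runningUncurried: Obs[Int] =
    Obs(List(1, 2, 3)).scanUncurried[Int](0, (acc, x) => acc + x)

  val joined: Obs[Int] = Obs(List(1, 2)) ++ Obs(List(3))
}
```

Both `scan` variants produce the same items here; the only difference is how much the caller has to spell out.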

Hi, I've done quite a bit of work on this, but it's essentially a rewrite: adding proper co/contravariance, implicits for methods such as flatten, Scala language features, as well as renaming many of the methods to the ones used in Scala (zipWith to map2, etc.).

All of this is implemented as AnyVal decorators, so the overhead is essentially 0.
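For readers unfamiliar with the AnyVal decorator technique, a minimal sketch follows. It decorates `java.util.Optional` rather than an RxJava type so that it runs with only the standard library; `OptionalSyntax`, `RichOptional`, `toOption` and `getOrElse` are hypothetical names chosen for illustration and are not taken from the linked project.

```scala
import java.util.Optional

object OptionalSyntax {
  // Value class: in most call sites no wrapper object is allocated,
  // and the extension methods compile down to static calls.
  implicit final class RichOptional[T](val underlying: Optional[T]) extends AnyVal {
    def toOption: Option[T] =
      if (underlying.isPresent) Some(underlying.get) else None

    def getOrElse[U >: T](default: => U): U =
      if (underlying.isPresent) underlying.get else default
  }
}

object OptionalSyntaxDemo {
  import OptionalSyntax._

  val present: Option[String] = Optional.of("a").toOption
  val fallback: String = Optional.empty[String]().getOrElse("none")
}
```

The same pattern would apply to a Java `Observable`: the Java object stays the only runtime value and the Scala-friendly methods are added on top.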

I'm unsure about the scope of this project and whether I should submit a PR, or put it in a different repo.

EDIT: adding link

Nice 😃
Originally, we also used value classes ("extends AnyVal") for RxScala, but we moved away from them because you can't extend value classes, and Subject should extend Observable, see this thread.
I'd love to see solutions for this... 😉
I think it's the simplest to have one repo for RxScala based on RxJava 1, and another repo for RxScala based on RxJava 2.

@samuelgruetter The best solution is to provide an implicit conversion within the object like I've done with the Awaitable (see the doctest which you can run by doing test).

I'm thinking of also adding compat in separate modules with scalaz and cats but having the core compile with just the stdlib (2.10.6, 2.11.8 and 2.12.2) and adding law tests à la discipline.

The scope of the rewrite I'm doing is:

  1. Only really support going from Java -> Scala and not Java -> Scala -> Java again, since it pretty much hampers what scala features are used.
  2. Wrap it so that it feels like a library written in scala
  3. Add decent testing support for scalatest and later specs2

This is going pretty well.

I've been toying around with boilerplate generation since it was getting cumbersome to convert manually for the higher arity functions, esp those which are curried, as well as using SAM conversions depending on the scala version (important for pre 2.12).

That's now done, so the conversions between all the function types are auto-generated as well as some nicer syntax to get the scala compiler to do most of the inference work (see .convertK[Predicate]). All of this is hidden from users, but it just means the library is easier to maintain.
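As a rough idea of what such function conversions look like when written by hand, here is a sketch. It targets `java.util.function` interfaces so it needs no RxJava dependency; the helper names `asJavaPredicate` and `asJavaFunction` are hypothetical and are not the generated code or the `.convertK` syntax mentioned above.

```scala
import java.util.function.{Function => JFunction, Predicate => JPredicate}

object JavaFunctionConverters {
  // Before Scala 2.12 a Scala lambda is not accepted where a Java SAM type
  // is expected, so each function shape needs an explicit adapter like these.
  def asJavaPredicate[T](f: T => Boolean): JPredicate[T] = new JPredicate[T] {
    override def test(t: T): Boolean = f(t)
  }

  def asJavaFunction[T, R](f: T => R): JFunction[T, R] = new JFunction[T, R] {
    override def apply(t: T): R = f(t)
  }
}

object JavaFunctionConvertersDemo {
  import JavaFunctionConverters._

  val isLarge: JPredicate[Int] = asJavaPredicate[Int](_ > 2)
  val length: JFunction[String, Int] = asJavaFunction[String, Int](_.length)
}
```

Writing one adapter per arity and per interface is exactly the repetitive part that makes generating them attractive.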

I think the next part, adding the rest of the functions, should be pretty straightforward.

@samuelgruetter The best solution is to provide an implicit conversion within the object like I've done with the Awaitable (see the doctest which you can run by doing test).

This could work, but consider this example: If I have an AsyncSubject and I want to pass it to a function expecting an Observable, two upcasts will be needed: from AsyncSubject to Subject, and from Subject to Observable, but the Scala compiler never chains two implicit conversions, so you'd have to define an additional implicit conversion going from AsyncSubject to Observable.
This is just to illustrate that replacing subtyping by implicit conversions might be tricky, require more boilerplate code than expected, and might be more confusing for users, especially those who don't like implicits (I know many of them...).
But maybe subjects are the only area of RxJava with subtyping, and they might not be the most important part of RxJava, so implicits instead of subtyping could work, but I'd just be careful to make sure the user experience is good.
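The chaining limitation described above can be sketched with three unrelated toy classes. The class names mirror the Rx types, but these are hypothetical stand-ins with no subtyping between them, not the real RxJava or RxScala classes.

```scala
import scala.language.implicitConversions

object SubjectConversions {
  // Toy stand-ins: deliberately no inheritance between them.
  final class Observable(val name: String)
  final class Subject(val name: String)
  final class AsyncSubject(val name: String)

  implicit def asyncSubjectToSubject(s: AsyncSubject): Subject =
    new Subject(s.name)

  implicit def subjectToObservable(s: Subject): Observable =
    new Observable(s.name)

  // The compiler will not apply the two conversions above one after the other,
  // so the "two-step upcast" has to be spelled out as its own conversion.
  // Removing this definition makes the call in the demo below fail to compile.
  implicit def asyncSubjectToObservable(s: AsyncSubject): Observable =
    subjectToObservable(asyncSubjectToSubject(s))

  def describe(o: Observable): String = s"observable:${o.name}"
}

object SubjectConversionsDemo {
  import SubjectConversions._

  val described: String = describe(new AsyncSubject("a"))
}
```

With n levels of pseudo-subtyping, the number of such direct conversions grows with every pair of types that should be convertible, which is the boilerplate concern raised above.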

I'm thinking of also adding compat in separate modules with scalaz and cats but having the core compile with just the stdlib (2.10.6, 2.11.8 and 2.12.2) and adding law tests à la discipline.

Yes, avoiding the dependency on scalaz and cats would be good, to have an adaptor which is as general and un-opinionated as possible.

Only really support going from Java -> Scala and not Java -> Scala -> Java again, since it pretty much hampers what scala features are used.

I don't understand this, could you please elaborate?

Wrap it so that it feels like a library written in scala

👍

Add decent testing support for scalatest and later specs2

Testing is important, but there's no need to re-test RxJava.
Instead, what's more important is to test that the signatures make sense and are convenient to use, that no overloading clashes occur, that type inference works well, etc. Just something to keep in mind when testing 😉

I think the next part, adding the rest of the functions, should be pretty straightforward.

Some reasons why it might not be straightforward are summarized in this comment 😉 In other words, that's a list of things I'm unhappy about in the RxJava-1.0-based RxScala; it'd be great if you could avoid making these mistakes or find solutions.

@samuelgruetter

Thanks for the links. I've read through all of them and have got some ideas of how to solve them; I'll update the POC during the next few weeks while I play around with the syntax.

For the second point, at some point when working with Java/Scala shops, one of the requirements was that any scala code could be called from java; which basically meant that you couldn't use any scala features like implicits etc...

Also, I'm not suggesting to test RxJava; I'd hope that it's well tested 😜. It's more that it would be great to be able to use scalatest and specs2 with matchers specific to RxScala, so you can do stuff like:

    Observable(1, 2, 3) should (emit (1, 2, 3) then complete)

within your tests.

@samuelgruetter @akarnokd

Is there any way to make a stack-safe flatMap variant so you can do recursive flatMaps for singles? i.e. def recurse(i: Single[Int]): Single[Int] = i.flatMap(ii => recurse(ii + 1)).

Maybe you could use Schedulers.trampoline() for this?

@samuelgruetter Not really sure how that would help - it doesn't seem to trampoline the function stack at all.

The stack overflow is coming from the fact that subscribe and onSuccess are essentially nested for the flatMap operation.

	at io.reactivex.Single.subscribe(Single.java:2696)
	at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback.onSuccess(SingleFlatMap.java:84)
	at io.reactivex.internal.operators.single.SingleJust.subscribeActual(SingleJust.java:30)
	at io.reactivex.Single.subscribe(Single.java:2692)
	at io.reactivex.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:36)
	at io.reactivex.Single.subscribe(Single.java:2692)
	at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback.onSuccess(SingleFlatMap.java:84)
	at io.reactivex.internal.operators.single.SingleJust.subscribeActual(SingleJust.java:30)
	at io.reactivex.Single.subscribe(Single.java:2692)
	at io.reactivex.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:36)
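To illustrate what "trampolining the function stack" means in general, here is a sketch using the standard library's `scala.util.control.TailCalls` (Scala 2.11+ for `flatMap`). This is not RxJava's `Single` and not a fix for the trace above; `recurse`, `limit` and `run` are hypothetical names, and a `limit` is added only so the recursion terminates.

```scala
import scala.util.control.TailCalls._

object TrampolineSketch {
  // Same shape as the recursive flatMap in question, but each step is returned
  // as a suspended TailRec value instead of being run on a deeper stack frame.
  def recurse(i: TailRec[Int], limit: Int): TailRec[Int] =
    i.flatMap { ii =>
      if (ii >= limit) done(ii)
      else tailcall(recurse(done(ii + 1), limit))
    }

  // .result runs the suspended steps in a loop, so stack depth stays constant.
  def run(limit: Int): Int = recurse(done(0), limit).result
}
```

A call such as `TrampolineSketch.run(1000000)` completes without a stack overflow, whereas the directly nested subscribe/onSuccess calls in the trace grow the stack with every step.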

I've never used the trampoline scheduler myself, but I vaguely remember from a Coursera course that it can be used to avoid stack overflows from recursion, and this link says so too. Did you try something like they suggest there?

@samuelgruetter I see what you mean; I'd used it as a scheduler because I had assumed it would do something similar under the hood.

I'll give it a go later this week.

Hi guys, just wondering if there is any movement on this?

I need to decide whether to move back to RxJava 1.x or wait until RxScala supports 2.x in a library. While I'm not in a super hurry, it'd be handy to know if we're looking at days/weeks/months before there's something mostly usable!

Any progress on this? Is any help needed?

@akarnokd I think we could make it a light wrapper around RxJava?

@hepin1989 I don't know or use Scala, I have no idea how to do it.

@akarnokd I know Scala and I do love RxJava too.
The reactor way may be a nice reference.
https://github.com/reactor/reactor-scala-extensions

hudak commented

This may be sacrilegious, but https://github.com/monix/monix is a pretty good Observable implementation written natively in Scala with support for Reactive Streams.

ReactiveX is a fantastic library and I would love to chip in to update it, if I had the time...

@hudak So would I, but I am currently busy translating the Reactive Design Patterns book :(.
Monix is nice to use too, but it is not a Flowable :)

Hi guys!

While working with RxJava from Scala, I've found that its API is pretty nice, but it doesn't mix well with Scala: too many manual type signatures are required. So I came up with a thin wrapper just to get rid of the annoying typing. These simple wrappers work surprisingly well for our project. They don't allow using observables in for-comprehensions (why would you do that, anyway?), but they make using the Java API much less painful. The wrapper is far from complete (I've wrapped only the methods that our project requires).

So I'd say the "lightweight wrappers" approach should at least be considered.

Thanks.

sinwe commented

The reactor way may be a nice reference.
https://github.com/reactor/reactor-scala-extensions

I'm the author of that library. I can help on RxScala for RxJava2 if needed. I use rxscala for RxJava1, but I would love to use RxJava2 if possible.

I can also help, is there any repo for RxScala2?

zella commented

I tried https://github.com/monix/monix and it's an excellent replacement for RxScala. Most of the API is the same. The only problems: a small community, and (according to my feelings) it may contain critical bugs.

sinwe commented

I tried it too and it seems to be working great. What kind of bug is that? Have you reported it?

Leaving this issue open in case anyone wants to work on a RxScala 3+