ReactiveX/RxScala

doOnCompleted never executes

Using RxScala 0.25.0, the following example never calls the lambda that is provided to doOnCompleted:

    Observable.empty
      .doOnCompleted(() => println("DONE 1"))
      .subscribe(x=>x, e=>e, () => println("DONE 2"))

The same holds for this example:

    Observable[Unit](observer => {
      observer.onCompleted()
    }).doOnCompleted(() => println("DONE 1"))
      .subscribe(x=>x, e=>e, () => println("DONE 2"))

Both examples only print DONE 2.

Either I'm overlooking something obvious, or this is a bug.

You're overlooking something, but it's not obvious at all ;-)

The signature of doOnCompleted is:

    def doOnCompleted(onCompleted: => Unit): Observable[T]

That is, it takes a by-name argument. So you have to use it as follows:

    Observable.empty
      .doOnCompleted(println("DONE 1"))
      .subscribe(x=>x, e=>(), () => println("DONE 2"))

or, to convince yourself that the action passed to doOnCompleted is not already run when constructing the observable, you can write

    println("line1")
    val obs1 = Observable.empty.doOnCompleted(println("DONE 1"))
    println("line2")
    obs1.subscribe(x=>x, e=>(), () => println("DONE 2"))

which prints

    line1
    line2
    DONE 1
    DONE 2

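What you were passing to doOnCompleted is a 0-arity function, i.e. an expression of type () => Unit. Since a by-name parameter of type => Unit expects an expression of type Unit, Scala applies value discarding: it evaluates the function literal (which only creates the function object), throws that value away, and yields Unit. That's why it compiled, and also why the body of your function never ran.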
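The effect can be reproduced without RxScala. The following is a minimal sketch, assuming Scala 2 semantics as discussed in this thread; `ByNameDemo`, `runByName`, and `countCalls` are hypothetical names introduced only for illustration, with `runByName` standing in for a method that takes a `=> Unit` argument the way doOnCompleted does.

```scala
object ByNameDemo {
  // Hypothetical stand-in for doOnCompleted: accepts a by-name block and runs it once.
  def runByName(action: => Unit): Unit = action

  // Returns how many times the counter was actually incremented.
  def countCalls(): Int = {
    var calls = 0

    // By-name argument: the block is evaluated when `action` is referenced.
    runByName { calls += 1 }

    // Function literal: evaluating the argument merely builds a Function0,
    // whose value is discarded to fit Unit, so its body never runs.
    runByName(() => calls += 1)

    // Applying the function explicitly makes the argument a Unit expression again.
    val f = () => calls += 1
    runByName(f())

    calls
  }

  def main(args: Array[String]): Unit =
    println(s"calls = ${countCalls()}")
}
```

Only the first and third calls increment the counter, so `countCalls()` returns 2. The second call compiles without complaint under default compiler settings, which is exactly the trap described above.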

Originally, doOnCompleted took () => Unit, which I find more intuitive and safer. But then, it was "simplified" into => Unit in ReactiveX/RxJava#1345.
So maybe we should revert that change?

@samuelgruetter Thank you for the clarification, I completely overlooked the by-name parameter. It's quite confusing, though, since subscribe takes its onCompleted parameter as a 0-arity function while doOnCompleted takes a by-name parameter. I think it's better to stay consistent and use the same parameter type everywhere. If this is up for debate, my preference would be () => Unit over => Unit, since it's more explicit IMO.

The inconsistency between subscribe (which takes () => Unit for onCompleted) and the doOnXxx methods (which take => Unit) is indeed not so nice. And we just had another person being puzzled by this on Stack Overflow.

But on the other hand, => Unit allows us to write code like

    myObservable.doOnCompleted {
      // one or several lines of instructions...
    }

which is a quite common pattern in Scala, whereas with () => Unit, we would have to write

    myObservable.doOnCompleted(() => {
      // one or several lines of instructions...
    })

which is not very Scala-idiomatic.

The confusion comes from the fact that

    myObservable.doOnCompleted(() => {
      // one or several lines of instructions...
    })

actually compiles and runs, but never executes the given function. Is there a way to implicitly convert () => {...} into => {...}? If not, is it possible to somehow raise a compiler warning if the 0-arity function is used?

Yes, () => {...}, of type () => Unit, can be turned into => Unit by "value discarding": when Unit is expected, Scala can discard the obtained value and return Unit instead. This feature is quite dangerous, and you can have the compiler warn about it with -Ywarn-value-discard:

    scala -Ywarn-value-discard
    scala> def foo(a: => Unit): Unit = a
    foo: (a: => Unit)Unit

    scala> foo(() => println("hi"))
    <console>:9: warning: discarded non-Unit value
                  foo(() => println("hi"))
                         ^
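For a project built with sbt, the same flag can be enabled in the build definition so that the mistake surfaces at compile time rather than at runtime. The fragment below is only a sketch, assuming an sbt build on Scala 2.11 or 2.12; on Scala 2.13 the option is spelled -Wvalue-discard. Adding -Xfatal-warnings is optional and turns every warning into a compile error, which may be too strict for some code bases.

```scala
// build.sbt (fragment)
scalacOptions ++= Seq(
  "-Ywarn-value-discard", // warn when a non-Unit value is silently discarded
  "-Xfatal-warnings"      // optional: fail the build on any warning
)
```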

I'm the guy from Stack Overflow who was puzzled by this.

Part of the problem may be that there's really not a Scala idiom for function-taking-no-arguments-and-just-causes-side-effects. I think that normally Scala just tries to discourage that.

But here's another option to think about. The cost is another object allocation, and it might require another import to use doOnX, but it allows both kinds of calls:

      trait UnitOrFunction0ReturningUnit {
        def execute(): Unit
      }

      import scala.language.implicitConversions
      import scala.util.Random // needed for Random.nextFloat() below

      object UnitOrFunctionConversions {
        implicit def convertUnit(item: => Unit): UnitOrFunction0ReturningUnit = {
          new UnitOrFunction0ReturningUnit {
            def execute() = item
          }
        }
        implicit def convertFunction(item: () => Unit): UnitOrFunction0ReturningUnit = {
          new UnitOrFunction0ReturningUnit {
            def execute() = item()
          }
        }
      }
      class Sample {
        def doOnSubscribe(fn: UnitOrFunction0ReturningUnit) = {
          fn.execute()
        }
      }
      import UnitOrFunctionConversions._
      val s = new Sample
      s.doOnSubscribe(() => println("==== example 1"))
      s.doOnSubscribe(println(s"==== example 2 ${Random.nextFloat()}"))
      s.doOnSubscribe(println(s"==== example 2 ${Random.nextFloat()}"))

And the printout is:

    ==== example 1
    ==== example 2 0.6020808
    ==== example 2 0.16069102

And you could just add the import to the package, so everyone gets it automatically. Since no one else would be using UnitOrFunction0ReturningUnit, I don't think it would have side effects for other code. (OTOH, it's still early in the morning and I'm not convinced my coffee has kicked in yet...)

Just for the record: one more person got confused about this on Stack Overflow (this time it's doOnSubscribe, which also takes a => Unit argument).

Due to the project's EOL status, this will not be changed.