ReactiveX/RxScala

Observable.map's body will execute for every observer's subscribe.Is that so?

LoranceChen opened this issue · 6 comments

For example:

object ObvMapTest extends App {
  def boo = println("boo - ")

  def foo = println("foo - ")

  val obvTest = Observable[String]{s =>
    s.onNext("hi~")
  }

  val map1 = obvTest.map{o => boo; o}
  val map2 = map1.map{o => foo; o}

  map2.subscribe(s => println(s"observer - $s"))
  map2.subscribe(s => println(s"observer2 - $s"))
  map2.subscribe(s => println(s"observer3 - $s"))
  Thread.currentThread().join()
}

Output

boo - 
foo - 
observer - hi~
boo - 
foo - 
observer2 - hi~
boo - 
foo - 
observer3 - hi~

The result shows the map body's action invoked before emit the event.To be honest, it was not a first aspect for map implement in functional design.
It seems map return a lazy value but evaluate every times.
Shouldn't printing boo - and foo - at the first time when construct map1 and map2?After all, map help us construct a new Observable and we does not care about how it construct when make subscribes. Or its my mislead with Rx map design.
Hope some advice here.

This is by design, the constructing Observables this way will result in a "cold" observable (only doing work when there are subscribers, they setup on every subscriber). To make a hot observable you can either: 1: Create a Subject (which is an Observable as well), and feed values into it, 2: convert a cold observable into hot by .publish() and an immediate .connect(), or ensure at most one instance (will setup on the first subscriber, and teardown if no subscribers left) with .singleInstance()

hi @sorglomer, I have tried publish and .connect combines.It is useful to express once operation. And I considered another way but seems odd.
construct a future to emit subscribe make its escape from multi print.

var read: Observable[String] = {
    val p = Promise[Observable[String]]
    obvTest.subscribe{s => log("foo - ");p.trySuccess(s)}
    Observable.from(p.future).flatten
  }

read construct a new observable form subscribe older one.It‘s completely independence.
But I am worried about it was stupid same as first glance.

If you definitely want to construct a new Observable explicitly, you could just use a Subject:

val read = Subject[String]()
obvTest.subscribe(read)
// now read is the hot version of obvTest

ps: sry for syntax errors, I don't know Scala well

@sorgloomer, I just know little about Subject.Can you give some advice by the simple scala example?
With some struggle, I write a imaginary use case that is use Subject create a hot Observable like the map operator.
val map1 = obvTest.map{o => boo; "foo - " + o}
that changes hi~ Observable to foo - hi~ Observable
I attempt do it with

object SubjectTest extends App {
  val obvTest = Observable[String]{s =>
    s.onNext("hi~")
  }

  val sub = Subject[String]()
  obvTest.subscribe{s => println("foo -" + s); sub}

  sub.subscribe(s => println("first obver - " + s))
  sub.subscribe(s => println("second obver - " + s))

  Thread.currentThread().join()
}

Output:

foo -hi~

subject not emit event to his subscriber, besides, I'm not any idea adds "foo - " prefix to subject.

Use the Observable.subscribe(x:Observer) overload instead of the Observable.subscribe(x:Consumer)

Thanks a lot!