Observable.map's body will execute for every observer's subscribe.Is that so?
LoranceChen opened this issue · 6 comments
For example:
object ObvMapTest extends App {
def boo = println("boo - ")
def foo = println("foo - ")
val obvTest = Observable[String]{s =>
s.onNext("hi~")
}
val map1 = obvTest.map{o => boo; o}
val map2 = map1.map{o => foo; o}
map2.subscribe(s => println(s"observer - $s"))
map2.subscribe(s => println(s"observer2 - $s"))
map2.subscribe(s => println(s"observer3 - $s"))
Thread.currentThread().join()
}
Output
boo -
foo -
observer - hi~
boo -
foo -
observer2 - hi~
boo -
foo -
observer3 - hi~
The result shows the map
body's action invoked before emit the event.To be honest, it was not a first aspect for map implement in functional design.
It seems map return a lazy value but evaluate every times.
Shouldn't printing boo -
and foo -
at the first time when construct map1
and map2
?After all, map help us construct a new Observable and we does not care about how it construct when make subscribes. Or its my mislead with Rx map design.
Hope some advice here.
This is by design, the constructing Observables this way will result in a "cold" observable (only doing work when there are subscribers, they setup on every subscriber). To make a hot observable you can either: 1: Create a Subject
(which is an Observable as well), and feed values into it, 2: convert a cold observable into hot by .publish()
and an immediate .connect()
, or ensure at most one instance (will setup on the first subscriber, and teardown if no subscribers left) with .singleInstance()
hi @sorglomer, I have tried publish
and .connect
combines.It is useful to express once operation. And I considered another way but seems odd.
construct a future to emit subscribe make its escape from multi print.
var read: Observable[String] = {
val p = Promise[Observable[String]]
obvTest.subscribe{s => log("foo - ");p.trySuccess(s)}
Observable.from(p.future).flatten
}
read
construct a new observable form subscribe older one.It‘s completely independence.
But I am worried about it was stupid same as first glance.
If you definitely want to construct a new Observable explicitly, you could just use a Subject:
val read = Subject[String]()
obvTest.subscribe(read)
// now read is the hot version of obvTest
ps: sry for syntax errors, I don't know Scala well
@sorgloomer, I just know little about Subject.Can you give some advice by the simple scala example?
With some struggle, I write a imaginary use case that is use Subject create a hot Observable like the map operator.
val map1 = obvTest.map{o => boo; "foo - " + o}
that changes hi~
Observable to foo - hi~
Observable
I attempt do it with
object SubjectTest extends App {
val obvTest = Observable[String]{s =>
s.onNext("hi~")
}
val sub = Subject[String]()
obvTest.subscribe{s => println("foo -" + s); sub}
sub.subscribe(s => println("first obver - " + s))
sub.subscribe(s => println("second obver - " + s))
Thread.currentThread().join()
}
Output:
foo -hi~
subject not emit event to his subscriber, besides, I'm not any idea adds "foo - " prefix to subject.
Use the Observable.subscribe(x:Observer)
overload instead of the Observable.subscribe(x:Consumer)
Thanks a lot!