multicast streams can lose their initial start-up events
semmel opened this issue · 7 comments
Start-up events of a multicast-ed stream are swallowed by simultaneous events of derived streams.
Demo
var
M = require('@most/core'),
MS = require('@most/scheduler'),
log = t => M.tap(val => { console.log(t, val); }),
s = M.multicast(M.startWith(1, M.at(2000, 2))),
a = log("a")(s),
b = log("b")(M.startWith(0, s));
Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())]);
// s: 1 --- 2
// a: 1 --- 2
// b: 0 --- 2 !!
// expected b: 0 1 --- 2
When you remove multicast from s, everything is fine: b: 0 1 --- 2.
Surprisingly, extending a with startWith "repairs" things too:
s = M.multicast(M.startWith(1, M.at(2000, 2)));
a = log("a")(M.startWith(0, s));
b = log("b")(M.startWith(0, s));
// s: 1 --- 2
// a: 0 1 --- 2
// b: 0 1 --- 2
Hey @semmel, I'm sorry things aren't totally intuitive around the use of multicast. That's pretty understandable, as it's the only operator in the standard library that makes a stream graph impure (i.e. two subscribers might see different things), and it will surface design choices most has made for various reasons.
I'll try to explain using the examples you have provided, but this is "to be expected" when it comes to any push-based reactive abstraction, like most, rxjs, etc. when sharing source streams with multiple subscribers.
In the first example, you have the "source" stream M.startWith(1, M.at(2000, 2)): a stream which should start with a value of 1 and, after 2 seconds, emit a value of 2. This is effectively sugar for the following, and I'll write it like this going forward, hopefully for more clarity as I pull it apart in more and more detail:
const source = M.continueWith(M.now(1), () => M.at(2000, 2))
Let's quickly unpack this. In conjunction with the always-async guarantee in most, M.now(1) will schedule the emission of 1, effectively using
Promise.resolve(1).then((n) => {
sink.event(scheduler.currentTime(), n);
sink.end(scheduler.currentTime())
})
at which point the continueWith operator will intercept the first sink.end() call to start running M.at(2000, 2), which will effectively run using
setTimeout(() => {
sink.event(scheduler.currentTime(), 2)
sink.end(scheduler.currentTime()) // the real end of the source stream
}, 2000)
Then s = M.multicast(source) is used to create the stream we can share with multiple subscribers. This is impure because the Stream that is returned uses reference counting: it starts running your source Stream when the reference count goes from 0 to 1, and disposes of any resources when the reference count goes from 1 back to 0.
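The reference counting described above can be sketched in a few lines of plain JavaScript. This is only a minimal model, not the @most/core implementation: multicastSketch, subscribeToSource and the callback-style sinks are hypothetical names made up for illustration, and events are pushed synchronously to keep it short.

```javascript
// Minimal model of a reference-counted multicast (NOT the @most/core source).
// `subscribeToSource(emit)` is a hypothetical function that starts the source
// and returns a dispose function.
function multicastSketch(subscribeToSource) {
  const sinks = new Set();
  let disposeSource = null;

  return function subscribe(sink) {
    sinks.add(sink);
    if (sinks.size === 1) {
      // Reference count went 0 -> 1: start the source exactly once.
      disposeSource = subscribeToSource(value => {
        for (const s of [...sinks]) s(value);
      });
    }
    return function unsubscribe() {
      if (!sinks.delete(sink)) return;
      if (sinks.size === 0) {
        // Reference count went 1 -> 0: release the source's resources.
        disposeSource();
        disposeSource = null;
      }
    };
  };
}

// Usage: the "late" subscriber only sees events emitted after it subscribed.
const lifecycle = [];
let emitFromSource = null;
const shared = multicastSketch(emit => {
  lifecycle.push('source started');
  emitFromSource = emit;
  return () => { lifecycle.push('source disposed'); };
});

const seenByA = [];
const seenByB = [];
const unsubscribeA = shared(v => seenByA.push(v));
emitFromSource(1);                                 // only a is subscribed so far
const unsubscribeB = shared(v => seenByB.push(v)); // late subscriber
emitFromSource(2);
unsubscribeA();
unsubscribeB();

console.log(seenByA);   // [ 1, 2 ]
console.log(seenByB);   // [ 2 ]
console.log(lifecycle); // [ 'source started', 'source disposed' ]
```

The source is started once and disposed once no matter how many subscribers come and go, which is exactly why two subscribers can observe different things.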
Knowing all of this, we can get into the meat of this example, written here in "long form"
const log = t => M.tap(val => { console.log(t, val); })
const source = M.continueWith(M.now(1), () => M.at(2000, 2))
const s = M.multicast(source)
const a = log("a")(s)
const b = log("b")(M.continueWith(M.now(0), () => s))
Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())])
Given all that we know about the parts involved now, let's describe the runtime behavior.
1.) a is subscribed to
2.) a subscribes to s
3.) The reference count within s goes from 0 to 1
4.) s subscribes to source
5.) source schedules M.now(1) for emission, which yields to the event loop
6.) b is subscribed to
7.) b schedules M.now(0) for emission, which yields to the event loop
8.) all sync code has run, so the event loop ticks
9.) source emits 1 through s into a, causing a log message
10.) source schedules M.at(2000, 2) for emission, which yields to the event loop
11.) b emits 0, causing a log message
12.) b subscribes to s
13.) the reference count within s goes from 1 to 2, keeping the subscription to source stable
14.) Event loop ticks and 2 seconds pass
15.) source emits 2 through s
16.) s emits 2, in the order of subscription, through a first, and then through b second
17.) source signals to s that it's done emitting events
18.) s signals to a that it's done emitting events
19.) the reference count in s goes from 2 to 1
20.) s signals to b that it's done emitting events
21.) the reference count in s goes from 1 to 0
22.) cleanup occurs and process ends
Hopefully amongst these details, you'll see that by the time the stream b subscribes to the multicast stream s in step 12 above, the event with the value of 1 has already been emitted through the subscription to s from stream a in step 9, leading to the behavior you're witnessing.
Okay, now that we've unpacked all the details of the first example, hopefully it's also clearer why the second example "repairs" this situation by making a equivalent to b.
1.) a is subscribed to
2.) a schedules M.now(0) for emission, which yields to the event loop
3.) b is subscribed to
4.) b schedules M.now(0) for emission, which yields to the event loop
5.) all sync code has run, so the event loop ticks
6.) a emits 0, causing a log message
7.) a subscribes to s
8.) The reference count within s goes from 0 to 1
9.) s subscribes to source
10.) source schedules M.now(1) for emission, which yields to the event loop
11.) b emits 0, causing a log message
12.) b subscribes to s
13.) the reference count within s goes from 1 to 2, keeping the subscription to source stable
14.) all sync code has run, so the event loop ticks
15.) source emits 1 through s
16.) s emits 1 through a, logging a message
17.) s emits 1 through b, logging a message
18.) source schedules M.at(2000, 2) and, 2 seconds later, emits 2 through s to a and then b (as in the first example), then signals to s that it's done emitting events
19.) s signals to a that it's done emitting events
20.) the reference count within s goes from 2 to 1
21.) s signals to b that it's done emitting events
22.) the reference count within s goes from 1 to 0
23.) cleanup occurs and process ends
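Both walkthroughs above can be replayed with a toy model in plain JavaScript. Everything here is an assumption made for illustration: a FIFO array stands in for the event loop, a "stream" is just a function that takes a sink, the 2000 ms delay is modelled as a task that goes to the back of the queue, and now, at, continueWith, tap and multicast are simplified stand-ins rather than the @most/core functions.

```javascript
// Toy model only: a FIFO task queue stands in for the event loop.
const tasks = [];
const schedule = task => tasks.push(task);
const runAll = () => { while (tasks.length > 0) tasks.shift()(); };

// A "stream" is a function taking a sink { event, end }.
const now = value => sink =>
  schedule(() => { sink.event(value); sink.end(); });
// Simplification: the delay is ignored; the event just queues behind pending tasks.
const at = (_delay, value) => now(value);
// When `first` ends, start the stream returned by `next` on the same sink.
const continueWith = (first, next) => sink =>
  first({ event: sink.event, end: () => next()(sink) });
const tap = (f, stream) => sink =>
  stream({ event: v => { f(v); sink.event(v); }, end: sink.end });

// Shares one run of `stream`; starts it when the first sink arrives.
const multicast = stream => {
  const sinks = [];
  return sink => {
    sinks.push(sink);
    if (sinks.length === 1) {
      stream({
        event: v => sinks.slice().forEach(s => s.event(v)),
        end: () => sinks.splice(0).forEach(s => s.end()),
      });
    }
  };
};

// wrapA = false models the first example, wrapA = true the second one.
const runExample = wrapA => {
  tasks.length = 0;
  const seen = [];
  const log = label => stream => tap(v => seen.push(`${label}: ${v}`), stream);
  const noop = { event: () => {}, end: () => {} };
  const s = multicast(continueWith(now(1), () => at(2000, 2)));
  const a = log('a')(wrapA ? continueWith(now(0), () => s) : s);
  const b = log('b')(continueWith(now(0), () => s));
  a(noop); // "a is subscribed to"
  b(noop); // "b is subscribed to"
  runAll(); // "all sync code has run, so the event loop ticks"
  return seen;
};

const first = runExample(false);
const second = runExample(true);
console.log(first);  // [ 'a: 1', 'b: 0', 'a: 2', 'b: 2' ]
console.log(second); // [ 'a: 0', 'b: 0', 'a: 1', 'b: 1', 'a: 2', 'b: 2' ]
```

In the first run, b joins s only after the queued emission of 1 has already been delivered to a. In the second run, both subscribers join s before that task runs.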
TLDR;
All of this said, this is a fairly common kind of issue in applications that have multiple subscriptions. Dealing with it usually involves replaying some number of past events to "late" subscribers, and there exists an implementation that replays exactly 1 event here - https://github.com/mostjs/hold
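To make "replaying the latest event to late subscribers" concrete, here is a rough sketch in plain JavaScript. It is not the @most/hold source: holdSketch and subscribeToSource are hypothetical names, the replay happens synchronously for brevity, and clearing the held value once the last subscriber leaves is an assumption of this sketch.

```javascript
// Sketch of a shared stream that replays its latest value to late subscribers.
// `subscribeToSource(emit)` is a hypothetical function that starts the source
// and returns a dispose function.
function holdSketch(subscribeToSource) {
  const sinks = new Set();
  let disposeSource = null;
  let held = null; // becomes { value } once an event has been seen

  return function subscribe(sink) {
    // Late subscriber: hand over the most recent value first.
    if (held !== null) sink(held.value);
    sinks.add(sink);
    if (sinks.size === 1) {
      // Reference count went 0 -> 1: start the source.
      disposeSource = subscribeToSource(value => {
        held = { value };
        for (const s of [...sinks]) s(value);
      });
    }
    return function unsubscribe() {
      if (!sinks.delete(sink)) return;
      if (sinks.size === 0) {
        // Reference count went 1 -> 0: dispose and forget the held value
        // (an assumption of this sketch).
        disposeSource();
        disposeSource = null;
        held = null;
      }
    };
  };
}

// Usage: the late subscriber still receives the start-up event.
let emit = null;
const heldStream = holdSketch(e => { emit = e; return () => {}; });
const early = [];
const late = [];
heldStream(v => early.push(v));
emit(1);
heldStream(v => late.push(v)); // 1 is replayed on subscription
emit(2);

console.log(early); // [ 1, 2 ]
console.log(late);  // [ 1, 2 ]
```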
Hopefully that was helpful in understanding the results you are getting, but definitely feel free to ask more questions if anything is unclear
One other piece of advice, not really related to the issue at hand, but on this line in the example
Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())]);
there are 2 separate schedulers being created here. This is generally not recommended. Most's scheduler goes pretty far to ensure the ordering and timing of events as efficiently as possible (in both memory and time), and this guarantee is best made when re-using a single scheduler. I usually define an alias something like this
const scheduler = newDefaultScheduler()
const run = <A>(stream: Stream<A>) => runEffects(stream, scheduler)
I'm not positive if it was just a side-effect of making this example on the fly or not, but I figured I'd mention it either way just in case.
Hello @TylorS , that is an awesome comprehensive explanation! I still need to invest some quality time to digest your answer.
At first glance I am glad it's not a bug; instead, it seems I've been sloppy and put too much burden on my stream.
I'll come back here to wrap-up this issue explaining my use-case and how it should be done better.
Also I very much appreciate your advice to use a single scheduler! Indeed this was my carelessness.
Thanks again for your swift and valuable response!
If you do have a more concrete use-case, I could likely give better advice on how to handle it. If you have any other questions though, feel free to shoot them my way.
My use-case is:
I combineArray some data streams into my UI state stream, which I then consume to render the html on the page (via uhtml, but that does not matter).
Obviously, before the html can be rendered (i.e. combineArray produces its first event), every data stream must have produced its first event. Therefore, to get the first page render, I slap in e.g. a startWith("") or merge(now("")) to every data stream which ends up in the UI.
In the concrete case, an original data stream had an optional immediate initial event, depending on a value found in the browser's LocalStorage. I did not model that as a Maybe, but instead (conditionally) merged in a now(localStorageData) or empty stream. Thus the derived UI stream could now have two initial events. (Because startWith("") comes after merge(now(localStorageData)), I figured that it should still work.) Having to examine the sequence of startWith calls to find out which one "wins" is already a code smell, I guess.
In the end, I'll model that data as Stream Maybe (a stream carrying a Maybe of the data), which I'll map(getOrElse(""), startWith(nothing())). That should get me out of the mess.
Takeaway for me:
- Avoid more than one of startWith(x), merge(now(x)) and continueWith(f, now(x)) in a stream.
- If that happens, examine your data model.
@semmel It sounds like you came to the same conclusion I would have. There's a visible temporal effect generated by startWith/continueWith which can cause missed events. When you're keeping state, an operator like hold from @most/hold is what I'd recommend when sharing. It's otherwise identical to multicast (it actually extends it in implementation too), but because it always holds onto the latest value and replays it to "late" subscribers, it makes me feel safer against those scenarios where otherwise your screen just stays blank or doesn't update as expected.
@TylorS Thanks for recommending hold, I'll use that. (It serves the same purpose as a Property stream in baconjs, which I am used to.)
I came to really appreciate the laziness (and thus purity) of most streams (e.g. over a baconjs EventStream, which always carries some internal state). However, with sharing, as you said, the purity goes away. In my case, that was just a consequence of populating my streams with performance-sensitive web api calls. (Obviously I did not want those to execute for every stream subscriber, but just once.)
Having the ability to add state to a stream via hold (or multicast) definitely helps, but now I'll have to be careful to consider the consequences.