mostjs/core

multicast streams can lose their initial start-up events

semmel opened this issue · 7 comments

semmel commented

Start-up events of a multicast-ed stream are swallowed by simultaneous events of derived streams.

Demo

var
  M = require('@most/core'),
  MS = require('@most/scheduler'),
  log = t => M.tap(val => { console.log(t, val); }),

  s = M.multicast(M.startWith(1, M.at(2000, 2))),
  a = log("a")(s),
  b = log("b")(M.startWith(0, s));
Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())]);
// s: 1 --- 2
// a: 1 --- 2
// b: 0 --- 2  !! 
// expected b: 0 1 --- 2 

When you remove multicast from s everything is fine: b: 0 1 --- 2.
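
For reference, the same demo without multicast (reassigning the variables from above) behaves as expected:

s = M.startWith(1, M.at(2000, 2));  // no multicast
a = log("a")(s);
b = log("b")(M.startWith(0, s));
// a: 1 --- 2
// b: 0 1 --- 2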

Surprisingly, extending a with startWith "repairs" things too:

s = M.multicast(M.startWith(1, M.at(2000, 2)));
a = log("a")(M.startWith(0, s));
b = log("b")(M.startWith(0, s));
// s:   1 --- 2
// a: 0 1 --- 2
// b: 0 1 --- 2
TylorS commented

Hey @semmel, I'm sorry things aren't totally intuitive around the use of multicast. It's a pretty reasonable point of confusion, as it's the only operator in the standard library which makes a stream graph impure (i.e. two subscribers might see different things), and it will surface design choices most has made for various reasons.

I'll try to explain using the examples you have provided, but this is "to be expected" when it comes to any push-based reactive abstraction, like most, rxjs, etc. when sharing source streams with multiple subscribers.

In the first example, you have the "source" stream M.startWith(1, M.at(2000, 2)): a stream which should start with a value of 1 and, after 2 seconds, emit a value of 2. This is effectively sugar for the following, and I'll write it like this going forward for clarity as I pull it apart in more and more detail

const source = M.continueWith(() => M.at(2000, 2), M.now(1))

Let's quickly unpack this. In conjunction with the always-async guarantee in most, M.now(1) will schedule the emission of 1 effectively using

Promise.resolve(1).then((n) => {
  sink.event(scheduler.currentTime(), n); 
  sink.end(scheduler.currentTime())
})

at which point the continueWith operator will intercept the first sink.end() call to start running M.at(2000, 2), which will effectively run using

setTimeout(() => {
  sink.event(scheduler.currentTime(), 2)
  sink.end(scheduler.currentTime()) // the real end of the source stream
}, 2000)

Then s = M.multicast(source) is used to create the stream we can share with multiple subscribers. This is impure because the Stream that is returned uses reference counting: it starts running your source Stream when the reference count goes from 0 to 1, and disposes of any resources when the reference count goes from 1 back to 0.
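
Sketched very roughly (this is not the actual @most/core code, just the shape of the reference-counting idea; the names are made up):

const makeSharedSource = source => ({
  sinks: [],          // current subscribers
  disposable: null,   // subscription to the underlying source

  add(sink, scheduler) {
    this.sinks.push(sink)
    if (this.sinks.length === 1) {
      // reference count went 0 -> 1: start running the underlying source once
      this.disposable = source.run(this, scheduler)
    }
  },

  remove(sink) {
    this.sinks = this.sinks.filter(s => s !== sink)
    if (this.sinks.length === 0) {
      // reference count went 1 -> 0: dispose of the underlying resources
      this.disposable.dispose()
    }
  },

  // events are fanned out to every current subscriber, in subscription order
  event(time, value) { this.sinks.forEach(sink => sink.event(time, value)) },
  end(time) { this.sinks.slice().forEach(sink => sink.end(time)) }
})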

Knowing all of this, we can get into the meat of this example, written here in "long form"

const log = t => M.tap(val => { console.log(t, val); })

const source = M.continueWith(() => M.at(2000, 2), M.now(1))
const s = M.multicast(source)
const a = log("a")(s)
const b = log("b")(M.continueWith(M.now(0), () => s))

Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())])

Given all that we know about the parts involved now, let's describe the runtime behavior.

1.) a is subscribed to
2.) a subscribes to s
3.) The reference count within s goes from 0 to 1
4.) s subscribes to source
5.) source schedules M.now(1) for emission, which yields to the event loop
6.) b is subscribed to
7.) b schedules M.now(0) for emission, which yields to the event loop
8.) all sync code has run, so the event loop ticks
9.) source emits 1 through s into a, causing a log message
10.) source schedules M.at(2000, 2) for emission, which yields to the event loop
11.) b emits 0, causing a log message
12.) b subscribes to s
13.) the reference count within s goes from 1 to 2, keeping the subscription to source stable
14.) Event loop ticks and 2 seconds pass
15.) source emits 2 through s
16.) s emits 2, in the order of subscription, through a first, and then through b second
17.) source signals to s that it's done emitting events
18.) s signals to a that it's done emitting events
19.) the reference count in s goes from 2 to 1
20.) s signals to b that it's done emitting events
21.) the reference count in s goes from 1 to 0
22.) cleanup occurs and process ends
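
Put together, the console output is consistent with these steps:

// a 1   (step 9)
// b 0   (step 11)
// ...2 seconds pass...
// a 2   (step 16, through a first)
// b 2   (step 16, then through b)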

Hopefully, amongst these details, you'll see that by the time stream b subscribes to the multicast stream s in step 12 above, the event with the value of 1 has already been emitted through the subscription to s from stream a in step 9, leading to the behavior you're witnessing.

Okay, now that we've unpacked all the details of the first example, hopefully it's also clearer why the second example "repairs" the situation by making a equivalent to b.

1.) a is subscribed to
2.) a schedules M.now(0) for emission, which yields to the event loop
3.) b is subscribed to
4.) b schedules M.now(0) for emission, which yields to the event loop
5.) all sync code has run, so the event loop ticks
6.) a emits 0, causing a log message
7.) a subscribes to s
8.) The reference count within s goes from 0 to 1
9.) s subscribes to source
10.) source schedules M.now(1) for emission, which yields to the event loop
11.) b emits 0, causing a log message
12.) b subscribes to s
13.) the reference count within s goes from 1 to 2, keeping the subscription to source stable
14.) all sync code has run, so the event loop ticks
15.) source emits 1 through s
16.) s emits 1 through a, logging a message
17.) s emits 1 through b, logging a message
18.) after 2 seconds, source emits 2 through s to both a and b (just as in the first example) and then signals to s that it's done emitting events
19.) s signals to a that it's done emitting events
20.) the reference count within s goes from 2 to 1
21.) s signals to b that it's done emitting events
22.) the reference count within s goes from 1 to 0
23.) cleanup occurs and process ends
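
This time the console output matches the expected b: 0 1 --- 2:

// a 0   (step 6)
// b 0   (step 11)
// a 1   (step 16)
// b 1   (step 17)
// ...2 seconds pass...
// a 2
// b 2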

TLDR;

All of this said, this is a fairly common kind of issue in applications that have multiple subscriptions. Dealing with it usually involves replaying some number of past events to "late" subscribers, and there exists an implementation that replays exactly 1 event here - https://github.com/mostjs/hold
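
Applied to your first demo, that would look something like this (a sketch, assuming @most/hold is installed; hold has the same shape as multicast):

const { hold } = require('@most/hold')

// hold shares the source like multicast, but also replays the latest event
// to late subscribers, so b receives the initial 1 as well
const s = hold(M.startWith(1, M.at(2000, 2)))
const a = log("a")(s)
const b = log("b")(M.startWith(0, s))
// a: 1 --- 2
// b: 0 1 --- 2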

Hopefully that was helpful in understanding the results you are getting, but definitely feel free to ask more questions if anything is unclear

TylorS commented

One other piece of advice, not really related to the issue at hand, but on this line in the example

Promise.all([M.runEffects(a, MS.newDefaultScheduler()), M.runEffects(b, MS.newDefaultScheduler())]);

there are 2 separate schedulers being created here, which is generally not recommended. Most's scheduler goes to great lengths to ensure the ordering and timing of events as efficiently as possible (in both memory and time), and this guarantee is best kept when re-using a single scheduler. I usually define an alias, something like this

const scheduler = newDefaultScheduler()
const run = <A>(stream: Stream<A>) => runEffects(stream, scheduler)
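
For the demo above, that would look something like this (reusing the M and MS aliases from the first snippet):

const scheduler = MS.newDefaultScheduler();
const run = stream => M.runEffects(stream, scheduler);

// a and b now share one scheduler, so their events are ordered against a single clock
Promise.all([run(a), run(b)]);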

I'm not positive if it was just a side-effect of making this example on the fly or not, but I figured I'd mention it either way just in case 😄

semmel commented

Hello @TylorS, that is an awesome, comprehensive explanation! I still need to invest some quality time to digest your answer.

At first glance I am glad it's not a bug; instead it seems I've been sloppy and put too much burden on my stream.
I'll come back here to wrap up this issue, explaining my use-case and how it should be done better.

Also I very much appreciate your advice to use a single scheduler! Indeed this was my carelessness.

Thanks again for your swift and valuable response!

TylorS commented

If you do have a more concrete use-case, I could likely give better advice on how to handle it. If you have any other questions though, feel free to shoot them my way.

semmel commented

My use-case is:

I combineArray some data streams into my UI state stream, which I then consume to render the HTML on the page (via uhtml, but that does not matter).

Obviously, before the HTML can be rendered (i.e. before combineArray produces its first event), every data stream must have produced its first event. Therefore, to get the first page render, I slap e.g. a startWith("") or merge(now("")) onto every data stream which ends up in the UI.
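
For illustration, the shape looks roughly like this (the data streams here are hypothetical stand-ins for my real ones):

var
  // hypothetical data streams, primed with an initial "" for the first render
  userS  = M.startWith("", M.at(1000, "Alice")),
  itemsS = M.startWith("", M.at(1500, "first item")),

  // combineArray only emits once every input stream has produced at least one event
  uiStateS = M.combineArray((user, item) => ({ user, item }), [userS, itemsS]);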

In the concrete case, one original data stream had an optional immediate initial event, depending on a value found in the browser's LocalStorage. I did not model that as a Maybe, but instead (conditionally) merged in a now(localStorageData) or an empty stream. Thus the derived UI stream could now have two initial events. (Because startWith("") comes after merge(now(localStorageData)), I figured that it should still work.) Having to examine the sequence of startWith calls to find out which one "wins" is already a code smell, I guess.

In the end, I'll model that data as a Stream Maybe (a stream carrying a Maybe of the data), to which I'll apply startWith(nothing()) and then map(getOrElse("")). That should get me out of the mess.
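
A minimal sketch of that, with a hand-rolled Maybe just for illustration (in practice I'll take Maybe from a library):

var
  // hand-rolled Maybe helpers, for illustration only
  nothing = () => ({ isNothing: true }),
  just = value => ({ isNothing: false, value }),
  getOrElse = fallback => maybe => maybe.isNothing ? fallback : maybe.value,

  // hypothetical stand-in for the LocalStorage-backed data stream
  maybeDataS = M.startWith(nothing(), M.map(just, M.at(2000, "stored value"))),

  // the UI only ever sees plain strings, with "" until real data arrives
  uiDataS = M.map(getOrElse(""), maybeDataS);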

Takeaway for me:

  • Avoid more than one of startWith(x), merge(now(x)) and continueWith(f, now(x)) in a stream.
  • If that happens, examine your data model.
TylorS commented

@semmel It sounds like you came to the same conclusion I would have. There's a visible temporal effect generated by startWith/continueWith which can cause missed events. When you're keeping state, using an operator like hold from @most/hold is what I'd recommend when sharing: it's identical to multicast (its implementation actually extends it), but since it will always hold onto the latest value and replay it to "late" subscribers, it makes me feel safer against those scenarios where otherwise your screen just stays blank or doesn't update as expected.

semmel commented

@TylorS Thanks for recommending hold, I'll use that. (It serves the same purpose as a Property Stream in baconjs, which I am used to.)

I came to really appreciate the laziness (and thus purity) of most streams (e.g. over a baconjs EventStream, which always carries some internal state). However, with sharing, as you said, the purity goes away. In my case, that was just a consequence of populating my streams with performance-sensitive web API calls. (Obviously I did not want those to execute for every stream subscriber, but just once.)

Having the ability to add state to a stream via hold (or multicast) definitely helps, but now I'll have to be careful about the consequences.