atomrc/callbag-debounce

Improvement: make it work for both listenable and iterable sources

Opened this issue · 9 comments

I think this operator should behave like RxJs debounceTime. Right now it almost behaves like that for listenable sources, but it doesn't for iterables.
For example, this is the current behaviour for iterables:

pipe(
  fromIter([1,2,3]),
  debounce(1000),
  forEach(console.log) // emits all values one by one every second.
)

It should ignore all previous values that were emitted within the delay range, so it should emit only the number 3 after 1 second.

We can achieve that but changing your code to something like:

const debounceTime = wait => source => (start, sink) => {
  let id, ask;
  start === 0 && source(start, (t, d) => {
    if (start === t) ask = d;
    if (t === 1) {
      clearTimeout(id);
      id = setTimeout(() => {
        sink(t, d);
        id = undefined;
      }, wait);
      if (id) ask(t);
      return;
    }
    sink(t, d);
  });
};

What do you think?

Hi,

Thanks for reporting this. I'll try to implement this soon then :)

I've checked how other libraries handle this:
Rx
Rx emits last value and completion "immediately". Nothing is debounced, completion flushes the last value.

console.log('start', Date.now())

Rx.Observable.of(1, 2, 3, 4, 5).debounceTime(50).subscribe({
  next(val) {
    console.log(Date.now(), 'next', val)
  },
  complete(val) {
    console.log(Date.now(), 'complete')
  },
})

xstream
xstream emits only the completion - also nothing is debounced here.

var debounce = require('xstream/extra/debounce').default

console.log('start', Date.now())

require('xstream').default.of(1, 2, 3, 4, 5)
	.compose(debounce(50))
	.addListener({
		next(val) {
			console.log(Date.now(), 'next', val)
		}, 
		complete() {
			console.log(Date.now(), 'complete')
		},
	})

@Andarist I'm surprised because I'm almost sure that I tested that with RxJs :D and it behaved as I described, but after reading your comment I checked it and you're right: it emits the last value and completes immediately without debouncing.

Hey @franciscotln @Andarist
Sorry I haven't been really active lately. I am in between two jobs - two cities - two countries.
Loads of changes for me and I didn't really had the time to take care of my open source project.

So regarding the pullable sources support question. I was starting reviewing your PR @Andarist and I wondered: Is there a real life scenario for debouncing a pullable source?
I cannot seem to find a valid scenario thus I am not sure we should support it.
What do you think. Do you have a use case in mind?

From the definition we can read on callbag-basics github "Imagine a hybrid between an Observable and an (Async)Iterable, that's what callbags are all about." so iterables can also be async (generator) and I believe it's better to have all operators supporting both listenable/pullable sources. I can't think of an example right now but for me it just feels right to support both, behaving for example as

Rx.Observable.from([1,2,3]).debounceTime(1000) // => 3|

I just finished re-writing callbag-delay because the one that is published only supports listenable sources and don't even support sink completion.

Personally I don't have a use cose for this, I was just playing with callbags & "fixed" this issue when looking at this repo. I believe though that asyncIterables might benefit from that (as mentioned by @franciscotln).

I also try to support both types in my published operators, although it's not always possible (to my understanding).

Sorry I haven't been really active lately. I am in between two jobs - two cities - two countries.
Loads of changes for me and I didn't really had the time to take care of my open source project.

No problem. This can wait, take care of yourself 👍

[EDIT] Ok while writting my reply, I realized this could be a valid scenario.
Even though, I post it anyway just for the record :)


I don't think it's a good idea to compare Rx's from with callbag's iterable sources.

The thing is, with Rx it is still Reactive Programming (meaning the source decides when to emit).

In this case, it's the consumer that decides when to pull a value.
That's a complete different aspect IMHO.
The consumer is responsible for choosing when to pull a value, not the source.

In this case we would have a pretty hard-to-understand workflow where the consumer wants a value but get it a few ms latter or eventually gets another value if it asked for another one.

There was a lot of discussions about this in the most world, this issue for example. You may want to lean on it.

Hi @Sinewyk

Thanks for pointing this out. Interesting discussion indeed, but I don't really think it actually relates to the discussion here.

The problem here is that we have pull streams (meaning the consumer can actually decide when it wants to get a value from a source).
Which is something that doesn't exists in Reactive Programming (if I am not mistaken).
Thus, this is not a behaviour that exists neither in Rx nor most, so we cannot really inspire from those.

The question is, if a consumer requests a value, is it expected that the value is debounced or is it supposed to be sent right away?
I don't really have the answer to that question, and I cannot seem to find a valid scenario for actually debouncing the requested value.

In my opinion, when the consumer requests the value it's actually its job to not request it at a reasonable rate. The responsibility shifts from the source to the consumer, but that's open for debate.
So far, I don't really see any strong argument for or against this behaviour...