kefirjs/kefir

groupBy function

niecore opened this issue · 8 comments

Is there an alternative to baconjs's function groupBy also in kefir available?
If not, would it be accepted as PR?

Could you explain groupBy's behavior?

groupBy groups the emitted events in a observable by a key function and returns a stream of streams in which each stream only contains items which have the same key function result.

for reference:
https://baconjs.github.io/api3/classes/observable.html#groupby
https://rxjs.dev/api/operators/groupBy

Interesting. I don't know as we need this for Kefir core (@kefirjs/core happy to disagree if you think this is useful), but one nice thing we added recently is the thru method, which would allow you to write a groupBy function that implements this behavior and use it like a method:

import groupBy from 'kefir-groupBy' // would be a package you create
import Kefir from 'kefir'

var events = [
  { id: 1, type: 'add', val: 3 },
  { id: 2, type: 'add', val: -1 },
  { id: 1, type: 'add', val: 2 },
  { id: 2, type: 'cancel' },
  { id: 3, type: 'add', val: 2 },
  { id: 3, type: 'cancel' },
  { id: 1, type: 'add', val: 1 },
  { id: 1, type: 'add', val: 2 },
  { id: 1, type: 'cancel' }
]

function keyF(event) {
  return event.id
}

function limitF(groupedStream) {
  const cancel = groupedStream.filter(x => x.type === 'cancel').take(1)
  var adds = groupedStream.filter(x => x.type === 'add')

  return adds.takeUntil(cancel).map(e => e.val)
}

Kefir.sequentially(2, events)
  .thru(groupBy(keyF, limitF)) // use it like this
  .flatMap(groupedStream => groupedStream.fold(0, (acc, x) => acc + x))
  .onValue(sum => {
    console.log(sum)
    // returns [-1, 2, 8] in an order
  })

Great, I'll try it with the thru method.

Thanks ! :)

If someone is looking for the solution:

const T = require("transducers-js");
const Kefir = require("kefir");

const groupBy = (keyF, limitF) => src => {
    const streams = {};

    return src.transduce(T.comp(
        T.filter((x) => !streams[keyF(x)]),
        T.map(function(firstValue) {
            const key = keyF(firstValue);
            const similarValues = src.changes().filter(x => keyF(x) === key );
            const data  = Kefir.later(0, firstValue).concat(similarValues);

            const limited = limitF(data, firstValue).withHandler((emitter, event) => {

                if (event.type === 'end') {
                    delete streams[key];
                    emitter.end();
                } else {
                    emitter.emit(event.value);
                }
            });

            streams[key] = limited;
            return limited;
        })
))};

The only thing I would test there is making sure errors just flow through as expected, but otherwise, looks good!

I am coming back here to ask for your help. The implementation seems to work very much fine a a normal environment but does not work when it's used within jest-kefir and I have no idea why.

Here is my test usage in a normal environment where everything works fine:

const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};

const a = stream();

const b = a
    .thru(groupBy(a => a.id))
    .flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)))
    .log();

send(a, [value(data), value(data2)]);

Output:

[stream.transduce.flatMap] <value> { id: 1, data: 42 }
[stream.transduce.flatMap] <value> { id: 2, data: 43 }

This is the test which fails because there is no output at all:

const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};

const a = stream();

const b = a
        .thru(groupBy(a => a.id))
        .flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)));

expect(b).toEmit([value(42), value(43)], () => {
    send(a, [value(data), value(data2)]);
});

Maybe you have a hint for, what could be the problem.

The use of Kefir.later means that the value is being emitted after the test ends. Either use Kefir.constant, which will emit the value immediately, or use toEmitInTime and tick to advance the clock & emit that value.