groupBy function
niecore opened this issue · 8 comments
Is there an alternative to baconjs's groupBy function available in Kefir?
If not, would it be accepted as a PR?
Could you explain groupBy's behavior?
groupBy groups the events emitted by an observable using a key function and returns a stream of streams, in which each inner stream contains only the items that produce the same key.
for reference:
https://baconjs.github.io/api3/classes/observable.html#groupby
https://rxjs.dev/api/operators/groupBy
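To make the grouping rule described above concrete, here is a minimal synchronous sketch in which plain arrays stand in for streams. The helper name `groupByKey` and the sample events are hypothetical and are not part of Bacon.js, RxJS, or Kefir; a real groupBy emits each group as its own stream over time rather than returning a finished Map.

```javascript
// Synchronous analogy only: arrays stand in for streams.
// `groupByKey` is a hypothetical helper, not a library function.
function groupByKey(items, keyF) {
  const groups = new Map();
  for (const item of items) {
    const key = keyF(item);
    // The first item with an unseen key opens a new group.
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(item);
  }
  return groups;
}

const sampleEvents = [
  { id: 1, val: 3 },
  { id: 2, val: -1 },
  { id: 1, val: 2 },
];

const grouped = groupByKey(sampleEvents, e => e.id);
console.log(grouped.get(1).map(e => e.val)); // [ 3, 2 ]
console.log(grouped.get(2).map(e => e.val)); // [ -1 ]
```

Each group only ever holds items whose key function result is identical, which is the property the stream version preserves.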
Interesting. I don't know that we need this in Kefir core (@kefirjs/core happy to disagree if you think this is useful), but one nice thing we added recently is the thru method, which would allow you to write a groupBy function that implements this behavior and use it like a method:
import groupBy from 'kefir-groupBy' // would be a package you create
import Kefir from 'kefir'
var events = [
  { id: 1, type: 'add', val: 3 },
  { id: 2, type: 'add', val: -1 },
  { id: 1, type: 'add', val: 2 },
  { id: 2, type: 'cancel' },
  { id: 3, type: 'add', val: 2 },
  { id: 3, type: 'cancel' },
  { id: 1, type: 'add', val: 1 },
  { id: 1, type: 'add', val: 2 },
  { id: 1, type: 'cancel' }
]
function keyF(event) {
  return event.id
}
function limitF(groupedStream) {
  const cancel = groupedStream.filter(x => x.type === 'cancel').take(1)
  var adds = groupedStream.filter(x => x.type === 'add')
  return adds.takeUntil(cancel).map(e => e.val)
}
Kefir.sequentially(2, events)
  .thru(groupBy(keyF, limitF)) // use it like this
  .flatMap(groupedStream => groupedStream.fold(0, (acc, x) => acc + x))
  .onValue(sum => {
    console.log(sum)
    // logs -1, 2, and 8 (one sum per id), in some order
  })
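As a rough illustration of why thru lets a standalone function read like a method: thru calls the given function with the observable and returns whatever that function returns. The sketch below uses a hypothetical `TinyObs` class and a hypothetical `double` operator as stand-ins; only the one-line `thru` body is meant to mirror Kefir's behavior.

```javascript
// Hypothetical stand-in for an observable; it wraps an array, not a real stream.
class TinyObs {
  constructor(values) {
    this.values = values;
  }
  map(fn) {
    return new TinyObs(this.values.map(fn));
  }
  // Hand `this` to fn and return whatever fn returns.
  thru(fn) {
    return fn(this);
  }
}

// A curried operator in the same shape as groupBy(keyF, limitF):
// configure it first, receive the source observable second.
const double = () => src => src.map(x => x * 2);

const viaThru = new TinyObs([1, 2, 3]).thru(double());
const direct = double()(new TinyObs([1, 2, 3]));

console.log(viaThru.values); // [ 2, 4, 6 ]
console.log(direct.values);  // [ 2, 4, 6 ]
```

Both calls produce the same result; thru only changes where the operator appears in the chain.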
Great, I'll try it with the thru method.
Thanks! :)
If someone is looking for the solution:
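const T = require("transducers-js");
const Kefir = require("kefir");

// limitF defaults to the identity so that groupBy(keyF) alone also works.
const groupBy = (keyF, limitF = data => data) => src => {
  // Currently open groups, indexed by key.
  const streams = {};
  return src.transduce(T.comp(
    // Only let through values whose key has no open group yet.
    T.filter((x) => !streams[keyF(x)]),
    // Turn each such first value into a new grouped stream.
    T.map(function(firstValue) {
      const key = keyF(firstValue);
      const similarValues = src.changes().filter(x => keyF(x) === key);
      // Re-emit the first value, then continue with later values for the same key.
      const data = Kefir.later(0, firstValue).concat(similarValues);
      const limited = limitF(data, firstValue).withHandler((emitter, event) => {
        if (event.type === 'end') {
          // Forget the group once it ends so the key can open a new one.
          delete streams[key];
          emitter.end();
        } else {
          emitter.emit(event.value);
        }
      });
      streams[key] = limited;
      return limited;
    })
  ));
};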
The only thing I would test there is making sure errors just flow through as expected, but otherwise, looks good!
I am coming back here to ask for your help. The implementation seems to work fine in a normal environment, but it does not work when used within jest-kefir, and I have no idea why.
Here is my test usage in a normal environment where everything works fine:
const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};
const a = stream();
const b = a
  .thru(groupBy(a => a.id))
  .flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)))
  .log();
send(a, [value(data), value(data2)]);
Output:
[stream.transduce.flatMap] <value> { id: 1, data: 42 }
[stream.transduce.flatMap] <value> { id: 2, data: 43 }
This is the test which fails because there is no output at all:
const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};
const a = stream();
const b = a
  .thru(groupBy(a => a.id))
  .flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)));
expect(b).toEmit([value(42), value(43)], () => {
  send(a, [value(data), value(data2)]);
});
Maybe you have a hint as to what the problem could be.
The use of Kefir.later means that the value is being emitted after the test ends. Either use Kefir.constant, which will emit the value immediately, or use toEmitInTime and tick to advance the clock & emit that value.
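The timing issue described above can be reproduced without Kefir or Jest. In this sketch, `emitLater`, `emitNow`, and `collectDuring` are hypothetical stand-ins: `emitLater` defers delivery with a zero-delay timer (similar in spirit to Kefir.later(0, x)), `emitNow` delivers synchronously (similar in spirit to Kefir.constant(x)), and `collectDuring` plays the role of an assertion that only watches while its callback runs.

```javascript
// Hypothetical stand-ins, not Kefir or jest-kefir APIs.
const emitLater = (x, onValue) => { setTimeout(() => onValue(x), 0); };
const emitNow = (x, onValue) => { onValue(x); };

// Collects only the values that arrive while `run` executes synchronously.
function collectDuring(run) {
  const seen = [];
  let open = true;
  run(x => { if (open) seen.push(x); });
  open = false; // the "test" is over; values arriving later are ignored
  return seen;
}

const deferred = collectDuring(push => [42, 43].forEach(x => emitLater(x, push)));
const immediate = collectDuring(push => [42, 43].forEach(x => emitNow(x, push)));

console.log(deferred);  // []
console.log(immediate); // [ 42, 43 ]
```

The deferred values do fire eventually, but only after the observing window has closed, which matches a test that sees no output at all.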