Unintuitive fork behavior
I have been trying to understand how to split and consume streams and ran into some unintuitive behavior. Can someone explain why this happens, or is it a bug?
If I run the code below, it prints only the even numbers:
const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
s1.fork().filter(n => n % 2 === 0).each(hi.log);
s1.fork().filter(n => n % 2 !== 0).each(hi.log);
But this code prints all of the numbers:
const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const s2 = s1.fork().filter(n => n % 2 === 0);
const s3 = s1.fork().filter(n => n % 2 !== 0);
s2.each(hi.log);
s3.each(hi.log);
Why should it matter if the forked streams are stored into variables before they are consumed?
I misunderstood the issue. This doesn't address it.
It looks like you need to clarify which stream is logging which value in your example:
const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const s2 = s1.fork().filter(n => n % 2 === 0);
const s3 = s1.fork().filter(n => n % 2 !== 0);
s2.each(x => hi.log('s2', x));
s3.each(x => hi.log('s3', x));
s3 1
s2 2
s3 3
s2 4
s3 5
s2 6
s3 7
s2 8
s3 9
s2 10
It seems to me that this is because, in your first example, the entire stream s1 has been consumed by the first fork, and there are no values left by the time it is forked a second time.
Consider adding asynchronicity:
const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .flatMap(x => hi(push => setImmediate(() => {
    push(null, x);
    push(null, hi.nil);
  })));
s1.fork().filter(n => n % 2 === 0).each(x => hi.log('s2', x));
s1.fork().filter(n => n % 2 !== 0).each(x => hi.log('s3', x));
s3 1
s2 2
s3 3
s2 4
s3 5
s2 6
s3 7
s2 8
s3 9
s2 10
@amsross is right. The consumption methods like each or toArray may consume data synchronously if there is already data available. This can lead to unintuitive behavior, as you have discovered.
The best way to deal with this problem is to create all of your forks before applying any transforms or consumers to them:
const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const s2 = s1.fork();
const s3 = s1.fork();
s2.filter(n => n % 2 === 0).each(hi.log);
s3.filter(n => n % 2 !== 0).each(hi.log);
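For anyone who wants to see why the ordering matters without reading Highland's source, here is a rough toy model in plain JavaScript. It is not Highland's implementation: ToySource and its fork, each, and drain methods are hypothetical names made up for this sketch. It assumes only the two behaviors described in this thread: a source emits synchronously as soon as every fork registered so far has a consumer, and a fork created afterwards sees only what is left.

```javascript
// Toy model only: this is NOT Highland's implementation.
// ToySource, fork, each, and drain are hypothetical names for illustration.
class ToySource {
  constructor(items) {
    this.items = items.slice(); // values not yet emitted
    this.forks = [];            // every fork registered so far
  }

  // Register a new fork. It receives only values emitted after this call.
  fork() {
    const fork = {
      handler: null,
      each: (fn) => {
        fork.handler = fn;
        this.drain(); // attaching a consumer may emit synchronously
      },
    };
    this.forks.push(fork);
    return fork;
  }

  // Emit everything, but only once every registered fork has a consumer.
  drain() {
    if (!this.forks.every((f) => f.handler !== null)) return;
    while (this.items.length > 0) {
      const x = this.items.shift();
      this.forks.forEach((f) => f.handler(x));
    }
  }
}

// Case 1: fork and consume in one expression. The first fork is the only
// one registered, so it drains the source before the second fork exists.
const seenA = [];
const a = new ToySource([1, 2, 3, 4]);
a.fork().each((x) => { if (x % 2 === 0) seenA.push(x); });
a.fork().each((x) => { if (x % 2 !== 0) seenA.push(x); });
console.log(seenA); // [ 2, 4 ]

// Case 2: create both forks first, then consume. Nothing is emitted until
// both forks have a consumer, so each value reaches both of them.
const seenB = [];
const b = new ToySource([1, 2, 3, 4]);
const evens = b.fork();
const odds = b.fork();
evens.each((x) => { if (x % 2 === 0) seenB.push(x); });
odds.each((x) => { if (x % 2 !== 0) seenB.push(x); });
console.log(seenB); // [ 1, 2, 3, 4 ]
```

In this sketch the filter is folded into the consumer to keep it short. The point is only the timing: in case 1 the source is already empty when the second fork registers, which matches the "only even numbers" output from the original question.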
Thank you for the explanation!