caolan/highland

Unintuitive fork behavior

Closed this issue · 4 comments

I have been trying to understand how to split and consume streams and ran into some unintuitive behavior. Can someone tell me why this happens or is this a bug?

If I run the code below, it only prints even numbers

const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
s1.fork().filter(n => n % 2 === 0).each(hi.log);
s1.fork().filter(n => n % 2 !== 0).each(hi.log);

But this code prints all numbers

const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const s2 = s1.fork().filter(n => n % 2 === 0);
const s3 = s1.fork().filter(n => n % 2 !== 0);
s2.each(hi.log);
s3.each(hi.log);

Why should it matter if the forked streams are stored into variables before they are consumed?

I misunderstood the issue. This doesn't address it.

It looks like you need to clarify which stream is logging which value in your example:

const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const s2 = s1.fork().filter(n => n % 2 === 0);
const s3 = s1.fork().filter(n => n % 2 !== 0);
s2.each(x => hi.log('s2', x));
s3.each(x => hi.log('s3', x));

s3 1
s2 2
s3 3
s2 4
s3 5
s2 6
s3 7
s2 8
s3 9
s2 10

It seems to me that this is because, in your first example, the entire stream s1 has been consumed by the first fork and there are no values left by the time it is forked a second time.

Consider adding asynchronicity:

const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  .flatMap(x => hi(push => setImmediate(() => {
    push(null, x)
    push(null, hi.nil)
  })));
s1.fork().filter(n => n % 2 === 0).each(x => hi.log('s2', x))
s1.fork().filter(n => n % 2 !== 0).each(x => hi.log('s3', x))

s3 1
s2 2
s3 3
s2 4
s3 5
s2 6
s3 7
s2 8
s3 9
s2 10
vqvu commented

@amsross is right. The consumption methods like each or toArray may consume data synchronously if there is already data available. This can lead to unintuitive behavior, as you have discovered.

The best way to deal with this problem is to create all of your forks before applying any transforms to them.

const hi = require('highland');
const s1 = hi([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const s2 = s1.fork();
const s3 = s1.fork();
s2.filter(n => n % 2 === 0).each(hi.log);
s3.filter(n => n % 2 !== 0).each(hi.log);

Thank you for the explanation!