esamattis/node-promisepipe

Possible problem with Duplex streams?

jballant opened this issue · 4 comments

Thanks so much for this module; I think it is really handy for stream pipelines.

However, I think I've run into a problem with how duplex streams are handled.

Duplex streams emit both "finish" (the writable side is done) and "end" (the readable side is done), which means that a duplex stream isn't really "done" until both events have fired.

Right now, this module resolves the promise for a stream as soon as any of "end", "close", or "finish" fires, like this:

stream.on("end", resolve);
stream.on("close", resolve);
stream.on("finish", resolve);

However, because a duplex is both writable and readable, "finish" can fire, resolving the promise, before the readable side of the stream has ended. If an error happens between the "finish" and "end" events, the promise for that stream is not rejected: it has already been resolved, and a later call to reject() does not change its state.
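A minimal reproduction of this, assuming Node's built-in `stream` module. `naiveStreamPromise` is a hypothetical name for the resolution logic quoted above (the module's actual code may differ in details). The duplex's writable side finishes, and then the stream is destroyed with an error before its readable side ever ends:

```javascript
const { Duplex } = require("stream");

// Hypothetical stand-in for the logic quoted above: reject on "error",
// resolve on whichever of "end", "close" or "finish" fires first.
function naiveStreamPromise(stream) {
  return new Promise((resolve, reject) => {
    stream.on("error", reject);
    stream.on("end", resolve);
    stream.on("close", resolve);
    stream.on("finish", resolve);
  });
}

// A duplex whose readable side never pushes data, so it never emits "end".
const duplex = new Duplex({
  read() {},
  write(chunk, encoding, callback) {
    callback();
  },
});

const outcome = naiveStreamPromise(duplex).then(
  () => "resolved",
  (err) => `rejected: ${err.message}`
);

// Finish the writable side, then fail before the readable side has ended.
duplex.once("finish", () => duplex.destroy(new Error("boom")));
duplex.end("chunk");

outcome.then((result) => console.log(result)); // prints "resolved"
```

The promise has already settled on "finish" by the time "error" fires, so the failure is silently dropped.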

I think the fix would be to check whether the stream is an instance of Duplex and, if so, wait for both events to fire before resolving the promise.
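For reference, here is which of Node's built-in stream classes an `instanceof Duplex` check matches. Transform and PassThrough both extend Duplex, so such a check would also cover a transform stream at the end of a pipeline. A quick sketch; the `samples` object exists only for illustration:

```javascript
const { Readable, Writable, Duplex, Transform, PassThrough } = require("stream");

// One instance of each built-in stream class.
const samples = {
  Readable: new Readable({ read() {} }),
  Writable: new Writable({ write(chunk, encoding, callback) { callback(); } }),
  Duplex: new Duplex({ read() {}, write(chunk, encoding, callback) { callback(); } }),
  Transform: new Transform({ transform(chunk, encoding, callback) { callback(null, chunk); } }),
  PassThrough: new PassThrough(),
};

for (const [name, instance] of Object.entries(samples)) {
  console.log(`${name}: ${instance instanceof Duplex}`);
}
// Prints:
// Readable: false
// Writable: false
// Duplex: true
// Transform: true
// PassThrough: true
```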

Something like this maybe?

var Duplex = require("stream").Duplex;

// Hypothetical wrapper around the per-stream promise
function streamPromise(stream) {
    return new Promise(function (resolve, reject) {
        var readableEnded = false;
        var writableFinished = false;
        var streamIsDuplex = stream instanceof Duplex;

        function resolver() {
            // If the stream is a duplex, we wait until both "end" and
            // "finish" are fired. Otherwise, we can go ahead and resolve.
            if (!streamIsDuplex || (readableEnded && writableFinished)) {
                resolve();
            }
        }

        // The error event handler rejects the promise
        stream.on("error", reject);

        stream.on("end", function () {
            readableEnded = true;
            resolver();
        });
        stream.on("finish", function () {
            writableFinished = true;
            resolver();
        });
        stream.on("close", resolve);
    });
}

Once again, thanks for making this module.

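// Helper (not defined in the original snippet): settle on the next `event`, rejecting on "error".
const on = (event) => new Promise((resolve, reject) => stream.once(event, event === 'error' ? reject : resolve));
const duplexEvents = ['finish', 'end'].map(on);
const duplexResolved = streamIsDuplex ? Promise.all(duplexEvents) : Promise.race(duplexEvents);
return Promise.race(['error', 'close'].map(on).concat(duplexResolved));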
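A self-contained version of the Promise.all/Promise.race idea above, as a sketch only. The comment never defines `on`, so the helper here is an assumption, and `streamDone` is a hypothetical name rather than part of promisepipe's API. The "error" listener calls reject directly, because when Node destroys a stream with an error it emits "close" right after "error", and the rejection has to win that race:

```javascript
const { Duplex } = require("stream");

// Assumed helper: resolves with the first argument of the next `event`.
function on(stream, event) {
  return new Promise((resolve) => stream.once(event, resolve));
}

// Hypothetical name: resolves when `stream` is done, rejects on "error".
function streamDone(stream) {
  const streamIsDuplex = stream instanceof Duplex;
  const duplexEvents = ["finish", "end"].map((event) => on(stream, event));
  // A duplex needs both sides done; any other stream needs only one event.
  const duplexResolved = streamIsDuplex
    ? Promise.all(duplexEvents)
    : Promise.race(duplexEvents);
  // Reject inside the listener itself so "error" settles before "close".
  const failed = new Promise((resolve, reject) => stream.once("error", reject));
  return Promise.race([failed, on(stream, "close"), duplexResolved]);
}
```

With this, a duplex that emits "finish" and then errors before "end" rejects instead of resolving.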

Just put a dev-null[0] stream at the end of your list of streams, so that the last stream is no longer the duplex/transform stream.
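Why this works: `pipe()` only calls `end()` on its destination after the source emits "end", so the "finish" of a plain Writable at the tail implies the transform before it has fully ended. A minimal sketch, assuming Node's built-in `stream` module; `devNull()` is a stand-in Writable, not the dev-null package's actual export, and `sinkFinished` is a hypothetical helper. It only shows the end-of-pipeline signal; errors on upstream streams still need their own listeners:

```javascript
const { PassThrough, Writable } = require("stream");

// Stand-in for the dev-null package: a Writable that discards every chunk.
function devNull() {
  return new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
}

// Resolve once the sink has flushed all writes; reject on a sink error.
function sinkFinished(sink) {
  return new Promise((resolve, reject) => {
    sink.once("error", reject);
    sink.once("finish", resolve);
  });
}

const transform = new PassThrough(); // the duplex that used to be last
const sink = devNull();
transform.pipe(sink); // pipe() ends the sink after the transform emits "end"

const done = sinkFinished(sink).then(() => transform.readableEnded);
transform.end("payload");

done.then((ended) => console.log(ended)); // prints true
```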

[0] https://www.npmjs.com/package/dev-null

I experienced similar problems as well: my data got randomly corrupted, with some JSON structures mixed up here and there. It seems to depend on the relative speed of the different streams.

It seems to be related to S3 and/or dynamo-backup-streams, since their functions return Duplex streams:

var pass = new stream.PassThrough();
var params = { Bucket: bucket, Key: key, Body: pass };
s3.upload(params);
return pass;

s3.getObject(params).createReadStream();

I then restore a backup to DynamoDB via https://github.com/SaltwaterC/dynamo-backup-streams, whose restore function returns a Duplex as well.

I would love to use pipes with promises, so hopefully this gets fixed.
Thank you!

Should be fixed in v2.1.2 (see #14).