caolan/highland

Stream stuck when used with mongodb and node10

jeanbaptiste-brasselet opened this issue · 7 comments

Hello, I really like Highland and would like to continue using it, but it is becoming a problem with the newest versions of Node.

The following example works fine with Node 8/9 but hangs when run with Node 10+:

const MongoClient = require('mongodb').MongoClient;
const hl = require('highland');

const MONGO_URL = 'mongodb://localhost:27017/';

const DB_NAME = 'someDb';
const COLLECTION = 'someCollection';

const run = async () => {
  const mongo = new MongoClient(MONGO_URL);

  await mongo.connect();
  const cursor = mongo.db(DB_NAME)
    .collection(COLLECTION)
    .find()
    .stream();

  return new Promise((resolve, reject) => {
    return hl(cursor)
    .batch(5)
    .flatten()
    .map(x => console.log(x && x._id))
    .stopOnError(reject)
    .done(resolve);
  });
};

run()
  .catch(console.log)
  .then(() => process.exit())

If you replace the MongoDB stream with a simple stream like this one, it works fine:

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super({ ...opt, objectMode: true });
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      this.push({ count: i });
    }
  }
}

The thing is, I had a look at the stream implementation inside the MongoDB library, and it uses Node's built-in stream implementation; I didn't see anything unusual about it. Something else is a little strange: if you remove the flatten() from the hl chain, the stream no longer gets stuck (even with Node 10).

Anyway, I have no clue what the problem is, so if anyone has an idea, please share it.

vqvu commented
  1. Indeed, it makes no sense here; I was doing some other stuff, but I have reduced the problem to this :)
  2. I'm already using the latest 3.x beta version.
  3. No, the number/size does not matter. I have reproduced the issue on a test database with 10 documents containing only the _id field.

About Mongo: I tried with the latest MongoDB driver and several MongoDB server versions (3.0, 3.2, 4.0). I will try another driver version.

vqvu commented

Ok, I know what's wrong.

There's an awaitDrain variable in the readable state that is incremented whenever a call to write() from a pipe returns false. A subsequent drain decrements it, and only when it reaches 0 does the drain actually take effect. I think it's used to pause the source while at least one pipe destination is not accepting new data. So if awaitDrain ever gets incremented twice before a drain decrements it, we're in trouble.
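The counting argument can be illustrated with a toy model. This is a simplified sketch, not Node's actual stream code: `ToyPipeState`, `onWriteReturnedFalse`, and `onDrain` are hypothetical names, and the model assumes the source resumes exactly when the counter reaches 0.

```javascript
// Toy model of the awaitDrain bookkeeping described above.
// Hypothetical names; NOT Node's real implementation.
class ToyPipeState {
  constructor() {
    this.awaitDrain = 0; // number of 'drain' events we are still waiting for
    this.flowing = true; // whether the source is currently emitting data
  }

  // Called when dest.write() returns false: pause and wait for a drain.
  onWriteReturnedFalse() {
    this.awaitDrain++;
    this.flowing = false;
  }

  // Called when dest emits 'drain': resume only once the counter hits 0.
  onDrain() {
    if (this.awaitDrain > 0) this.awaitDrain--;
    if (this.awaitDrain === 0) this.flowing = true;
  }
}

// One false write followed by one drain: the source resumes.
const ok = new ToyPipeState();
ok.onWriteReturnedFalse();
ok.onDrain();
console.log('single increment ->', ok.flowing); // true

// Two increments before a single drain: the counter is left at 1.
const stuck = new ToyPipeState();
stuck.onWriteReturnedFalse();
stuck.onWriteReturnedFalse();
stuck.onDrain();
console.log('double increment ->', stuck.flowing, stuck.awaitDrain); // false 1
```

If the destination only sends one drain for that episode, nothing brings the second counter back to 0, so the source stays paused.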

It turns out that it's possible for Highland to emit drain from within a write. So a write causes a drain, which causes another write, all synchronously. If both writes return false, awaitDrain ends up being incremented twice in a row, since the drain is emitted before either write returns. Hence the deadlock.
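The ordering problem can be sketched with a hypothetical destination whose write() synchronously emits 'drain' before returning false. `ReentrantDest` and `pipeWrite` are made-up names for illustration, and the counter logic is a simplification of the pipe bookkeeping, not Node's implementation.

```javascript
const { EventEmitter } = require('events');

// Hypothetical destination mimicking the behavior described above:
// write() can emit 'drain' synchronously, then returns false.
class ReentrantDest extends EventEmitter {
  constructor() {
    super();
    this.depth = 0;
  }

  write(chunk) {
    this.depth++;
    // Only the outermost write emits 'drain', to keep the example finite.
    if (this.depth === 1) this.emit('drain');
    this.depth--;
    return false; // tells the caller to wait for a 'drain'
  }
}

const log = [];
let awaitDrain = 0;
const dest = new ReentrantDest();

// Simplified pipe: each write that returns false bumps awaitDrain once.
function pipeWrite(chunk) {
  log.push(`write ${chunk} start`);
  const ret = dest.write(chunk);
  log.push(`write ${chunk} returned ${ret}`);
  if (ret === false) awaitDrain++;
}

// A 'drain' triggers the next write synchronously.
let next = 2;
dest.on('drain', () => {
  log.push('drain');
  if (awaitDrain > 0) awaitDrain--;
  if (next <= 2) pipeWrite(next++);
});

pipeWrite(1);
console.log(log.join('\n'));
console.log('awaitDrain =', awaitDrain); // 2
```

The log shows the 'drain' firing while the counter is still 0, so it decrements nothing; both writes report false only afterwards, leaving the counter at 2.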

Here's a simplified test case without mongo.

const { Readable } = require('stream');
const _ = require('highland');

class Counter extends Readable {
  constructor(opt) {
    super({ ...opt, objectMode: true });
    this._max = 10;
    this._index = 1;
    this._firstTime = true;
  }

  _emitNext() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      this.push({ count: i });
    }
  }

  _read() {
    if (this._firstTime) {
      // Delay the first item so that when it's emitted, Highland isn't in a
      // generator loop. This allows 'drain' to be emitted within write().
      this._firstTime = false;
      setTimeout(() => this._emitNext(), 0);
    } else {
      this._emitNext();
    }
  }
}

async function fn() {
  await _(new Counter())
    .batch(2)
    .consume((err, x, push, next) => {
      // Delay the batch so that the write() -> emit('drain') -> write() stack
      // can unwind, causing multiple increments of awaitDrain.
      setTimeout(() => {
        if (x !== _.nil) {
          next();
        }
        push(err, x);
      }, 0);
    })
    .filter((x) => {
      console.log(x);
      return false;
    })
    .toPromise(Promise);

  console.log('done');
}

fn();

In Node 9, this wasn't a problem, since there was a guard that prevented this kind of double increment. Node 10+ includes nodejs/node#18516, which removed the guard in favor of some other logic.
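For comparison, a guard of this kind can be modeled with a flag that is reset at the start of each write attempt and set after the first increment, so a nested write chain bumps the counter only once. This is a hypothetical sketch under that assumption (`makeDest`, `run`, and the `increased` flag are illustrative names), not the actual Node 9 source.

```javascript
const { EventEmitter } = require('events');

// Hypothetical destination: the outermost write() synchronously emits
// 'drain' (which triggers a nested write), and every write returns false.
function makeDest() {
  const dest = new EventEmitter();
  let depth = 0;
  dest.write = () => {
    depth++;
    if (depth === 1) dest.emit('drain');
    depth--;
    return false;
  };
  return dest;
}

// Run the write -> 'drain' -> write sequence and return the final counter.
// With useGuard, the flag lets only one increment happen per nested chain.
function run(useGuard) {
  const dest = makeDest();
  let awaitDrain = 0;
  let increased = false; // hypothetical guard flag
  let remaining = 2;

  function ondata() {
    remaining--;
    increased = false;
    const ret = dest.write();
    if (ret === false && (!useGuard || !increased)) {
      awaitDrain++;
      increased = true;
    }
  }

  dest.on('drain', () => {
    if (awaitDrain > 0) awaitDrain--;
    if (remaining > 0) ondata();
  });

  ondata();
  return awaitDrain;
}

console.log('with guard:', run(true)); // 1
console.log('without guard:', run(false)); // 2
```

With the guard, a single later drain is enough to bring the counter back to 0; without it, two are needed.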

It's not clear to me that the change in Node 10 is absolutely correct, since there can still be a deadlock if write happens to be called within a write. However, it's also clear that Highland's usage of the drain event is definitely incorrect, since it is draining when it hasn't even paused the source. Perhaps the changes to awaitDrain are correct if the drain event is used properly.

The fix is simple enough: just don't drain unless we've previously paused the source. I'll submit it later this week.
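The idea behind the fix can be sketched with a toy sink that remembers whether a write() returned false and only emits 'drain' on resume() in that case. `ToySink` and `canDrain` are hypothetical names loosely mirroring the `_can_drain` flag in the patch later in this thread; this is not Highland's Stream implementation.

```javascript
const { EventEmitter } = require('events');

// Toy writable: only owe a 'drain' after a write() has returned false.
// Hypothetical class; not Highland's actual Stream implementation.
class ToySink extends EventEmitter {
  constructor() {
    super();
    this.paused = true;
    this.canDrain = false;
    this.received = [];
  }

  write(x) {
    this.received.push(x);
    if (this.paused) this.canDrain = true; // caller will wait for 'drain'
    return !this.paused;
  }

  pause() {
    this.paused = true;
  }

  resume() {
    this.paused = false;
    if (this.canDrain) {
      this.canDrain = false;
      this.emit('drain');
    }
  }
}

let drains = 0;
const sink = new ToySink();
sink.on('drain', () => drains++);

sink.resume(); // nobody is waiting: no 'drain'
sink.pause();
sink.write('a'); // returns false, so a 'drain' is now owed
sink.resume(); // emits exactly one 'drain'
sink.resume(); // already settled: no extra 'drain'
console.log('drains =', drains); // 1
```

Emitting 'drain' only when a writer was actually told to wait keeps each drain paired with one false return, which is the invariant the awaitDrain counter relies on.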

Wow, thanks for the detailed explanation and for the time you spent on this.

vqvu commented

I think I have a fix.

As a sanity check, can you apply this diff to your node_modules/highland/lib/index.js and see if it works?

diff --git a/lib/index.js b/lib/index.js
index 927ae19..3faf0de 100755
--- a/lib/index.js
+++ b/lib/index.js
@@ -834,6 +834,10 @@ function Stream(generator) {
     this._defer_run_generator = false;
     this._run_generator_deferred = false;
 
+    // Signals whether or not a call to write() returned false, and thus we can
+    // drain. This is only relevant for streams constructed with _().
+    this._can_drain = false;
+
     var self = this;
 
     // These are defined here instead of on the prototype
@@ -1258,8 +1262,9 @@ addMethod('resume', function () {
     if (this._generator) {
         this._runGenerator();
     }
-    else {
+    else if (this._can_drain) {
         // perhaps a node stream is being piped in
+        this._can_drain = false;
         this.emit('drain');
     }
 });
@@ -1641,6 +1646,11 @@ addMethod('pull', function (f) {
 addMethod('write', function (x) {
     // console.log(this.id, 'write', x, this.paused);
     this._writeOutgoing(x);
+
+    if (this.paused && !this._generator) {
+        this._can_drain = true;
+    }
+
     return !this.paused;
 });
 
@@ -3412,7 +3422,6 @@ addMethod('through', function (target) {
         return target(this);
     }
     else {
-        target.pause();
         output = this.createChild();
         this.on('error', writeErr);
         target.on('error', writeErr);

You should be able to run patch node_modules/highland/lib/index.js <diff-file>.

I want to verify that there's not a deeper issue that I've missed.

Hello,

I can confirm the changes also fix my use case :)

vqvu commented

This fix has been released as 3.0.0-beta.8 and 2.13.1.