brianc/node-pg-copy-streams

COPY/FROM does not fire 'drain' event

dehmer opened this issue · 5 comments

I slightly changed test/copy-from/testConstruction to wait for the 'drain' event after the high water mark was reached, but it seems 'drain' is never fired. I have an issue in a more complicated setup and this might be related to it.

Can you verify this?
Or is it safe to ignore stream write operations returning false?

var assert = require('assert')
var copy = require('pg-copy-streams').from

var testConstruction = function() {
  var highWaterMark = 10
  var stream = copy('COPY numbers FROM STDIN', {highWaterMark: highWaterMark, objectMode: true})
  for(var i = 0; i < highWaterMark * 1.5; i++) {
    stream.write('1\t2\n')
  }

  if(stream.write('1\t2\n')) assert.fail('Should correctly set highWaterMark.')
  else {
    // `drain` event is never fired. Bug?
    stream.on('drain', () => console.log('drained'))
    // Keep the process alive long enough to observe a potential 'drain'.
    setTimeout(() => { console.log(`time's up.`) }, 5000)
  }
}

Hello,

Thanks for the report. The copy stream is not supposed to be used in objectMode, so I have no idea if this can be related. It would be better to test the highWaterMark support with a non-objectMode mechanism.

Internally, a Transform stream is created with the options you pass, and this Transform stream is piped into the pg connection stream which, as far as I know, is not in objectMode.

Note that each call to _transform leads to 2 calls to push.
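
For illustration, a rough non-objectMode check could look like the sketch below. It is untested; client is assumed to be a connected pg client, done a test callback, and a numbers table is assumed to exist. The stream has to be submitted with client.query() so the connection actually consumes what is written.

var copyFrom = require('pg-copy-streams').from

var testBackpressure = function(client, done) {
  // Small byte highWaterMark, since the stream is not in objectMode.
  var stream = client.query(copyFrom('COPY numbers FROM STDIN', {highWaterMark: 16}))
  var writes = 0
  var ok = true

  // Write until the internal buffer exceeds the high water mark and write() returns false.
  while (ok) {
    ok = stream.write(Buffer.from('1\t2\n'))
    writes++
  }

  // Once the connection has consumed the buffered chunks, 'drain' should fire.
  stream.once('drain', function() {
    console.log('drained after ' + writes + ' writes')
    stream.end()
    done()
  })
}

If 'drain' never fires in a setup like this, that would be worth investigating further.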

There are currently no tests related to backpressure, and I have to confess that I don't know the internals of pg well enough to tell you whether backpressure can be relied upon. Nevertheless, I have had no problems with huge files so far, and this would be interesting to prove if you have a pattern for testing how well backpressure is handled in stream pipelines.

Calling write is ok even when it returns false. It will buffer chunks in memory though if the downstream streams are not acting fast enough.
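
If that buffering is a concern, the usual Node pattern is to stop writing when write() returns false and resume on 'drain'. A minimal sketch (the writeRecord helper is illustrative, not part of this module):

// Resolve once the chunk has been accepted, waiting for 'drain' if the buffer is full.
const writeRecord = (stream, chunk) =>
  new Promise(resolve => {
    if (stream.write(chunk)) return resolve()
    stream.once('drain', resolve)
  })

A producer loop can then await writeRecord(copyStream, '1\t2\n') for each record; memory usage stays bounded by the stream's highWaterMark instead of growing with everything written so far.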

I hope this helps.

Thanks for the detailed reply!
My understanding is that back-pressure handling is independent of whether streams are used in object mode or not.

So far I have had no problems with huge loads either, and I'm not sure whether my issue is related at all.
Currently I'm upgrading to 2.2.0 and will investigate further. Sadly, I have to use write() and cannot pipe to COPY/FROM, because I'm splitting a binary input stream into roughly 300 COPY streams (one for each entity).
I think this issue can be closed for now until I have additional information.

In any case, I will keep in mind that slow connections/databases might put some stress on memory consumption.

@dehmer how did you solve your problem? I have the same issue: I'm trying to write to multiple copy streams in parallel, but with no luck :(
--- updated
Seems like it was because of the BEGIN query.

@silentroach I tried to extract the code that splits one TSV input stream into multiple COPY streams. The code contains some pseudo code and does not work as is, but hopefully the idea comes through.

const { Writable } = require('stream')
const copy = require('pg-copy-streams')
const client = {/* PSEUDO CODE: client checked out from PG pool. */}

// NOTE: transaction BEGIN/COMMIT are handled outside stream pipeline

// PSEUDO CODE: backend not used in following code.
const load = (/* backend */) => new Writable({
  objectMode: true,

  // Chunk from upstream: TSV record for given entity:
  write(chunk, _, next) {
    const createStream = entityId => {
      // PSEUDO CODE: tableName/columns are derived from entityId
      const out = client.query(copy.from(`COPY ${tableName} (${columns}) FROM STDIN`))

      out.once('error', err => {
        // Don't call callback (again), because of async nature of out stream.
        this.emit('error', err)
      })

      return out
    }

    const {entityId, record} = chunk

    // Create/open new stream for given entity on demand.
    this.streams = this.streams || {} 
    this.streams[entityId] = this.streams[entityId] || createStream(entityId)

    const stream = this.streams[entityId]
    stream.write(Buffer.from(record + '\n'))
    process.nextTick(next)
  },

  async final(callback) {
    // Snapshot the open streams and their entity ids before resetting the map.
    const streams = this.streams || {}
    const entityIds = Object.keys(streams)
    this.streams = {}

    // End all COPY streams; ideally wait for them to finish before post-processing.
    Object.values(streams).forEach(out => out.end())

    // PSEUDO CODE: post-process written entities (entityIds) after write...
    callback()
  }
})
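
One possible refinement, not part of the snippet above: let write() propagate backpressure from the per-entity COPY stream instead of always calling next on the next tick, so that a slow COPY stream throttles the input. A sketch, assuming createStream is hoisted out of write():

  // Variant of write() that waits for 'drain' when the target COPY stream is full.
  write(chunk, _, next) {
    const {entityId, record} = chunk

    this.streams = this.streams || {}
    this.streams[entityId] = this.streams[entityId] || createStream(entityId)

    const stream = this.streams[entityId]
    if (stream.write(Buffer.from(record + '\n'))) {
      process.nextTick(next)
    } else {
      // Accept the next chunk only after the COPY stream has flushed its buffer.
      stream.once('drain', next)
    }
  }

With that in place, the memory held per entity is bounded by the COPY stream's highWaterMark, which limits how much a slow connection can buffer in memory.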

@dehmer thank you