brianc/node-pg-copy-streams

CopyFail is never sent to DB


#74 mentions that in the event of an error in your stream, you can close the client. However, often you want to continue using the connection and instead simply roll back the COPY statement. My understanding is that this can be done for copy-in streams by sending CopyFail to Postgres:

The frontend can terminate the copy-in mode by sending either a CopyDone message (allowing successful termination) or a CopyFail message (which will cause the COPY SQL statement to fail with an error). The backend then reverts to the command-processing mode it was in before the COPY started, which will be either simple or extended query protocol. It will next send either CommandComplete (if successful) or ErrorResponse (if not).

https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY

However, it doesn't appear that node-pg-copy-streams ever sends CopyFail to Postgres. Instead, the COPY query is left running waiting for more input that will never come.

It seems like maybe the copy-from stream should implement _final and send CopyFail or CopyDone. Barring that, is there a way to terminate a query without releasing the client?

From what I can tell, the copy-in PG stream is closed in this _final handler by sending CopyDone:

_final(cb) {
  this.cb_ReadyForQuery = cb
  const self = this
  const done = function () {
    const Int32Len = 4
    const finBuffer = Buffer.from([code.CopyDone, 0, 0, 0, Int32Len])
    self.connection.stream.write(finBuffer)
  }
  if (this._gotCopyInResponse) {
    return this.flush(done)
  }
  this.cb = done
}

However, my understanding is that _final won't be called in the event of an error.

For my own use case, I was able to work around this with the following:

const copyStream = require('pg-copy-streams').from(sql);
copyStream._destroy = function (err, callback) {
  this._final(() => callback(err));
};

This will complete the copy-in with a CopyDone, which is fine in my case since I will rollback the enclosing transaction anyway. But you'd probably want to send CopyFail if you're in autocommit mode.

Hello,
sorry for the delay and thanks for your work and your thoughts on this.

We should try and see in the future how best to implement / test / document how to bail out of the COPY operation with a copyFail.

As a workaround (the connection API was not designed for this), it might be sufficient to call sendCopyFail(msg) on the postgres connection (not tested). This would send the CopyFail message to postgres, which would then confirm the bailout of the operation to the client, hopefully leaving the client in a clean state.

Don't hesitate to tell me if you have time to test this.
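For reference, here is a minimal, untested sketch of that workaround, assuming the stream exposes the underlying pg connection as this.connection (as in the _final handler quoted above) and using a placeholder COPY statement:

const copyFrom = require('pg-copy-streams').from

const copyStream = copyFrom('COPY my_table FROM STDIN') // placeholder statement

// Override _destroy so an error bails out of the COPY with CopyFail
// instead of leaving the backend waiting for more CopyData.
copyStream._destroy = function (err, callback) {
  if (err && this.connection) {
    // asks postgres to fail the COPY statement with this error message
    this.connection.sendCopyFail(err.message)
  }
  callback(err)
}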

I think we're running into this issue as well. In our case, we want to reuse the same client retrieved from a pool, loop through a set of files in AWS S3, and pipe a read stream from each directly into pg. This works wonderfully 99% of the time, but sometimes the AWS SDK throws an error and times out the read stream. We can catch and log the error via the stream.pipeline() that connects these streams, but the pg client stays open and shows up as an active query that we have to terminate manually.

Again, this doesn't happen very often with S3, fortunately, but I will try out the code posted above to see if it resolves our issue.

I would be interested in seeing your code for what you're doing to see how we could transform it into a reproducible test and then handle it in the library

Implementing _destroy() as in gabegorelick's comment (but using it to send copyFail) could be the solution, but I suspect there are other error modes where we would want to send copyFail that would not be handled by this.

Otherwise, depending on what is possible, calling this.connection.sendCopyFail("Error causing the failure") should work.

I will try to reproduce the error by building a file stream and force-throwing to verify later. For now, I can confirm the above code doesn't simply resolve it. In fact, it just caused all loading to stop, lol. Here's an overview of our loader in a set of abstractions to show the general flow. We will either open a file stream or an S3 stream; this code works with either. If you're wondering what AsyncPipeline is, it's a very thin wrapper over util.promisify(stream.pipeline).

const { Transform } = require('stream');
const copyFrom = require('pg-copy-streams').from;
const AsyncPipeline = require('./AsyncPipeline')
const StreamValues = require('stream-json/streamers/StreamValues');
const Db = require('./Db');
const { getCopyRowFromObject } = require('./Db')
const zlib = require('zlib')

function getCopyStream(sql) {
    // following _destroy implementation is an attempt to clean up the connection
    // after an error occurs in a pipeline, such as AWS S3 download stream throttling
    // From: https://github.com/brianc/node-pg-copy-streams/issues/115#issuecomment-670293136
    const copyStream = copyFrom(sql)

    copyStream._destroy = function (err, callback) {
        if(err) {
            this.connection.sendCopyFail(err.message)
        }
        callback()
    }

    return copyStream
}

class Loader {
    constructor(stream, authenticator, options = {}) {
        this.stream = stream
        this.db = new Db(authenticator, options)
        this.options = options
    }

    copyTransformer(props, options = {}) {

        let accessor = options.accessor || (data => data)
        let map = options.map || (data => data)
        let filter = options.filter || (() => true)

        let copyRow = obj => getCopyRowFromObject(obj, props)
        
        return new Transform({
            objectMode: true,
            transform: function(chunk, encoding, callback) {
                chunk = accessor(chunk)

                try {
                    if(filter(chunk)){
                        let mapped = map(chunk)
    
                        if(mapped) {
                            const row = copyRow(mapped)
                            this.push(row)
                        }
                    }

                    callback()

                } catch(err) {
                    callback(err)
                }
            }
        })
    }

    async load(table, options = {}) {
    
        await this.db.createSchemaConcurrently()
        await this.db.createTableConcurrently(table)

        // stream-json produces { key: <index> , value: <object> }
        options.accessor = data => data.value

        const steps = [ this.stream ]

        if(this.options.gunzip) {
            steps.push(zlib.createGunzip())
        }

        steps.push(StreamValues.withParser())
        steps.push(this.copyTransformer(this.db.fieldNames[table], options))
        steps.push(this.db.stream(getCopyStream(this.db.getModelCopyStatement(table))))

        const pipeline = new AsyncPipeline(steps)

        await pipeline.run()
    }

}

module.exports = Loader

Ok, I understand better. It seems the use of pipeline automates the forwarding of errors and the call to destroy on each stream in the pipeline.

From the Node.js documentation:

stream.pipeline() will call stream.destroy(err) on all streams except:

Readable streams which have emitted 'end' or 'close'.
Writable streams which have emitted 'finish' or 'close'.

This seems to make it possible, in this specific case, to automate the copyFail bailout via destroy.

In other cases, those not using pipeline would have to call destroy themselves.

Error modes in streams are never easy to implement, so we need tests for both cases.
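As a rough, untested sketch of what such tests could look like (client is assumed to be a connected pg client and my_table a throwaway table; all names here are placeholders):

const { pipeline, Readable } = require('stream')
const copyFrom = require('pg-copy-streams').from

// Case 1: pipeline forwards the source error and calls destroy() on the copy stream.
function testPipelineCase(client, done) {
  let pushed = false
  const failingSource = new Readable({
    read() {
      if (!pushed) {
        pushed = true
        this.push('1\thello\n')
        // simulate an upstream failure (e.g. an S3 read timing out)
        this.destroy(new Error('source failed'))
      }
    },
  })
  const copyStream = client.query(copyFrom('COPY my_table FROM STDIN'))
  pipeline(failingSource, copyStream, (err) => {
    // err should be 'source failed'; the client should be usable again,
    // i.e. a follow-up query must not hang behind a stuck COPY
    client.query('SELECT 1', done)
  })
}

// Case 2: no pipeline, so the caller forwards the error via destroy() themselves.
function testManualDestroyCase(client, done) {
  const copyStream = client.query(copyFrom('COPY my_table FROM STDIN'))
  copyStream.write('1\thello\n')
  copyStream.destroy(new Error('bailing out'))
  copyStream.on('error', () => client.query('SELECT 1', done))
}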

I need to find a clear explanation on how _destroy and _final may or may not both be called.

Your code (just sending copyFail) should work, I suppose, but it could leave dangling chunks in memory (those are flushed in _final before sending copyDone). Anyway, upon an error the doc states it is probably better to discard the stream and not reuse it.

I think that from https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L716 we can make the hypothesis that once destroy has been called, _final will not be called.

From https://github.com/nodejs/node/blob/master/lib/internal/streams/destroy.js#L109 it seems that _destroy can now return a promise, so that the destroying mechanism can be async before the internal engine sends 'close' or 'error'. We might need this or some other mechanism, because in the _final case, for example, there is a waiting mechanism that waits for postgres to acknowledge the copyDone before emitting 'finish' (otherwise people think the COPY is finished while postgres is still ingesting the data).

And it needs to make sense with https://www.postgresql.org/docs/10/protocol-flow.html

Copy-in mode (data transfer to the server) is initiated when the backend executes a COPY FROM STDIN SQL statement. The backend sends a CopyInResponse message to the frontend. The frontend should then send zero or more CopyData messages, forming a stream of input data. (The message boundaries are not required to have anything to do with row boundaries, although that is often a reasonable choice.) The frontend can terminate the copy-in mode by sending either a CopyDone message (allowing successful termination) or a CopyFail message (which will cause the COPY SQL statement to fail with an error). The backend then reverts to the command-processing mode it was in before the COPY started, which will be either simple or extended query protocol. It will next send either CommandComplete (if successful) or ErrorResponse (if not).

In the event of a backend-detected error during copy-in mode (including receipt of a CopyFail message), the backend will issue an ErrorResponse message. If the COPY command was issued via an extended-query message, the backend will now discard frontend messages until a Sync message is received, then it will issue ReadyForQuery and return to normal processing. If the COPY command was issued in a simple Query message, the rest of that message is discarded and ReadyForQuery is issued. In either case, any subsequent CopyData, CopyDone, or CopyFail messages issued by the frontend will simply be dropped.

So when we send copyFail in _destroy, we need to wait for pg to intercept the error. It will then call this.handleError, which then needs to call the callback received via _destroy, or something along those lines. This could work, but we need to clarify whether pg is going to send the COPY command via an extended-query message or not, to see if we need to send a Sync message or if pg is going to do it by itself. Otherwise the postgres connection could be left in some unfinished state, which could be a problem if it is released to a pool.
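To make that concrete, here is a hypothetical sketch (not the library's actual code) of a _destroy that sends CopyFail and only completes once pg has surfaced the backend's ErrorResponse through the handleError hook; _destroyCallback is an invented field and the COPY statement is a placeholder:

const copyFrom = require('pg-copy-streams').from

const copyStream = copyFrom('COPY my_table FROM STDIN') // placeholder statement

copyStream._destroy = function (err, callback) {
  if (!this._gotCopyInResponse) {
    // the COPY never started on the wire, nothing to cancel
    return callback(err)
  }
  this._destroyCallback = callback // hold the callback until postgres answers
  this.connection.sendCopyFail(err ? err.message : 'stream destroyed')
}

// pg calls handleError on the active query when the ErrorResponse arrives.
const originalHandleError = copyStream.handleError.bind(copyStream)
copyStream.handleError = function (err, connection) {
  if (this._destroyCallback) {
    const cb = this._destroyCallback
    this._destroyCallback = null
    return cb() // destroy is complete, 'close' can now be emitted
  }
  return originalHandleError(err, connection)
}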

But the first step is to have simple tests

I looked a bit into this. I found a slight interaction issue between node-pg-copy-streams and pg.

When _destroy sends copyFail, postgres receives it and returns an ErrorResponse message that is caught by pg, which calls the handleError hook. This is good: we know that postgres has errored.

BUT inside pg it is then decided that the activeQuery is no longer active here
This blocks the call to handleReadyForQuery because of this code

So we are not informed of when the connection is really available for new queries.

According to the documentation of postgres

ErrorResponse
An error has occurred.

ReadyForQuery
Processing of the query string is complete. A separate message is sent to indicate this because the query string might contain multiple SQL commands. (CommandComplete marks the end of processing one SQL command, not the whole string.) ReadyForQuery will always be sent, whether processing terminates successfully or with an error.

So in theory it should be possible inside pg to keep the query active until ReadyForQuery is received, by removing the this.activeQuery = null line.

@brianc can I ask what you think about this? Since the postgres documentation states that ReadyForQuery is always sent (on success and on error), could we consider setting activeQuery to null only when postgres sends ReadyForQuery?
I am not totally familiar with what happens with this.activeQuery = null in _errorAllQueries and _pulseQueryQueue.
If you prefer, I can open an issue on pg.

can I ask what you think about this? Since the postgres documentation states that ReadyForQuery is always sent (on success and on error), could we consider setting activeQuery to null only when postgres sends ReadyForQuery?

that might be the case, yeah - there've been a smattering of bugs throughout the years related to an error happening, then ready for query arriving, with the active query already being null. A few were even caused by slightly different implementations of the postgres protocol in other databases... so it's a bit tricky to change that code, but if we can do it w/ all tests passing it might be worth it.

thanks for your reply.
I looked a bit at the code and tried running the pg tests without the offending line, but the result is not yet satisfying.
It does not seem easy to change this behavior because, as I understand it, in pg an error can come from two things:

  • an ErrorResponse coming from postgres (which would hopefully always be followed by a ReadyForQuery, provided the protocol layer is not lost and the postgres protocol documentation is 100% accurate on this)
  • a loss of the protocol layer (backend termination, socket termination, ...)

For example, the tests break on https://github.com/brianc/node-postgres/blob/master/packages/pg/test/integration/client/query-error-handling-prepared-statement-tests.js#L85, where the test expects 'terminating connection due to administrator command' but we now get 'Connection terminated unexpectedly'.

So it may not be reasonable for a query to always wait for ReadyForQuery before acknowledging that an ErrorResponse was received.

ReadyForQuery is sort of in-between: it does not fully belong to the previous query, and it does not fully belong to the next query either. It is a sort of grey area.

I'll try to dig a bit deeper into this handling inside pg (I'll open an issue on pg if/when it is clearer to me), but I will also try to see if the ErrorResponse alone could be sufficient to solve the current issue.

@brianc I dug a little further into the error handling mechanism.
Currently a query can be errored only once because of the activeQuery = null in handleErrorMessage. If we remove that, there are cases where query.handleError can be called multiple times (an ErrorResponse followed by a loss of connection before ReadyForQuery is received). I think we could correct this by keeping track of a query.errored bool that would prevent calling query.handleError multiple times (see the sketch below). Tell me what you think and whether you want me to send a PR for this.
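A rough illustration of that idea (hypothetical code, not actual pg source):

// Hypothetical fragment of pg's Client, only to illustrate the proposal:
class Client {
  _handleErrorMessage(msg) {
    const query = this.activeQuery
    // error the query at most once, even if the connection is lost afterwards
    if (query && !query.errored) {
      query.errored = true
      query.handleError(msg, this.connection)
    }
    // activeQuery is intentionally NOT cleared here anymore
  }

  _handleReadyForQuery(msg) {
    // ReadyForQuery becomes the only place where the active query is released
    if (this.activeQuery) {
      this.activeQuery.handleReadyForQuery(this.connection)
      this.activeQuery = null
    }
    this._pulseQueryQueue()
  }
}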

In any case this needs a new pg version to activate the new behavior (handleReadyForQuery called even when an error happened), so I think I am going to try to find a way to fix the behavior in pg-copy-streams by listening for ReadyForQuery on the connection directly.
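Something along these lines, as a hypothetical sketch of that direction (assuming the pg connection emits a readyForQuery event; the COPY statement is a placeholder):

const copyFrom = require('pg-copy-streams').from

const copyStream = copyFrom('COPY my_table FROM STDIN')

copyStream._destroy = function (err, callback) {
  if (!this._gotCopyInResponse) {
    return callback(err)
  }
  // wait for the connection itself to report ReadyForQuery before completing
  // the destroy, so the client is known to be back in a clean state
  this.connection.once('readyForQuery', () => callback(err))
  this.connection.sendCopyFail(err ? err.message : 'stream destroyed')
}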

@gabegorelick , @timgit

I just published version 6.0.0 of the module, which implements a _destroy that sends the CopyFail message to the backend when called (automatically via pipeline, or via a call to destroy()).

I am interested in your feedback if you can test this version in your setups.

I just tested 6.0.0 and I don't think it's working (it fails without throwing an error). Pipelines that end with copyFrom are now hanging after they're complete.

await pipeline(
  ...,
  db.query(copyFrom(`COPY ... FROM STDIN WITH (FORMAT csv)`))
)
console.log('will never be called')

5.1.1 is working perfectly.

@willfarrell thanks for the feedback. Do you have a simple test that shows this behavior? All tests are passing on my side, and after a quick look I cannot find a way to reproduce this. Can you tell me what node version you are using?

I'm using node v14

I've been thinking about this; it may have been due to other processes. I'll do some testing and create a new issue if it actually is an issue.

@willfarrell there was an issue related to the precedence of _destroy vs _final that changed in node 14. There is an additional test and a fix in version 6.0.1 (and node 14 is now tested in travis).

If you have time to test this version and see if it fixes your issue it would be helpful. Thanks.

Thanks for taking a deeper look. That seems to have done the trick; it's working as expected now. Is it worth adding node v16 to the CI list, just in case?

I will add v16, but it seems something else changed around memory management in v16, as it breaks a "performance" test on my machine - https://github.com/brianc/node-pg-copy-streams/blob/master/test/binary.js#L55 - I may need to remove this test or accept a higher memory consumption in the test.

Or take the time to understand what happens in node v16... I created issue #124 in order to avoid polluting the current issue.

I will leave this issue open for a few weeks in case some of you have time to test 6.0.1 and confirm if it behaves correctly regarding the rollback / CopyFail mechanism.

This version is working in our setup using Node 14. I will report back if we see this issue again. Thanks!