brianc/node-pg-copy-streams

copy-from fails when a large CSV file (200K+ user rows) is copied into the DB

Haroon-jay opened this issue · 17 comments

Hi! We are using this library to copy data from large CSV files into our PostgreSQL database. It works fine on smaller files (fewer than 100K user rows), but it fails on larger files, crashing the server with this error:
[screenshot of the error]

I am running an Express server and using multer for file uploads, if that helps. This is the code I use to copy from the file:

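const { from } = require('pg-copy-streams');

// client: a connected pg Client; fileStream: read stream for the uploaded CSV file
const dbStream = client.query(
  from(`COPY raw(email,first,last,address_1,address_2,city,state,country,phone,gender,any_1,list_id) FROM STDIN WITH (FORMAT CSV)`)
);
fileStream.pipe(dbStream);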

fileStream is the read stream for the uploaded CSV file.
Thank you!

Bump.
I have just run into an identical issue with the following architecture:

[architecture diagram]

It fails with the same error as in OP's case when there are more than ~100k rows.

Hello,
Thanks for the report.

Could one of you write a simple test that reproduces the issue, so that we can include it in the test suite?
The test would need to generate its fixture, because I don't want to commit a large file to the repo.

Hopefully each of us will then be able to reproduce the issue outside of a specific pipeline, which will make the problem easier to track down.
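A reproduction test could generate its fixture in memory instead of reading a committed file. Below is a minimal sketch, assuming Node.js streams; makeCsvRows and countLines are hypothetical helpers (not part of pg-copy-streams), and the row contents are made up to match the 12 columns of the COPY statement in the original report.

```javascript
const { Readable } = require('stream');

// Hypothetical fixture helper: yields `rowCount` CSV lines, one string per row,
// so a test can exercise large COPY loads without a committed fixture file.
function makeCsvRows(rowCount) {
  function* rows() {
    for (let i = 0; i < rowCount; i++) {
      // 12 columns, loosely mirroring the COPY column list from the report.
      yield `user${i}@example.com,First${i},Last${i},1 Main St,,City,ST,US,555-0100,x,,1\n`;
    }
  }
  // objectMode: false so rows are emitted as Buffer data, like a file stream.
  return Readable.from(rows(), { objectMode: false });
}

// Consumes the stream and counts newline characters (one per row).
async function countLines(stream) {
  let lines = 0;
  for await (const chunk of stream) {
    lines += chunk.toString().split('\n').length - 1;
  }
  return lines;
}
```

In an actual test, makeCsvRows(130000) would take the place of fileStream in fileStream.pipe(dbStream).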

The error seems to happen in copy-from.js, at line 43:

this.chunks.push(...chunks)

Can you share how many chunks are pushed when the error happens?

Maybe https://stackoverflow.com/questions/20936486/node-js-maximum-call-stack-size-exceeded can help track down the root cause.

FYI, I was able to reproduce the error by modifying the "correctly handle more heavy scenario" test in https://github.com/brianc/node-pg-copy-streams/blob/master/test/copy-from.js#L105.

The test passes with 120K rows but fails with 130K rows.

Maybe this can help find the problem.

The problem seems to be related to cases where a lot of data is buffered before being sent as one big batch (I suspect this happens before the database has acknowledged that the COPY operation can begin).

We use this.chunks.push(...chunks), but there is a limit on the number of chunks that can be pushed onto an array this way: the spread passes each chunk as a separate argument, so it is bounded by the maximum number of arguments a JavaScript function call accepts.

See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/apply:

But beware: by using apply this way, you run the risk of exceeding the JavaScript engine's argument length limit. The consequences of applying a function with too many arguments (that is, more than tens of thousands of arguments) varies across engines. (The JavaScriptCore engine has hard-coded [argument limit of 65536](https://bugs.webkit.org/show_bug.cgi?id=80797).)
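Because each array element becomes a separate argument, the failure point depends on the engine and its stack size rather than on a fixed constant. This small probe (canSpreadPush is a hypothetical name) checks a few sizes on whatever engine runs it instead of assuming a threshold:

```javascript
// Returns true if `target.push(...items)` succeeds for `count` items,
// false if the engine throws a RangeError (too many arguments / stack exhausted).
function canSpreadPush(count) {
  const items = new Array(count).fill(0);
  const target = [];
  try {
    target.push(...items);
    return target.length === count;
  } catch (err) {
    if (err instanceof RangeError) return false;
    throw err; // anything else is unexpected
  }
}

for (const n of [1000, 100000, 130000, 1000000]) {
  console.log(n, canSpreadPush(n) ? 'ok' : 'RangeError');
}
```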

So on your side, you could try to understand why the pipeline builds up a large number of chunks before emitting them to the database. Does it happen near the start of the pipeline's execution, or can it happen at any time?

Regarding this library, the way chunks are buffered needs to change to handle a very large number of outstanding chunks. We can either use concat, which copies arrays, or split the push into several smaller calls to push.
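The two options could look roughly like this. It is a sketch, not the library's actual code; appendWithConcat and appendInBatches are illustrative names, and QUANTUM is an illustrative batch size.

```javascript
// Option 1: concat returns a new array (copying the existing elements),
// but takes the items as one argument, so there is no argument-count limit.
function appendWithConcat(target, items) {
  return target.concat(items);
}

// Option 2: push in bounded slices so each call stays well under the
// engine's argument limit. QUANTUM is an illustrative value.
const QUANTUM = 32768;
function appendInBatches(target, items) {
  for (let i = 0; i < items.length; i += QUANTUM) {
    target.push(...items.slice(i, i + QUANTUM));
  }
  return target;
}
```

The trade-off: concat allocates a new array on every call, so repeatedly appending to a growing buffer copies it each time, while the batched push mutates the target in place and only copies one slice per call.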

I am sorry @jeromew, but I don't feel experienced enough as a programmer to help with this problem.

Thanks for explaining the problem, though. Based on your explanation, and after rethinking the architecture of my current process, I came up with the idea of spawning a separate stream for each S3 key instead of aggregating everything into one stream.

[diagram of the revised architecture]

This way the database will receive more "stream sessions", each containing fewer rows. I will test that tomorrow.
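One way to structure "one stream per S3 key" is to run a separate COPY per source, one after another, using pipeline() from stream/promises so that backpressure and errors are handled for each pair. A sketch under these assumptions: copyEachSource is a hypothetical helper, sources is a list of functions that each return a Readable (for example, an S3 object stream), and openCopyStream returns a new COPY stream such as client.query(from(...)).

```javascript
const { pipeline } = require('stream/promises');

// Hypothetical helper: runs one COPY per source, sequentially.
//   sources:        array of () => Readable (e.g. one per S3 key)
//   openCopyStream: () => Writable, e.g.
//     () => client.query(from('COPY raw(...) FROM STDIN WITH (FORMAT CSV)'))
// Sequential on purpose: a pg Client executes its queries one at a time anyway.
async function copyEachSource(sources, openCopyStream) {
  for (const openSource of sources) {
    // pipeline() respects backpressure and rejects if either stream errors.
    await pipeline(openSource(), openCopyStream());
  }
}
```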

Hi @jeromew! Thank you for the response!
The error happens near the start, before insertion, as you said.
How would you recommend I go about understanding why the pipeline builds up a large number of chunks with my setup?

Thank you!

As for why the pipeline builds up a large number of chunks, I have high latency to the database:

  • the database is far away from me (Europe - USA),
  • connection is port-forwarded

I think this might lead to a situation where my Node client starts buffering data rapidly and overflows before anything even reaches the database, because of the high latency.

@ablaszkiewicz I published a new version 6.0.3 that should fix the RangeError issue that you encountered.

This should avoid the error when the pipeline has a very large number of chunks to deliver to the database, which I suspect happens mostly when the source pipeline starts gathering chunks very rapidly while Postgres is still setting up the COPY operation.

I suppose you can keep your current implementation with several S3 sources, but you should probably look at the memory profile. I do not have time to check right now; I suppose the number of chunks cannot explode and should be limited by the stream's highWaterMark.
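Note that highWaterMark only bounds buffering when the producer honors it: writable.write() starts returning false once the buffered size reaches the mark, but the stream still accepts and buffers anything written afterwards. A small illustration, with a hypothetical writable whose writes never complete (standing in for a COPY that has not started yet); makeStalledWritable and recordWriteResults are illustrative names.

```javascript
const { Writable } = require('stream');

// Hypothetical stand-in for a COPY stream still waiting on the server:
// the first write never completes, so everything after it stays buffered.
function makeStalledWritable(highWaterMark) {
  return new Writable({
    highWaterMark,
    write(_chunk, _encoding, _callback) {
      // Intentionally never call _callback.
    },
  });
}

// Writes `count` chunks of `chunkSize` bytes, recording what write() returned.
function recordWriteResults(stream, count, chunkSize) {
  const results = [];
  for (let i = 0; i < count; i++) {
    results.push(stream.write(Buffer.alloc(chunkSize)));
  }
  return results;
}
```

For example, with highWaterMark 4 and 2-byte chunks, four writes return true, false, false, false, and all 8 bytes remain buffered (writableLength is 8): the false return values are only a signal that the producer is expected to act on.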

@Haroon-jay, can you also test version 6.0.3 on your side?

@jeromew I just tested it out. It has drastically increased the time taken to insert a file: I was able to insert a file with 100K rows in 3s with the previous version, and this time it took 244632ms :)
I did not change anything in my setup, by the way.

@Haroon-jay I have a hard time understanding how the modification could make the timing for 100K rows go from 3s to 244s. Can you confirm that, when swapping between 6.0.2 and 6.0.3 several times, you still get the same results (6.0.2: 3s, 6.0.3: 244s)?

What happens if, with 6.0.3, you change the line https://github.com/brianc/node-pg-copy-streams/blob/master/copy-from.js#L46 from const QUANTUM = 32768 to const QUANTUM = 120000? This modification would bring the code a bit closer to 6.0.2.
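To separate the cost of the batching itself from network and database effects, an in-memory micro-benchmark along these lines could compare a few batch sizes. It does not touch Postgres, so it can only show whether the push strategy itself is expensive. timeBatchedPush is a hypothetical helper, and the sizes are kept well below 120000 because spreading that many arguments may itself approach the engine limit on some setups.

```javascript
const { performance } = require('perf_hooks');

// Appends `items` to a fresh array in slices of `quantum` elements and
// returns the elapsed wall-clock time (ms) plus the final length.
function timeBatchedPush(items, quantum) {
  const start = performance.now();
  const target = [];
  for (let i = 0; i < items.length; i += quantum) {
    target.push(...items.slice(i, i + quantum));
  }
  return { ms: performance.now() - start, length: target.length };
}

// Simulated chunks: one small Buffer per CSV row.
const rowsForBench = Array.from({ length: 250000 }, (_, i) => Buffer.from(`row ${i}\n`));
for (const quantum of [1024, 8192, 32768]) {
  const { ms, length } = timeBatchedPush(rowsForBench, quantum);
  console.log(`QUANTUM=${quantum}: ${length} chunks in ${ms.toFixed(1)} ms`);
}
```

Timings vary between runs and machines, so each configuration should be run several times, as suggested above for 6.0.2 vs 6.0.3.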

@Haroon-jay I ran some tests again and I do not get the same results as you; after several runs, I get more or less the same timing with 6.0.2 and 6.0.3 when launching

PGPORT=5432 PGDATABASE=postgres npx mocha -g 'heavy'

which launches a 130K-row test where the rows are prepared in memory before being sent (it roughly simulates a case where the source buffers rows very quickly while Postgres takes its time to start the COPY operation).

I published version 6.0.4, which tries to improve a little on 6.0.3 and should perform better than 6.0.2.
Now the difficulty will be to benchmark all of this on your setups.

Hi @jeromew
I just tested 6.0.4: it works well up to ~130000 rows and then slows down exponentially as the row count increases. I am guessing the QUANTUM variable has something to do with it, since it is equal to 130000. I can build a smaller version of my setup for you to test directly on your system; do you think that would be helpful?

@Haroon-jay if you can commit a simplified version of your setup that reproduces the issue somewhere, I will take a look at it and try to understand what is happening.

There may be a problem with the queue implementation growing exponentially, but the fundamental issue is that _writev should not be called with such a large number of rows in the first place. The main cause I can see is a source stream that does not respect backpressure (that is, it keeps writing after calls to write return false).

See the writable.write() documentation at https://nodejs.org/api/stream.html:
The return value is true if the internal buffer is less than the highWaterMark configured when the stream was created after admitting chunk. If false is returned, further attempts to write data to the stream should stop until the ['drain'](https://nodejs.org/api/stream.html#event-drain) event is emitted.
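The difference can be seen with a toy Writable that only implements writev and acknowledges each batch asynchronously. This is an illustration, not pg-copy-streams' implementation; makeBatchRecorder and the two producer functions are hypothetical, and setImmediate stands in for a database round trip.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Toy stand-in for a COPY stream: records how many chunks each writev batch
// contains and acknowledges each batch asynchronously.
function makeBatchRecorder(batchSizes) {
  return new Writable({
    highWaterMark: 64,
    writev(chunks, callback) {
      batchSizes.push(chunks.length);
      setImmediate(callback);
    },
  });
}

// Producer that ignores backpressure: writes every chunk immediately.
async function writeIgnoringBackpressure(stream, chunks) {
  for (const chunk of chunks) stream.write(chunk);
  stream.end();
  await once(stream, 'finish');
}

// Producer that respects backpressure: waits for 'drain' when write() returns false.
async function writeRespectingBackpressure(stream, chunks) {
  for (const chunk of chunks) {
    if (!stream.write(chunk)) await once(stream, 'drain');
  }
  stream.end();
  await once(stream, 'finish');
}
```

With the producer that ignores backpressure, almost all chunks end up in a single writev batch, which is the situation described above for _writev; the producer that waits for 'drain' keeps each batch bounded by roughly highWaterMark divided by the chunk size.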

Hi @jeromew, I just tested my setup by changing several processes, and the exponential increase in time seems to come from our aggregation pipeline and has nothing to do with pg-copy-streams.
Sorry about my previous response.

I am closing this, as I think 6.0.4 fixes the original problem. Feel free to reopen if needed.

Okay! Thank you for the quick fix!