brianc/node-pg-copy-streams

Memory allocation failed - continuous data feed stream

Closed this issue · 7 comments

Hi! Thanks for the work on this package.

I am using it to insert a continuous data feed that comes in the form of an array. Let me describe:

The table I want to insert has - for example - three columns (colA,colB,colC).

The data feed comes in the form of an array that may contain one or multiple rows, and it often arrives continuously, but not always. What I mean by that is I may receive one message like this:

[colA,colB,colC]

then immediately after one like this:

[colA,colB,colC,colA,colB,colC,colA,colB,colC,colA,colB,colC]

To insert messages such as the second one, I have to split them into arrays of three columns and insert each one. Note that at high-traffic times I may receive thousands of these messages, each containing maybe hundreds of rows. This is what the code looks like:

service.subscribe(DATA_CHANNEL, msg => {
  
  const [ payloadType, payload ] = msg.data
  
  const stream = client.query(copyFrom("COPY TableName FROM STDIN WITH CSV DELIMITER ',' QUOTE '\"'"))
  const readable = new Stream.Readable()

  while (payload.length > 0) {
    
    const singleLoad = payload.splice(0,CHANNEL_DATA_LENGTH)
    
    if (singleLoad[13] === '\x00') singleLoad[13] =  0
    const singleLoadName = singleLoad[0]
    

    const json = JSON.stringify(singleLoad)
    readable.push(json.substring(1, json.length-1)+'\n')
    
  }
  readable.push(null)

  stream.on('error', (err) => console.log('streamer',err))

  readable.pipe(stream)
   
})

This code works fine for feeds without many incoming messages. But on one feed with thousands of messages per second I get the following error, at which point the script crashes:

<--- Last few GCs --->

[3452293:0x544b8f0]  6915398 ms: Mark-sweep (reduce) 4083.9 (4100.8) -> 4082.9 (4102.1) MB, 2322.7 / 0.0 ms  (average mu = 0.076, curred
[3452293:0x544b8f0]  6917672 ms: Mark-sweep (reduce) 4083.9 (4104.1) -> 4083.0 (4104.1) MB, 2265.7 / 0.0 ms  (average mu = 0.041, curred


<--- JS stacktrace --->

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
 1: 0xa02dd0 node::Abort() [node]
 2: 0x94e471 node::FatalError(char const*, char const*) [node]
 3: 0xb7686e v8::Utils::ReportOOMFailure(v8::internal::Isolate*, char const*, bool) [node]
 4: 0xb76be7 v8::internal::V8::FatalProcessOutOfMemory(v8::internal::Isolate*, char const*, bool) [node]
 5: 0xd31485  [node]
 6: 0xd3200f  [node]
 7: 0xd4009b v8::internal::Heap::CollectGarbage(v8::internal::AllocationSpace, v8::internal::GarbageCollectionReason, v8::GCCallbackFla]
 8: 0xd43c5c v8::internal::Heap::AllocateRawWithRetryOrFailSlowPath(int, v8::internal::AllocationType, v8::internal::AllocationOrigin, ]
 9: 0xd09562 v8::internal::Factory::AllocateRaw(int, v8::internal::AllocationType, v8::internal::AllocationAlignment) [node]
10: 0xd033e4 v8::internal::FactoryBase<v8::internal::Factory>::AllocateRawWithImmortalMap(int, v8::internal::AllocationType, v8::intern]
11: 0xd053f0 v8::internal::FactoryBase<v8::internal::Factory>::NewRawOneByteString(int, v8::internal::AllocationType) [node]
12: 0xf665ba v8::internal::String::SlowFlatten(v8::internal::Isolate*, v8::internal::Handle<v8::internal::ConsString>, v8::internal::Al]
13: 0xd19d27 v8::internal::Factory::NewProperSubString(v8::internal::Handle<v8::internal::String>, int, int) [node]
14: 0x1095390 v8::internal::Runtime_StringSubstring(int, unsigned long*, v8::internal::Isolate*) [node]
15: 0x13fe259  [node]
Aborted (core dumped)

I feel like I am doing something clearly wrong - maybe by instantiating a new client.query and a new stream every time a new message comes? I do that because every message that comes should be inserted - can't wait until the feed finishes because it never finishes - it runs 24/7.

My main concern is to know if there is something I can do with the code to address this issue or whether I should look for another solution that addresses my use case (streaming thousands of continuous messages into db.)

Maybe this package is not intended for this type of continuous feed? Any help or guidance would be greatly appreciated.

Cheers.

At first glance, I don't think this is related to pg-copy-streams directly. The issue here is that your service.subscribe callback is likely producing data faster than pg-copy-streams can write it out, so your node process is accumulating more and more data backed up in memory.

In the code above you're also making a new stream every time you receive a message. You might want to reuse a single stream instance and copy all the messages into it instead.

The main issue is that you need a way to throttle these messages as you receive them, and then read them off and write them into postgres via pg-copy-streams or any other method at a rate your database can handle. Think of it like pouring water into a cup with a hole at the bottom: if you pour in more water than can flow out the bottom, your cup will overflow. The cup in this situation is your application's heap. Typically this is handled with some kind of work queue or pub/sub system in which the consumers can consume messages as fast as they can process them. So you'd need some system to receive the messages from the fire hose, put them into a queue (something like SQS, Google Pub/Sub, RabbitMQ, or Kafka if you're feeling fancy) and then have consumers read from that queue. The queue functions as the cup in this scenario, but message queues are made to be very large cups that can hold a lot of incoming data and let consumers read from it as they can.

firehose -> queue ->  worker1 reads from queue and writes to postgres
                          -> worker 2 reads from queue and writes to postgres

If your service.subscribe has some sort of unsubscribe or pause mechanism that would also work - you'd subscribe, get a batch of messages, pause the service from emitting more while you write the messages, then resume consumption once the last batch was written.
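The pause/resume idea can be sketched with the backpressure signals that Node's writable streams already provide. This is a minimal sketch under assumptions, not something pg-copy-streams or the service is known to offer: `source` is a hypothetical subscription object that emits 'message' events and exposes `pause()` and `resume()`, `toLine` is a hypothetical serializer, and `sink` is any Writable, for example one long-lived COPY stream.

```javascript
// Forward messages from a pausable source into a single writable stream.
// ASSUMPTION: `source` emits 'message' and has pause()/resume(). This is a
// hypothetical interface; check what your subscription library really offers.
function forwardWithBackpressure(source, sink, toLine) {
  let paused = false

  source.on('message', (msg) => {
    // write() returns false once the sink's internal buffer is full
    const hasRoom = sink.write(toLine(msg))

    if (!hasRoom && !paused) {
      paused = true
      source.pause()
      // 'drain' is emitted when the sink is ready to accept more data
      sink.once('drain', () => {
        paused = false
        source.resume()
      })
    }
  })
}
```

If the source cannot be paused at all, this does not help on its own: the data then has to wait somewhere else, which is where an external queue comes in.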

Thank you for answering! Extremely helpful.

Yes, I agree the crux of the issue is what you so well described as the cup and the water - more data is being produced than can be consumed.

I also do believe your guidance regarding the message queue is the direction I will take.

Although this might be beyond the scope of pg-copy-streams, you seem to understand this and maybe can give me a pointer on this concern:

If I follow the diagram you describe, where I have firehose -> queue -> worker, how can I ensure the worker only consumes as much as it can handle? In other words - if the worker subscribes to updates in the queue and updates occur at an extremely fast pace - how do I avoid running into the same problem?

You have already helped me tremendously with those comments and I greatly appreciate it! Also cheers for the amazing work on node-pg and this package!

If you only have one postgres server, you should measure the ingestion throughput that it can accept. If it is lower than the maximum throughput you are receiving in your 24/7 window, there is no way you will be able to keep pace.

You also need to find a queuing solution for the moments when you will need to do some maintenance on your postgres server. Depending on the volume of messages we are talking about, redis streams could be a solution.

some remarks on the code

  • One thing you can try: move readable.pipe(stream) above the while loop. Currently you are forcing all the data into memory (in the readable variable) before piping the stream into postgres. By moving the pipe call, the memory needed could be lower because sending can begin before the end of the while loop. But it depends on the size of your messages.
  • When you are receiving 1000s of messages per second, I believe it is not a good idea to create 1000s of COPY operations per second, so you need to do things differently:
  1. create 2 readable streams: pending and flowing
  2. choose a schedule that makes sense for your system (1 sec, 5 sec, ..)
  3. init the system by piping flowing into a pg COPY. the state is now 'flowing'
  4. use setTimeout to call a 'commit' function every N sec
  5. when state is flowing, push data into flowing; when state is pending, push data into pending
  6. when commit is called, set the state to 'pending' and push null to flowing in order to close the COPY operation. then create a new COPY operation, rename pending to flowing, create a new pending, pipe flowing to pg COPY, and set state to flowing

The ideal, I think, would be to wait until the first COPY operation is totally finished before switching pending to flowing.

This algo would, I think, be a lot more pg friendly. With the current code you might be creating congestion in postgres with too many COPY operations on the same table at the same time.
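A minimal sketch of the pending/flowing rotation from steps 1-6, under stated assumptions. It swaps the two Readable streams for plain arrays, which keeps the same idea: rows arriving during a COPY go to `pending`, and the next COPY starts only after the previous one has finished. `createBatcher` and `makeCopyFlush` are hypothetical names; `client` is assumed to be a connected pg client and `copyFrom` is assumed to be `require('pg-copy-streams').from`, passed in as a parameter so the file loads without either module.

```javascript
const { Readable } = require('node:stream')
const { pipeline } = require('node:stream/promises')

// Buffers CSV lines and hands them to `flush` one batch at a time.
// `flush` is any function taking an array of lines and returning a promise.
function createBatcher(flush) {
  let pending = []     // lines received since the last commit
  let inFlight = null  // promise of the flush currently running, if any

  const push = (line) => { pending.push(line) }

  const commit = async () => {
    // Wait for the previous COPY to finish before rotating again
    if (inFlight || pending.length === 0) return inFlight
    const flowing = pending  // "rename pending to flowing"
    pending = []             // "create a new pending"
    inFlight = flush(flowing).finally(() => { inFlight = null })
    return inFlight
  }

  return { push, commit }
}

// One COPY operation per batch.
// ASSUMPTION: `client` and `copyFrom` come from pg and pg-copy-streams.
function makeCopyFlush(client, copyFrom, sql) {
  return (lines) => pipeline(Readable.from(lines), client.query(copyFrom(sql)))
}
```

The schedule could then be something like `setInterval(() => batcher.commit().catch(console.error), 5000)`. Two caveats: a batch whose COPY fails is dropped in this sketch, and `pending` still grows without bound if postgres stays slower than the feed.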

  • you could also try not using this module and using multi-row inserts instead, which can go a long way (inserting many rows using only one insert command, which postgres knows how to handle)
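The multi-row insert idea can be sketched as a small query builder. `buildMultiRowInsert` is a hypothetical helper, not part of pg; it returns the `{ text, values }` shape that pg's `client.query()` accepts. Table and column names are interpolated directly into the SQL, so in this sketch they must come from trusted code and never from the feed.

```javascript
// Build one parameterized INSERT statement covering many rows.
// `rows` is an array of arrays, each with one value per column.
function buildMultiRowInsert(table, columns, rows) {
  const values = []

  const tuples = rows.map((row) => {
    const placeholders = row.map((value) => {
      values.push(value)
      return `$${values.length}` // $1, $2, ... numbered across all rows
    })
    return `(${placeholders.join(', ')})`
  })

  const text =
    `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${tuples.join(', ')}`
  return { text, values }
}

const query = buildMultiRowInsert(
  'TableName',
  ['colA', 'colB', 'colC'],
  [[1, 2, 3], [4, 5, 6]]
)
// query.text   -> INSERT INTO TableName (colA, colB, colC) VALUES ($1, $2, $3), ($4, $5, $6)
// query.values -> [1, 2, 3, 4, 5, 6]
```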

Keep us posted !

@jeromew: I found out what happens if you always have a COPY in progress. It works great until something finally manages to get a lock on the table, and then you have a bunch of COPY operations pending. From a locking perspective, it's really not ideal to start a COPY, stream data into it, close that stream, start a new COPY stream and immediately pipe into it: you're effectively never yielding the table for an exclusive lock.

🔥 If you do not give PostgreSQL some breathing room between COPY operations, you're going to run into 🔒 locking issues 🔒.

I'm interested if anyone has any ideas for the best ways to handle this entirely. I'm going to dump some tips for anyone working on this in the future.

Pro-tip: Safely create async batches

Under high load, a new event may arrive before the previous event's call to getBatch() resolves. This can lead to orphaned batches (memory leak) and lost messages.

let nextBatch // promise for the batch currently being created, if any

const getBatch = async () => {
  // If another batch is already pending; we return the promise that will resolve to that batch.
  // You can .then() or await the same promise multiple times; you cannot resolve()/reject() multiple values
  if (nextBatch) {
    return nextBatch
  }

  try {
    nextBatch = producer.createBatch({ maxSizeInBytes })
    const batch = await nextBatch
    return batch
  } finally {
    nextBatch = undefined
  }
}

Anti-pattern: Immediately unpiping/piping another stream into rotating COPY streams

// Ooops! If you immediately rotate streams you will always have a lock on the table (until you don't and then COPY will pile up in a `WAITING` state, especially if you have control/metadata queries interleaved between)
const rotateCopyStream = async (logStream) => {
  // disconnecting the csv stream from our log stream, causes log stream to pause
  if (csvStream) logStream.unpipe(csvStream)

  if (pgStream) {
    // disconnecting the pg stream from the csv stream causes the csv stream to end
    csvStream.unpipe(pgStream)
    pgStream.end() // calling this ends the COPY command
  }

  // Get connection and execute our copy command
  const client = await pool.connect()
  pgStream = client.query(copyFrom(COPY_QUERY))

  // Debugging helps
  pgStream.on('error', (error) => {
    if (client) client.release()
    console.log('pgStream -> stream errored')
    console.log(error)
  })

  pgStream.on('finish', () => {
    if (client) client.release()
    console.log('pgStream -> stream complete')
  })

  csvStream = format({headers: false, includeEndRowDelimiter: true, delimiter: '\t'})
  logStream.pipe(csvStream, {end: false}).pipe(pgStream)
}

@jbtobar: I'm using 100% streams in my workflow, but I've found a Node.js footgun when batching things coming from an event emitter/event handler/subscription. If batch creation is async, you need to make sure that if a second event comes in while the batches are being rotated, you don't end up creating two or more batches. An example of how to accomplish this is the getBatch() snippet in the pro-tip above.

I considered putting an advisory lock at the beginning of the COPY command. This would give us a chance to keep our input stream paused and resume flow when the last COPY is complete with the possibility of application-level logic. I haven't tried it yet, but, that's my next plan of attack.
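A rough sketch of what wrapping each COPY in an advisory lock could look like, in the same untested spirit as the idea above. `pg_advisory_lock` and `pg_advisory_unlock` are PostgreSQL's session-level advisory lock functions; `withAdvisoryLock`, `lockKey` and `task` are hypothetical names, and `client` is assumed to be a single dedicated pg connection rather than a pool. It only helps if every job that needs the table cooperates by taking the same key.

```javascript
// Run `task` while holding a session-level advisory lock.
// ASSUMPTION: `client` is one dedicated connection, because a session-level
// lock has to be released on the same connection that acquired it.
async function withAdvisoryLock(client, lockKey, task) {
  // Blocks until no other session holds the lock for this key
  await client.query('SELECT pg_advisory_lock($1)', [lockKey])
  try {
    // e.g. start one COPY here and wait for it to finish
    return await task()
  } finally {
    // Always release, even if the COPY failed
    await client.query('SELECT pg_advisory_unlock($1)', [lockKey])
  }
}
```

While `withAdvisoryLock` is waiting, the input stream can stay paused, and it can be resumed once the returned promise settles.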

@jmealo thanks for the report.

As I understand it, you have been trying to use this library to do a long-running, high-throughput ingestion job.

  • first by using one long COPY operation, but this leads to issues because the ingested data can only be viewed once the COPY operation is finished and committed

  • second by creating a sequence of rotating COPY operations, but this does not work well when there is another concurrent job that needs a full lock on the table, because either (1) this other job does not manage to get the lock because of the density of the COPY operations being rotated or (2) this other job manages to get the lock but then blocks subsequent COPY operations until it releases the lock, which, under a high-throughput scenario, can create a huge buffer of data to ingest

  • and now you are looking for other solutions to avoid this problem

Did I understand correctly where you are coming from? Can you explain a bit more about the concurrent jobs you are trying to execute during ingestion?

For info, during ingestion, the COPY operation takes a ROW EXCLUSIVE lock, just like INSERT, UPDATE and DELETE. cf https://stackoverflow.com/questions/61643550/what-locks-does-postgresql-copy-method-require

I am closing this issue in favor of #118 in case some people want to share their chunking / long-ingestion batching solutions