ForbesLindesay/atdatabases

Pg queryNodeStream ignoring batchSize and highWaterMark

droganov opened this issue · 2 comments

In:

const query = sql`
        SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })

for await (const chunk of stream) {
  // ...
}

I expect chunk to be an array of 10 items, but instead it is a single record as a plain object rather than an array.

Yes, this is the intended behaviour. batchSize and highWaterMark control the behaviour of the underlying db queries, so tuning the batch size and high water mark can improve throughput/performance, but the "chunks" returned in the stream are still individual records. If you need to batch them, you can write a separate utility that pulls a set number of items from the stream and emits each group as an array. Something like:

async function* batchStream<T>(source: AsyncIterable<T>, batchSize: number): AsyncGenerator<T[]> {
  let batch: T[] = []
  for await (const record of source) {
    batch.push(record)
    if (batch.length === batchSize) {
      // Emit a full batch and start collecting the next one
      yield batch
      batch = []
    }
  }
  // Emit any remaining records as a final, possibly smaller batch
  if (batch.length) yield batch
}

const query = sql`
        SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })

for await (const chunk of batchStream(stream, 10)) {
  // ...
}

P.S. batchSize is how many records to fetch at a time; highWaterMark is how many records to allow the stream to buffer before applying back pressure (if you are consuming items more slowly than they are being returned from the database). 16 * 1024 would be a very large number of records to have as your high water mark if your batch size is only 10.
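To illustrate the proportions (these numbers are purely illustrative assumptions, not recommendations), a configuration where the high water mark is a small multiple of the batch size might look like:

const stream = db.queryNodeStream(query, {
  batchSize: 100,     // rows fetched from Postgres per round trip
  highWaterMark: 500, // rows buffered before back pressure is applied
})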

Hello @ForbesLindesay
Thank you for the update. Yes, that's how I solved it.