Pg queryNodeStream ignoring batchSize and highWaterMark
droganov opened this issue · 2 comments
In:
const query = sql`
SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })
for await (const chunk of stream)....
I expect chunk
to be an array of 10 items, but instead it is just one item as a plain object instead of array
Yes, this is the intended behaviour. batchSize
and highWaterMark
control the behaviour in the underlying db queries, so tuning the batch size and high water mark can improve throughput/performance, but the "chunks" returned in the stream are still individual records. If you need to batch them, you can write a separate utility that pulls a set number of items from the stream and emits that array as a chunk. Something like:
async function* batchStream<T>(source: AsyncIterator<T>, batchSize: number): AsyncIterator<T[]> {
let batch = []
for await (const record of source) {
batch.push(record)
if (batch.length === batchSize) {
yield batch
batch = []
}
}
if (batch.length) yield batch
}
const query = sql`
SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })
for await (const chunk of batchStream(stream, 10))....
P.S. the batchSize
is how many records to fetch at a time, the highWaterMark
is how many records to allow the stream to buffer before applying back pressure (if you are consuming items slower than they are being returned form the database). 16 * 1024
would be a very large number of records to have as your high water mark if your batch size is only 10
.
Hello @ForbesLindesay
Thank you for the update, yes, that's how I solved it