Receiving [ERR_STREAM_PREMATURE_CLOSE] in the logs when intentionally closing the ResultSet stream before end
spinlud opened this issue · 6 comments
Describe the bug
I have a stream generated from a ResultSet of a query. When consuming the stream, I need to end it prematurely if a certain condition is met:
import {pipeline} from 'stream/promises';
import {createWriteStream} from 'fs';
import {createClient} from '@clickhouse/client';

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  });
  const resultSet = await clickHouseClient.query({
    query: `select * from my_table limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  });
  const stream = resultSet.stream();
  let i = 0;
  for await (const rows of stream) {
    for (const row of rows) {
      i++;
      let jsonRow = await row.json();
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i === 3) {
        return stream.destroy();
      }
      yield jsonRow.join(',') + '\n\n';
    }
  }
}

(async () => {
  const asyncGen = myAsyncGenerator();
  const outputStream = createWriteStream('output.txt');
  await pipeline(asyncGen, outputStream);
})();
This code terminates successfully (exit code 0), but it logs the [ERR_STREAM_PREMATURE_CLOSE] error shown in the "Error log" section below.
Expected behaviour
I could be wrong, but this [ERR_STREAM_PREMATURE_CLOSE] error doesn't seem to originate on the stream object received from the call to the database; I suspect instead that it originates from another stream that is writing to this one (e.g. a stream from the socket that is writing the data?).
In this case I am voluntarily closing the stream before consuming all the data, so I don't want to see this [ERR_STREAM_PREMATURE_CLOSE] logged on the console. Also, it doesn't seem to be an actual error, but just the log of one, because the program terminates with success (exit code 0).
Is there any way to prevent this [ERR_STREAM_PREMATURE_CLOSE] error log from appearing in the console?
Error log
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at new NodeError (node:internal/errors:405:5)
at Transform.onclose (node:internal/streams/end-of-stream:159:30)
at Transform.emit (node:events:529:35)
at Transform.emit (node:domain:489:12)
at emitCloseNT (node:internal/streams/destroy:132:10)
at processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
Process finished with exit code 0
Configuration
Environment
- Client version: 0.2.7
- Language version: Node v18.18.0
- OS: Darwin arm64
ClickHouse server
- ClickHouse Server version: 24.2.2.16100
If the desired logic cannot be implemented via the ClickHouse query, you could just use AbortController.
import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })
  const abortController = new AbortController()
  const resultSet = await clickHouseClient.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })
  let i = 0
  for await (const rows of resultSet.stream()) {
    for (const row of rows) {
      i++
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i === 5) {
        console.log('aborting')
        abortController.abort()
        break
      }
      const result = row.json<string[]>().join(',') + '\n\n'
      console.log('Yielding row', result)
      yield result
    }
  }
}

;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()
console output:
➜ examples git:(main) ✗ ts-node node/test.ts
Yielding row number
Yielding row UInt64
Yielding row 0
Yielding row 1
aborting
The output.txt file contents (the four rows yielded before the abort, each followed by a blank line due to the '\n\n' separator):
number

UInt64

0

1
NB: You might want to add the cancel_http_readonly_queries_on_client_close setting (https://clickhouse.com/docs/en/operations/settings/settings#cancel-http-readonly-queries-on-client-close). Also, see the abort_request example.
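For reference, a minimal sketch of passing that setting per query via the client's clickhouse_settings parameter (clickHouseClient and abortController are assumed from the example above):

const resultSet = await clickHouseClient.query({
  query: `select * from system.numbers limit 10`,
  format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  abort_signal: abortController.signal,
  clickhouse_settings: {
    // ask the server to cancel the query when the HTTP connection is closed
    cancel_http_readonly_queries_on_client_close: 1,
  },
})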
You are right. I tried adjusting the asyncGenerator approach again, which was not trivial. Maybe something like this will do the trick? Please notice the backpressure comment — you'll likely need to handle this with such a manual approach (a sketch follows after the console output below).
import { createWriteStream } from 'fs'
import { createClient, ClickHouseLogLevel } from '@clickhouse/client'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })
  const client = createClient({
    log: {
      level: ClickHouseLogLevel.TRACE,
    },
  })
  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })
  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')
  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })
  console.log('Closing client...')
  await client.close()
})()
Prints:
Yielding row number
Yielding row UInt64
Yielding row 0
Yielding row 1
Yielding row 2
Reached the condition, ending stream...
[2024-05-08T14:37:53.701Z][TRACE][@clickhouse/client][Connection] Socket 0ae5c20c-868e-471c-8b2f-43f7a2ffb68e was released
End of stream, resolve...
Closing client...
[2024-05-08T14:37:53.701Z][TRACE][@clickhouse/client][Connection] Socket 0ae5c20c-868e-471c-8b2f-43f7a2ffb68e was closed or ended, 'free' listener removed
This output looks right to me - the socket is properly released on the abort event and is back in the keep-alive pool, and it is only destroyed when we close the client.
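Regarding the backpressure comment in the snippet above: a minimal sketch of what that handling could look like, assuming the same stream and outputStream variables (and leaving out the abort condition for brevity). The idea is to pause the source when write() reports a full buffer and resume it on 'drain':

stream.on('data', (rows) => {
  for (const row of rows) {
    const result = row.json().join(',') + '\n\n'
    // write() returns false once the writable's internal buffer
    // exceeds its highWaterMark; stop reading until it drains
    if (!outputStream.write(result)) {
      stream.pause()
      outputStream.once('drain', () => stream.resume())
    }
  }
})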
With just 1 socket and a second consecutive request, all seems OK, too:
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'
import { ClickHouseLogLevel } from '@clickhouse/client-common'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })
  const client = createClient({
    max_open_connections: 1,
    log: {
      level: ClickHouseLogLevel.TRACE,
    },
  })
  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })
  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')
  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })
  const rs = await client.query({
    query: 'SELECT 1 AS number',
    format: 'JSONEachRow',
  })
  console.log(
    'Verifying that we can query using the same socket one more time...',
    await rs.json(),
  )
  console.log('Closing client...')
  await client.close()
})()
Yielding row number
Yielding row UInt64
Yielding row 0
Yielding row 1
Yielding row 2
Reached the condition, ending stream...
[2024-05-08T14:47:24.767Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was released
End of stream, resolve...
[2024-05-08T14:47:24.767Z][TRACE][@clickhouse/client][Connection] Reusing socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3
...
[2024-05-08T14:47:24.774Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was released
Verifying that we can query using the same socket one more time... [ { number: 1 } ]
Closing client...
[2024-05-08T14:47:24.774Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was closed or ended, 'free' listener removed
I think I understand what is happening now.
Due to the query being in the async generator function body, an unfinished request goes out of scope, and so does the AbortController that is used internally to cancel the request on errors in the Node.js connection source.
That internal AbortController is used to prevent the underlying sockets from being stuck while dialing an unreachable host, which can happen even if the request has timed out (and that was the only sensible solution I could find to that issue).
A fun fact about the AbortController is that it fires the abort signal when it goes out of scope. This also explains why my stream example does not produce any errors: there is no extra function there, so the query (and the request) does not go out of scope (though I believe that example is still incorrect).
Then, the error is printed here.
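As a generic illustration of that mechanism (this is just a sketch of the pattern, not the client's actual code), an abort signal attached to the underlying HTTP request is what allows a stuck dial to be cancelled:

import http from 'http'

// Generic sketch, not the client's actual code: an internal
// AbortController is tied to the request so that a socket stuck
// while dialing an unreachable host can still be cancelled.
const internalController = new AbortController()
const req = http.request('http://unreachable.example:8123', {
  signal: internalController.signal,
})
req.on('error', (err) => console.error(err.name)) // 'AbortError' after abort()
req.end()
// e.g. cancel the dial if it takes longer than a second
setTimeout(() => internalController.abort(), 1000).unref()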
I will check if I can make this less annoying and more transparent to the user, so that calling just the ResultSet.close method is enough, with a proper discard of the response stream while keeping the socket, and without unnecessary error messages in the console; that last part of the code is very outdated and is basically tech debt.
Should be fixed in 1.0.2.
import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })
  const resultSet = await clickHouseClient.query({
    query: `SELECT * FROM system.numbers LIMIT 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })
  const stream = resultSet.stream()
  let i = 0
  for await (const rows of stream) {
    for (const row of rows) {
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i++ === 5) {
        return stream.destroy()
      }
      yield row.json<string[]>().join(',') + '\n'
    }
  }
}

;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()
Yields the following file:
number
UInt64
0
1
2
Without unnecessary errors in the logs.
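One note on the example above: it never closes the client. In a real application, you would also call await clickHouseClient.close() once you are done, as the earlier snippets in this thread do.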
Please feel free to re-open this issue or create a new one if there are still any problems.