client.query is not returning a stream
AbdelrhmanAbdelhamed opened this issue · 10 comments
I'm using "knex": "^0.95.4" and "pg": "^8.5.1" on Node.js v16
const { to: copyTo } = require('pg-copy-streams');

const query = `users`;
const client = await knex.client.acquireConnection();
const dataStream = client.query(copyTo(`COPY (${query}) TO STDOUT CSV DELIMITER ',' HEADER`));
client.query is not returning a stream; it returns this object instead:
{
  "command": "COPY",
  "rowCount": 69,
  "oid": null,
  "rows": [],
  "fields": [],
  "_types": {
    "_types": {
      "arrayParser": {},
      "builtins": {
        "BOOL": 16,
        "BYTEA": 17,
        "CHAR": 18,
        "INT8": 20,
        "INT2": 21,
        "INT4": 23,
        "REGPROC": 24,
        "TEXT": 25,
        "OID": 26,
        "TID": 27,
        "XID": 28,
        "CID": 29,
        "JSON": 114,
        "XML": 142,
        "PG_NODE_TREE": 194,
        "SMGR": 210,
        "PATH": 602,
        "POLYGON": 604,
        "CIDR": 650,
        "FLOAT4": 700,
        "FLOAT8": 701,
        "ABSTIME": 702,
        "RELTIME": 703,
        "TINTERVAL": 704,
        "CIRCLE": 718,
        "MACADDR8": 774,
        "MONEY": 790,
        "MACADDR": 829,
        "INET": 869,
        "ACLITEM": 1033,
        "BPCHAR": 1042,
        "VARCHAR": 1043,
        "DATE": 1082,
        "TIME": 1083,
        "TIMESTAMP": 1114,
        "TIMESTAMPTZ": 1184,
        "INTERVAL": 1186,
        "TIMETZ": 1266,
        "BIT": 1560,
        "VARBIT": 1562,
        "NUMERIC": 1700,
        "REFCURSOR": 1790,
        "REGPROCEDURE": 2202,
        "REGOPER": 2203,
        "REGOPERATOR": 2204,
        "REGCLASS": 2205,
        "REGTYPE": 2206,
        "UUID": 2950,
        "TXID_SNAPSHOT": 2970,
        "PG_LSN": 3220,
        "PG_NDISTINCT": 3361,
        "PG_DEPENDENCIES": 3402,
        "TSVECTOR": 3614,
        "TSQUERY": 3615,
        "GTSVECTOR": 3642,
        "REGCONFIG": 3734,
        "REGDICTIONARY": 3769,
        "JSONB": 3802,
        "REGNAMESPACE": 4089,
        "REGROLE": 4096
      }
    },
    "text": {},
    "binary": {}
  },
  "RowCtor": null,
  "rowAsArray": false
}
Hello,
Thanks for the report.
I am not familiar with knex, and I don't know if the client it gives you is a real pg client or a modified wrapper. Do you have the same result when using the pg API directly?
"rowCount": 69 probably means that the query was executed and returned 69 rows. Do you have 69 rows in your test table?
Thank you @jeromew for your response.
Yes, I actually have 69 rows in my test table. What does it mean that the query was executed?
I can confirm that it's an actual pg client that used to work before.
knex uses the tarn pool under the hood, so knex.client.acquireConnection() returns a client from the pool:
// Acquire a connection from the pool.
async acquireConnection() {
  if (!this.pool) {
    throw new Error('Unable to acquire a connection');
  }
  try {
    const connection = await this.pool.acquire().promise;
    debug('acquired connection from pool: %s', connection.__knexUid);
    return connection;
  } catch (error) {
    let convertedError = error;
    if (error instanceof TimeoutError) {
      convertedError = new KnexTimeoutError(
        'Knex: Timeout acquiring a connection. The pool is probably full. ' +
          'Are you missing a .transacting(trx) call?'
      );
    }
    throw convertedError;
  }
}
I don't know exactly. It is as if
client.query(`COPY (${query}) TO STDOUT CSV DELIMITER ',' HEADER`);
was run, and not
client.query(copyTo(`COPY (${query}) TO STDOUT CSV DELIMITER ',' HEADER`));
The last time an issue like this was encountered, it was because the OP was using open-telemetry (#143), which has a bug wrt the Submittable interface of pg.
Are you using open-telemetry?
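For context, pg's Submittable convention works like this: if the value passed to client.query() has a .submit method, pg pushes that object onto its query queue and returns it as-is, which is how pg-copy-streams hands you back a stream. A quick illustration (no database needed, assuming pg-copy-streams is installed):

const { to: copyTo } = require('pg-copy-streams');

const q = copyTo('COPY (SELECT 1) TO STDOUT');
console.log(typeof q.submit); // 'function' -> pg would return q itself
console.log(typeof q.pipe);   // 'function' -> q is a readable stream

Anything that wraps client.query and replaces its first argument with a plain object can silently drop .submit, making pg fall back to running a plain text query.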
Could you give me a 10-line runnable example that shows the issue, which I could run to reproduce it (with all required modules, console.logs, etc.)?
The following works for me with node 18.9.0, knex 2.4.2, pg 8.8.0
const knex = require('knex')({
  client: 'pg',
  connection: 'postgres://127.0.0.1:5432/postgres',
  searchPath: ['public'],
});
// local copy-to.js from pg-copy-streams; with the npm package: require('pg-copy-streams').to
const copyTo = require('./copy-to.js');

(async function () {
  const client = await knex.client.acquireConnection();
  const datastream = client.query(copyTo(`COPY (SELECT generate_series(1,10)) TO STDOUT`));
  datastream.pipe(process.stdout);
})();
Result (the program does not finish because I did not release the client at the end of the query):
1
2
3
4
5
6
7
8
9
10
So at this stage, with this working 10-line example, I would say that it is working as expected.
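For completeness, a sketch of letting the process exit cleanly, reusing knex and copyTo from the example above (knex exposes releaseConnection on the client, and knex.destroy() closes the pool; adjust to your setup):

(async function () {
  const client = await knex.client.acquireConnection();
  const datastream = client.query(copyTo(`COPY (SELECT generate_series(1,10)) TO STDOUT`));
  datastream.pipe(process.stdout);
  datastream.on('end', async () => {
    await knex.client.releaseConnection(client); // hand the client back to tarn's pool
    await knex.destroy(); // close the pool so node can exit
  });
})();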
@jeromew Thank you for your support and prompt response. I really appreciate it.
Can you try your example using the versions that I currently use? I'm using "knex": "^0.95.4" and "pg": "^8.5.1" on Node.js v16.16.0.
In the meantime, I'll try to extract the logs for you.
It works for me with
├── node.js 16.16.0
├── knex@0.95.4
├── pg@8.5.1
so I think the problem is something else on your side.
Make sure you get the basic example above working with a clean install, and then try to understand why your setup is not working. It might be because of something that modifies pg, like open-telemetry.
You may also want to log console.log(client.query.toString()) to check that the client.query function has not been modified.
Here's the output of console.log(client.query.toString()):
function () {
  if (!startCh.hasSubscribers) {
    return query.apply(this, arguments)
  }

  const callbackResource = new AsyncResource('bound-anonymous-fn')
  const asyncResource = new AsyncResource('bound-anonymous-fn')
  const processId = this.processID

  const pgQuery = arguments[0] && typeof arguments[0] === 'object'
    ? arguments[0]
    : { text: arguments[0] }

  // shallow clone the existing query to swap out .text field
  let newQuery = { ...pgQuery }

  return asyncResource.runInAsyncScope(() => {
    startCh.publish({
      params: this.connectionParameters,
      query: newQuery,
      processId
    })

    arguments[0] = newQuery

    const finish = asyncResource.bind(function (error) {
      if (error) {
        errorCh.publish(error)
      }
      finishCh.publish()
    })

    const retval = query.apply(this, arguments)

    const queryQueue = this.queryQueue || this._queryQueue
    const activeQuery = this.activeQuery || this._activeQuery
    newQuery = queryQueue[queryQueue.length - 1] || activeQuery

    if (!newQuery) {
      return retval
    }

    if (newQuery.callback) {
      const originalCallback = callbackResource.bind(newQuery.callback)
      newQuery.callback = function (err, res) {
        finish(err)
        return originalCallback.apply(this, arguments)
      }
    } else if (newQuery.once) {
      newQuery.once('error', finish).once('end', () => finish())
    } else {
      newQuery.then(() => finish(), finish)
    }

    try {
      return retval
    } catch (err) {
      errorCh.publish(err)
    }
  })
}
I am not sure I am helping you by giving you the answer now; knowing how to investigate this kind of bug is important.
By searching for "startCh.publish" on GitHub, you will find that this code comes from Datadog, more specifically from https://github.com/DataDog/dd-trace-js/blob/master/packages/datadog-instrumentations/src/pg.js
So your code is indeed instrumented, and pg is modified at runtime for this instrumentation.
So this is the same problem that exists with open-telemetry, but now with datadog.
This is a bug in datadog because it does not take into account the Submittable interface of pg.
Datadog should be transparent and not break things for people, so this bug should be fixed in datadog (and cannot be fixed by modifying pg-copy-streams).
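The offending step is the shallow clone: spreading a Submittable into a plain object copies its own enumerable properties but not its prototype, so methods like submit() are lost. A minimal demonstration (the class here is only a stand-in for pg-copy-streams' internal query object):

class CopyStreamQuery {
  constructor(text) { this.text = text; }
  submit(connection) { /* issues the COPY and wires up the stream */ }
}

const original = new CopyStreamQuery('COPY (SELECT 1) TO STDOUT');
const clone = { ...original }; // what dd-trace's `{ ...pgQuery }` does

console.log(typeof original.submit); // 'function'  -> pg treats it as Submittable
console.log(typeof clone.submit);    // 'undefined' -> pg runs it as a plain text query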
@jeromew Thank you for all your help in tracking this down. Indeed, that is the root cause.
I can see that dd-trace started to cause issues from v3.22.1 through v4.2.0:
DataDog/dd-trace-js@v3.22.0...v3.22.1
// shallow clone the existing query to swap out .text field
let newQuery = { ...pgQuery }
The issue is fixed if I use dd-trace v3.22.0.
I reported the issue to datadog here
Great find here. We're seeing the same issue with dd-trace at 4.1.1 and 4.2.0. In case anyone else stumbles across this issue, it manifested for us as this snippet:
rs.pipe(client.query(copyFrom('COPY table_name FROM STDIN')))
  .on('error', (err) => rej(err))
  .on('finish', () => res());
throwing this error:
/REDACTED/node_modules/pg-protocol/src/parser.ts:369
name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
^
error: COPY from stdin failed: No source stream defined
at Parser.parseErrorMessage (/REDACTED/node_modules/pg-protocol/src/parser.ts:369:69)
at Parser.handlePacket (/REDACTED/node_modules/pg-protocol/src/parser.ts:188:21)
at Parser.parse (/REDACTED/node_modules/pg-protocol/src/parser.ts:103:30)
at Socket.<anonymous> (/REDACTED/node_modules/pg-protocol/src/index.ts:7:48)
at Socket.emit (node:events:513:28)
at Socket.emit (node:domain:489:12)
at Socket.emit (/REDACTED/node_modules/dd-trace/packages/datadog-instrumentations/src/net.js:61:25)
at addChunk (node:internal/streams/readable:324:12)
at readableAddChunk (node:internal/streams/readable:297:9)
at Socket.Readable.push (node:internal/streams/readable:234:10)
at TCP.onStreamRead (node:internal/stream_base_commons:190:23)
at TCP.callbackTrampoline (node:internal/async_hooks:130:17)
The thing that tipped me off that it could be this issue was that "No source stream defined" seems to occur as the error from the default handler of the copyInResponse event for pg clients: https://github.com/brianc/node-postgres/blob/master/packages/pg/lib/query.js#L232