kagis/pgwire

Question: close the connection of the streamed query before the results are fully read

dko-slapdash opened this issue · 7 comments

Hi. Is there a proper (clean) way to close the connection when a SELECT query is being streamed through it? I tried connection.end() and it does not terminate the connection if the query is still active.

E.g. imagine we run a SELECT on 100M rows and, after reading from it for 1 minute, we want to stop reading, preserve the last read position and close the connection (to later reschedule).

Currently, I have to use a hack to call conn.destroy() inside a stream's _destroy handler:

[image: screenshot of the queryStream helper]

I suspect that connection.end() not helping here may be a bug in the library, because for replication streams it actually works fine:

[image: screenshot of the replication stream example]

P.S.
List of open connections on the PG side: select application_name, pid, state, query from pg_stat_activity

True cancellation of queries is not implemented yet. In the current implementation, stream.destroy() just reads and discards records until the end of the result. The problem is that the postgres protocol only allows cancelling the current query. As far as I know, it's impossible to gracefully cancel all queued queries, so the cancellation algorithm for pgwire is not clear to me yet. I will think about it.

Cancellation of a replication stream uses a different protocol, and it is implemented.

Your queryStream function is almost ok. You can avoid the monkey patch and just subscribe to the stream event using a built-in utility.

import { finished } from 'stream';

finished(stream, _ => conn.end());

This code will also handle the successful (non-destroy) case.

upd
finished(stream, _ => conn.end()); will swallow the error silently. It needs to be rethrown somehow.

This is what I had to do actually:

finished(stream, err => {
  conn.destroy();
  if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
    throw err;
  }
});
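One caveat with the snippet above: an error thrown inside the finished callback does not reach the caller; it surfaces as an uncaught exception. A minimal sketch of a promise-based variant follows. It assumes only Node's built-in stream.finished; closeWhenFinished is a hypothetical helper name, not part of pgwire, and conn only needs a destroy() method.

```javascript
const { finished } = require('node:stream');

// Hypothetical helper (not part of pgwire).
// Resolves when the stream ends normally or is destroyed early,
// rejects on any other stream error. The connection is destroyed in every case.
function closeWhenFinished(stream, conn) {
  return new Promise((resolve, reject) => {
    finished(stream, err => {
      conn.destroy();
      // An early stream.destroy() is reported as ERR_STREAM_PREMATURE_CLOSE;
      // treat it as a deliberate stop rather than a failure.
      if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
```

The caller can then await closeWhenFinished(stream, conn) and handle real errors with an ordinary try/catch.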

And this is still not perfect, because for logicalReplication stream I suspect it may eat the last ackImmediate().

What I'm trying to achieve for the logicalReplication stream is to: a) send it a signal "please terminate gracefully", b) wait until this signal is correctly processed (i.e. all acks are flushed), and c) then destroy the connection.

How do I do this? I suspect that there is no way currently?

> And this is still not perfect, because for logicalReplication stream I suspect it may eat the last ackImmediate().

replicationStream.destroy() stops the replication stream gracefully. Calling replicationStream.ackImmediate() after replicationStream.destroy() is prohibited and will throw an exception #9 #18

After destroying the replicationStream, you can use a regular conn.end(), as you wrote in the first comment.

I just realized that pgwire.pool already has a logicalReplication function that behaves exactly as you want.

  logicalReplication(...args) {
    const conn = new Connection({
      ...this._options,
      replication: 'database',
    });
    return conn.logicalReplication(...args).finally(_ => {
      conn.end();
    });
  }

I forgot that there is no need to use finished at all. Just call conn.end() after the replicationStream is created, and graceful connection termination will be queued and executed automatically after the replication stream is destroyed.

I'm trying to implement query cancellation for cases when there is only one pending query.

  • if queueSize > 1, return and proceed with the current skip-based implementation
  • block connection tx to prevent sending new queries while cancelling
  • send CancelRequest and wait for the backend to close the cancellation socket
  • unblock connection tx
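For reference, the CancelRequest in the third step is a fixed 16-byte packet in protocol version 3.0. It is sent over a separate, new connection and carries the process ID and secret key the server supplied in BackendKeyData at startup. The sketch below only builds that packet; buildCancelRequest is a hypothetical name and is not taken from pgwire's source.

```javascript
// Hypothetical helper (not pgwire's actual code): builds a protocol 3.0
// CancelRequest packet from the values received in BackendKeyData.
function buildCancelRequest(processId, secretKey) {
  const buf = Buffer.alloc(16);
  buf.writeUInt32BE(16, 0);               // total message length, including itself
  buf.writeUInt32BE(80877102, 4);         // cancel request code: (1234 << 16) | 5678
  buf.writeUInt32BE(processId >>> 0, 8);  // backend process ID
  buf.writeUInt32BE(secretKey >>> 0, 12); // secret key (>>> 0 accepts signed or unsigned input)
  return buf;
}
```

The server sends no reply to this packet; it simply closes the cancellation connection, which is why the step above waits for the socket to be terminated by the backend.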

query cancellation is implemented