brianc/node-pg-copy-streams

Please handle copyBothResponse in copy-to.js _parse

Closed this issue · 25 comments

It's possible to use copyToStream to handle a logical replication stream, provided that you add a line in your _parse. It's odd that you have the code in message-formats.js but don't handle the case properly in _parse.

Thanks.

Hello,
thanks for the report. Can you give me a little more info on what you are doing, with some sample code?
The module was originally written for copyIn (writable COPY FROM STDIN) and copyOut (readable COPY TO STDOUT), and I have to understand how copyBoth works to see what we can do.

You are saying that "it's possible" simply by accepting the copyBothResponse so I imagine that you have done it locally with some success.

I have not tried it, but what do you think of https://www.npmjs.com/package/pg-logical-replication ?

For further reference

  1. logical replication is only available in postgres >= 10

  2. mentioned in https://www.postgresql.org/docs/10/protocol-flow.html#PROTOCOL-COPY

There is another Copy-related mode called copy-both, which allows high-speed bulk data transfer to and from the server. Copy-both mode is initiated when a backend in walsender mode executes a START_REPLICATION statement. The backend sends a CopyBothResponse message to the frontend. Both the backend and the frontend may then send CopyData messages until either end sends a CopyDone message. After the client sends a CopyDone message, the connection goes from copy-both mode to copy-out mode, and the client may not send any more CopyData messages. Similarly, when the server sends a CopyDone message, the connection goes into copy-in mode, and the server may not send any more CopyData messages. After both sides have sent a CopyDone message, the copy mode is terminated, and the backend reverts to the command-processing mode. In the event of a backend-detected error during copy-both mode, the backend will issue an ErrorResponse message, discard frontend messages until a Sync message is received, and then issue ReadyForQuery and return to normal processing. The frontend should treat receipt of ErrorResponse as terminating the copy in both directions; no CopyDone should be sent in this case. See Section 52.4 for more information on the subprotocol transmitted over copy-both mode.
  3. https://www.postgresql.org/docs/10/protocol-replication.html, which explains the subprotocol transmitted over copy-both mode

Well, yes, I have tried using pg-logical-replication, and there's a problem with that package: it listens on the 'data' event, forgoing all the backpressure management provided by Node streams.

In fact I have listed my concerns in an issue there, at kibae/pg-logical-replication#9. But it seems nobody cares, or at least the author doesn't.

Currently, I am monkey-patching your pg-copy-streams, adding a single line to handle copyBothResponse; treating it the same as copyOutResponse is all I have to do. Now, this is how I use your copy-to stream to stream logical replication:

const { pipeline } = require('stream/promises')
const copyTo = require('pg-copy-streams').to

const subscription = new copyTo(`START_REPLICATION SLOT xxx LOGICAL 00000000/00000000 ("proto_version" '1', publication_names 'xxx')`)

const stream = client.query(subscription)
await pipeline(stream,
  new copyDataHandler(),    // This is a transform stream to handle copyData messages and send feedback back periodically
  new PgOutputOutput(),     // This is a transform stream to translate things encoded by the pgoutput plugin; this can be virtually anything matching the decoding plugin used on the server side
  new MyDestinationStream() // Do my stuff according to the decoded output
)

The idea is that I can pipeline my stream anywhere I want, with logical replication and with proper backpressure management; pg-logical-replication cannot do that. In a sense I am doing ETL from a pg source.

I understand. We do try to correctly implement backpressure in pg-copy-streams. It is not easy because internally pg uses on('data') too, so it was necessary to short-circuit the pg protocol parser. I believe that since 4.0.0 correct backpressure is implemented, but there are no tests regarding backpressure. Did your tests show that it works correctly?
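
For reference, one minimal way to exercise backpressure from the consuming side is to pipe the copy stream into a deliberately slow Writable and check that process memory stays bounded; this is only a sketch, not an actual test from the repository:

const { Writable } = require('stream')

// A deliberately slow sink: if backpressure is honored upstream,
// memory usage should stay bounded while a large COPY is flowing.
const slowSink = new Writable({
  highWaterMark: 16 * 1024,
  write(chunk, encoding, callback) {
    setTimeout(callback, 50) // acknowledge each chunk only after 50 ms
  },
})

Piping the copy-to stream into such a sink while watching process.memoryUsage().rss is a crude but effective check.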

I have not yet worked with logical replication, but it was a thing on my radar; we could try to make it work in this module.

It's interesting to know that your patch gives you correct results. By accepting copyBothResponse in copy-to, you implicitly downgrade the copyBoth operation to a copyOut and receive the logical replication events. I believe that you never stop the connection and listen indefinitely for events, so that is not a major problem.

regarding copyDataHandler, you say "This is a transform stream to handle copyData Message and send feedback back periodically"
What do you mean by "send feedback back periodically"? Does this transform only analyse the binary messages sent during logical replication (XLogData, Primary keepalive message, Standby status update, Hot Standby feedback message, as per https://www.postgresql.org/docs/10/protocol-replication.html)?

regarding PgOutputOutput, you talk about plugins. Are you making a reference to a specific module ? If I understand correctly this transforms the logical replication messages into something more useful ?

There is something I do not understand about copyBoth: the copyOut section is used by the server to send the logical replication events to the client, but what is the copyIn section used for?

Depending on this,

  • either we can patch copy-to to accept copyBothResponse and maybe close the copyIn section of the copyBoth - but something feels wrong to me at this stage
  • or we can create a copy-both that would start as a duplex stream - that would more closely match the specification, which says that copy-both allows high-speed bulk data transfer to and from the server
  • or something else needs to be done. pg-logical-replication seems to send status updates regularly

What do you think is the consequence of totally disregarding the copyIn section of copyBoth during logical replication ?

I found that mentioned in one of the issues of pg-logical-replication. Did you test https://github.com/kagis/pgwire that seems to implement logical replication ?

I found that mentioned in one of the issues of pg-logical-replication. Did you test https://github.com/kagis/pgwire that seems to implement logical replication ?

Oh my, I have been blinded by looking at your library only. My PgOutputOutput is almost the same as their replicationStream.pgoutput() in pgwire. It turns out they already implemented most of the stuff I did myself; I should have searched for it earlier, perhaps the name pgwire didn't ring a bell for me. Thanks.

Regarding your question earlier,

I understand. We do try to correctly implement backpressure in pg-copy-streams. It is not easy because internally pg uses on('data') too, so it was necessary to short-circuit the pg protocol parser. I believe that since 4.0.0 correct backpressure is implemented, but there are no tests regarding backpressure. Did your tests show that it works correctly?

Yes, my tests work correctly. Actually my original implementation almost mirrors your _attach/_detach routines in copy-to.js; it's very difficult to support event handling and streaming with backpressure at the same time, so I understand what you have been doing in node-postgres. IMHO, with today's async iterables and similar features we don't really need event handling like that of the pre-ES6 era, but for compatibility's sake, what can I say. I imagine you would have started the implementation with streams right away if you were building node-postgres today.

regarding copyDataHandler, you say "This is a transform stream to handle copyData Message and send feedback back periodically"
what do you mean by "send feedback back periodically" ?

Well, in order for the database not to time out during replication when it doesn't have anything new to send, we need to send a feedback signal periodically, an 'r' message, to keep our connection alive. That's why it's a copyBoth mode to begin with: the replication stream from the database uses your copyOut infrastructure, and the feedback is initiated from my copyDataHandler() using sendCopyFromChunk to send a feedback signal and acknowledge our flushed LSN position.
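
For illustration, a minimal sketch of how such an 'r' (Standby status update) message can be built and sent. The field layout follows https://www.postgresql.org/docs/10/protocol-replication.html; `connection` is assumed to be the underlying pg connection object that exposes sendCopyFromChunk (e.g. client.connection), and flushedLsn is assumed to be the flushed WAL position as a BigInt:

// Sketch only: build a Standby status update ('r') and push it on the copyIn side.
function sendStandbyStatusUpdate(connection, flushedLsn) {
  const POSTGRES_EPOCH_MS = Date.UTC(2000, 0, 1)
  const now = BigInt(Date.now() - POSTGRES_EPOCH_MS) * 1000n // microseconds since 2000-01-01
  const msg = Buffer.alloc(1 + 8 + 8 + 8 + 8 + 1)
  msg.write('r', 0)
  msg.writeBigUInt64BE(flushedLsn, 1)  // last WAL byte + 1 received and written to disk
  msg.writeBigUInt64BE(flushedLsn, 9)  // last WAL byte + 1 flushed to disk
  msg.writeBigUInt64BE(flushedLsn, 17) // last WAL byte + 1 applied
  msg.writeBigUInt64BE(now, 25)        // client's system clock
  msg.writeUInt8(0, 33)                // 0 = do not request an immediate server reply
  connection.sendCopyFromChunk(msg)    // pg wraps this payload in a copyData message
}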

regarding PgOutputOutput, you talk about plugins. Are you making a reference to a specific module ? If I understand correctly this transforms the logical replication messages into something more useful ?

Please refer to pgwire's replicationStream.pgoutput(); that is exactly the purpose. To add to that, the server side can use whatever plugin you like, and we can use the relevant parser to decode the messages, e.g. by using wal2json we can decode JSON directly from the stream. But not everyone has wal2json installed, and pgoutput should be the default for every PostgreSQL database, so it's reasonable to support that out of the box.
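
As a side note, whichever plugin is used is chosen when the replication slot is created; a rough sketch (the slot name 'xxx' here just mirrors the START_REPLICATION example above):

// Sketch: create the slot with the pgoutput plugin (built in since PostgreSQL 10)...
await client.query("SELECT pg_create_logical_replication_slot('xxx', 'pgoutput')")
// ...or with wal2json, if that extension is installed on the server:
// await client.query("SELECT pg_create_logical_replication_slot('xxx', 'wal2json')")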

There is something I do not understand about copyBoth: the copyOut section is used by the server to send the logical replication events to the client, but what is the copyIn section used for?

Please refer to my previous answer on feedback: you don't really need to handle copyIn unless you want to implement the feedback mechanism in node-postgres.

What do you think is the consequence of totally disregarding the copyIn section of copyBoth during logical replication ?

It's OK, I guess; the server will disconnect you once there's nothing to send or its timeout is reached. Of course, it would be great if you implemented the feedback in your code, but that's up to you. Right now I need to monkey-patch because copy-to doesn't work out of the box. If you add a pass-through for copyBothResponse, it will be easier for me or somebody else with the knowledge to do the feedback routine downstream, or better, to create a new class, say PgLogicalReplicationStream extends PgCopyTo, using your copy-to infrastructure and adding an interval routine to send feedback periodically.
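
A rough sketch of such an interval routine, reusing the sendStandbyStatusUpdate helper sketched earlier (PgLogicalReplicationStream itself is hypothetical; getFlushedLsn and the 10 second interval are placeholders, not anything from the library):

// Sketch only: periodically acknowledge the flushed LSN so the server does not time out.
function startKeepalive(connection, getFlushedLsn, intervalMs = 10000) {
  const timer = setInterval(() => {
    sendStandbyStatusUpdate(connection, getFlushedLsn())
  }, intervalMs)
  timer.unref() // don't keep the process alive just for the keepalive timer
  return () => clearInterval(timer)
}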

One more thing I want to mention: it's possible to stop the replication by sending endCopyFrom() and return to query mode. I think you already handle copyDone correctly, so that part is pretty much a given now.

Regarding pgwire, their pgoutput stream implementation is interesting to have. Did you publish a module with your implementation? Anyone wanting to use pg-copy-streams for logical replication will need something like this.

Strangely, if I understand their code correctly, the ReplicationStream also uses on('data') on the postgres connection, via this._rx.on('data', this._onData.bind(this)), so it will not solve the backpressure issue.

I'll add that I have a doubt about the approach taken by pgwire because they seem to rely on the fact that chunks and message boundaries are identical. There has been work recently in pg-copy-streams to make sure that chunk boundaries are not a problem while accepting very large copyData messages, where a copyData message spans several network chunks (as seen with very large bytea fields). I guess that some logical messages can also become large (> 64KB) and, if so, that can become an issue in pgwire with the current code.

I tend to think that we can make a copy-both.js implementation that would be a duplex stream cleanly handling the copyBoth mechanism (both the copyIn-Writable and copyOut-Readable sections). Would you use that instead of pg's internal sendCopyFromChunk or endCopyFrom (I didn't know those existed)?

This could maybe allow for pipelines like copyBoth>>PgOutput>>MyDestinationStream>>copyBoth or Keepalive>>copyBoth>>PgOutput>>MyDestinationStream or more hypothetically Keepalive>>copyBoth>>PgOutput>>MyDestinationStream>>Ack>>copyBoth.

I think I understand the mechanism now, and pg-copy-streams surely can abstract the copyBoth machinery.

Please tell me what you think of the dedicated copyBoth duplex stream. It may not be too hard to implement, even if I understand that the monkey-patched copyTo plus calls to the *copyFrom* family of functions in pg goes a long way.

On the copyBoth-Writable side (copyIn/copyFrom), ending the stream would send copyDone, and writing a chunk would forward it as copyData, just like sendCopyFromChunk.
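
To make the idea concrete, a minimal sketch of what that writable side could look like; this is not the eventual copy-both.js code, just an illustration of the mapping, and this.connection is assumed to have been attached when the query was submitted to the client:

const { Duplex } = require('stream')

// Illustrative only: forward written chunks as copyData, send copyDone on end.
class CopyBothSketch extends Duplex {
  _write(chunk, encoding, callback) {
    this.connection.sendCopyFromChunk(chunk) // pg wraps the chunk in a copyData message
    callback()
  }
  _final(callback) {
    this.connection.endCopyFrom() // sends copyDone; the server then drains its copy-out side
    callback()
  }
  _read() {
    // the readable side would be fed by the copyData payloads parsed from the connection
  }
}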

what do you think ?

I did some tests and there is still something I don't understand.
In copy-to.js, the current implementation "extracts" the copyData payloads from the copyOut channel and pushes them nearly chunk for chunk.

I did a test with the test_decoding plugin. When I insert a new tuple, postgres sends 1 network chunk containing 3 copyData messages, each containing one XLogData (BEGIN, INSERT, COMMIT).
So copy-to.js pushes 1 chunk containing XLogData1 | XLogData2 | XLogData3

Looking at the documentation,

XLogData (B)
Byte1('w')
Identifies the message as WAL data.

Int64
The starting point of the WAL data in this message.

Int64
The current end of WAL on the server.

Int64
The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.

Byten
A section of the WAL data stream.

A single WAL record is never split across two XLogData messages. When a WAL record crosses a WAL page boundary, and is therefore already split using continuation records, it can be split at the page boundary. In other words, the first main WAL record and its continuation records can be sent in different XLogData messages.

and there is no way to know the length of the Byten field, so XLogData1 | XLogData2 | XLogData3 seems hard to parse after they have been concatenated and unwrapped from their copyData wrappers.

What did you do to circumvent this problem? The pgoutput content may self-describe its length, but how do you parse the XLogData in a plugin-agnostic way in copyDataHandler when using copy-to.js? Can you tell me which version of pg-copy-streams you are using?

thanks for your help

No, you don't need to know the content of XLogData; actually I would say it's up to the next stage of the pipeline to interpret it, because you don't know which plugin people are using on their server, and it can write whatever content it wants.

The only thing that matters is when downstream is done with the chunk and you need to acknowledge your flush position to the server. For that, please take a look at how pg_recvlogical.c does it, starting from the comment at https://github.com/postgres/postgres/blob/af4ea507c3d9217579a8d75fc17f4796a9bab0bb/src/bin/pg_basebackup/pg_recvlogical.c#L471
and
https://github.com/postgres/postgres/blob/af4ea507c3d9217579a8d75fc17f4796a9bab0bb/src/bin/pg_basebackup/pg_recvlogical.c#L504

So in essence, you will need to handle the 'k' message, extract the current WAL position from the server, and compare it with the dataStart in the 'w' message. If pg_recvlogical can do it, there's no reason we can't, so that's the best reference point.
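
For reference, a sketch of that header handling, assuming `chunk` is a Buffer holding exactly one copyData payload; the field offsets follow the Primary keepalive and XLogData formats in https://www.postgresql.org/docs/10/protocol-replication.html:

// Sketch only: read the headers of the two replication messages sent by the server.
function parseReplicationMessage(chunk) {
  const tag = String.fromCharCode(chunk[0])
  if (tag === 'k') {
    // Primary keepalive: Int64 current end of WAL, Int64 server clock, Byte1 reply-requested
    return {
      tag,
      walEnd: chunk.readBigUInt64BE(1),
      timestamp: chunk.readBigUInt64BE(9),
      replyRequested: chunk.readUInt8(17) !== 0, // if set, send a Standby status update soon
    }
  }
  if (tag === 'w') {
    // XLogData: Int64 dataStart, Int64 current end of WAL, Int64 server clock, then the payload
    return {
      tag,
      dataStart: chunk.readBigUInt64BE(1),
      walEnd: chunk.readBigUInt64BE(9),
      timestamp: chunk.readBigUInt64BE(17),
      data: chunk.subarray(25),
    }
  }
  return { tag, data: chunk } // anything else is unexpected here
}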

Just checked pgwire. Yes, you are correct, they are using onData as well, which doesn't handle backpressure correctly; it's odd nobody noticed that. Perhaps not that many people use it for logical replication. Well, among node pg libraries, I don't think anyone has the exposure you have with node-postgres; I think that's why I stuck with your library in the first place, because yours is more battle tested. 👍

As for implementing copy-both as a Duplex stream, well, what can I say, I am all in for that; it's better than my own half-baked solution, and I imagine it would also be beneficial for things other than logical replication in the long run. So my 👍 to it.

As for pgoutput decoding on the client side, please check out my gist; this is how my PgOutputParser stream works. Another thing worth mentioning is that it uses your pg-types to decode column data, so it should be future-proof for any types PostgreSQL might support, provided pg-types is updated, and of course custom types are supported through the same interface as pg-types. I think that's better than the one in pgwire.

The thing is that you need to separate the sequential XLogDatas.
If you receive XLogData1 | XLogData2 | XLogData3, you need to know how long XLogData1 is to know where XLogData2 begins.

In the scenario where you have copyData(XLogData1)|copyData(XLogData2)|copyData(XLogData3), you can use the end of the copyData to know where XLogData1 ends, but if you remove the copyData wrapper, the subprotocol needs to be self-aware of its length, which XLogData does not seem to be (unless there is something I do not understand).

In pg-copy-streams 1.2.0, we were pushing XLogData1, XLogData2 and XLogData3 independently (each copyData implied a push), which makes it OK to use the end of the copyData as the end of the XLogData, but this has changed since 2.0.0, so I need to understand what is going on in your implementation: a bug or something I still don't understand. If my reasoning is correct, the copy-both.js implementation would need to push each copyData independently to have a correct behavior, but that seems very weird to me given my past understanding of copyData as a simple carrier channel without any semantics.

Browsing through the code of pg_recvlogical.c, I see that they work at the copyData message level via PQgetCopyData, so they buffer a whole copyData message in memory and then analyze it fully via https://github.com/postgres/postgres/blob/af4ea507c3d9217579a8d75fc17f4796a9bab0bb/src/bin/pg_basebackup/pg_recvlogical.c#L542, taking all the bytes after the header up until the end of the buffer, so the end of the XLogData message seems to be determined only by the end of the copyData message...

This is very weird to me because I really thought that copyData was just a wrapper channel without any semantics regarding what flows inside.

Maybe in copyBoth we will be forced to push copyData messages individually to allow for that, but that is clearly not what copyTo is doing now, so one question remains: why does it seem to work for you? Are you missing messages because you consider that after "w lsn lsn time" all the rest is pgoutput-related? Does your pgoutput implementation discard the additional bytes?

Reading your gist, I think I understand:
you reparse the copyData messages in https://gist.github.com/wwindcloud/d01fadf27eb5ffa27ead127915b20dff#file-pg-output-parser-js-L157, so if I understand correctly your copyDataHandler only looks at the first byte and passes through the whole chunk if it is 'w'. Is that correct?
And then, since the pgoutput format is probably self-describing its length, everything falls back into place.

It would be interesting to see what happens with a very long bytea (> 2^16 bytes); I'll test that later.

Yes, I parse the whole chunk; I don't think there's a way to know in advance how many XLogData messages there are until you actually interpret the WAL data part.

As for the case where it might cross the WAL boundary limit, i.e. > 2^16, I never thought of that; maybe I will try a large text column and see how it behaves.

I looked at what happens with a simple insert :

const fieldSize = 64 * 1024
`INSERT INTO plug (col1) values (repeat('-', ${fieldSize}))`

I am using the 'test_decoding' plugin

network chunk of 31 bytes composed of:

  • copyBothResponse : W Int32(7) Int8(0) Int16(0)
  • copyData with Primary keepalive message : d Int32(22) k Int64 Int64 Byte1(0)

network chunk of 65536 bytes composed of:

  • copyData with XLogData : d Int32(40) w Int64 Int64 Int64 Byte11(BEGIN 33128)
  • copyData with XLogData : d Int32(65605) w Int64 Int64 Int64 Byte65465(table public.plug: INSERT: col1[text]:'-----------(...)) containing 65426 "-"

network chunk of 153 bytes composed of:

  • 110 "-" followed by "'"
  • copyData with XLogData : d Int32(41) w Int64 Int64 Int64 Byte12(COMMIT 33128)

so a network chunk beginning with "-" is received.

The current implementation of pg-copy-streams will push

an 18 byte chunk composed of:

  • k Int64 Int64 Byte1(0)

a 65526 byte chunk composed of:

  • w Int64 Int64 Int64 Byte11(BEGIN 33128)
  • w Int64 Int64 Int64 Byte65465(table public.plug: INSERT: col1[text]:'-----------(...)) containing 65426 "-"

a 148 byte chunk composed of:

  • 110 "-" followed by "'"
  • w Int64 Int64 Int64 Byte12(COMMIT 33128)

I did not try but I think that the 3rd chunk breaks the parsing in the gist.

The usual way to fix this is to buffer whole copyData messages in memory so that each chunk passed downstream corresponds to exactly one full copyData message.

But this can lead to heavy memory consumption with very large fields, which is why in recent pg-copy-streams versions we decided to stop this buffering. I could force it in copy-both.js, and that is what pg_recvlogical does after all, but that seems like a regression performance-wise: memory consumption only keeps growing with the text field size.

I don't know either if postgres can put a 'k' message and 'w' messages in the same network chunk. It seems to me that in your implementation, as soon as a chunk begins with 'w', you consider that the rest of the chunk is a sequence of 'w' messages, and when a chunk contains such a sequence, your pgoutput implementation manages to work through all of them as long as the end of the chunk corresponds to the end of an XLogData message.

Did you experiment with long text fields ?

Yes, I have also tried a large text field and got the same result as yours: my PgOutputParser breaks at some point. Well, I guess I have been using small data fields and was lucky not to see this kind of thing happening until now.

For the time being, I think the correct way to do it is to teach PgOutputParser to be mindful of these things. The feedback will also need to be done inside PgOutputParser, because only it knows how to compute the length of an XLogData.

For a reasonably sized text field I think it's OK; we can increase the memory or set an option in the parser to drop the field if it's larger than a certain threshold. The parser will need a rewrite if it is to handle very large fields for general use without exploding memory, maybe by employing techniques similar to those used for processing XML, i.e. SAX rather than DOM.

Anyway, it's still beneficial if copy-both.js exists as a Duplex stream. copy-both.js should be very simple; it doesn't even need to handle the k and w messages, it should just mindlessly pass things downstream. My PgOutputParser should take care of the rest, and I can use write() on copy-both.js to send feedback back when appropriate. I think that should be easy for you. What do you think? Thanks.

Yes, copy-both.js will mindlessly pass things downstream and IMHO should not be aware of what flows inside. This has worked well for copy-from and copy-to, and I would find it weird if copy-both needed to know what flows inside it.

The problem I have is that the pgoutput plugin seems self-aware of its own field lengths, so it seems indeed possible (and you, who know pgoutput a lot better than I do, seem to confirm it) to have a PgOutputParser that recovers the message boundaries whatever chunking may happen (k, w, w, w, k, ...).

But other plugins do not seem to have that. 'test_decoding' certainly doesn't have that capability, and 'wal2json' does not easily have it either, even though it seems possible to know when a JSON document ends. I don't know about other plugins.

Can you confirm that in the pgoutput spec all field lengths are announced, so we can know beforehand where they end?

Can you enlighten me on the different logical decoding plugins? Did they all become useless after pgoutput was added to postgres in 10.0?

I prefer the default behavior of copy-both.js not to have the potential memory explosion that we have seen, so the current behavior is OK, and it seems OK for pgoutput if k messages are also handled by PgOutputParser.

But I think it is necessary to add an option that people could use to force the buffering of full copyData messages, just like pg_recvlogical does. It could be rowMode: true, since in copyOut scenarios during COPY TO, each copyData message contains a row. Do you think another name for the option would be better?

OK, I have thought through the workflow again; it just didn't make sense somehow, because pg_recvlogical should be able to handle any plugin the server backend throws at it. Looking at the source file again, and at fe-protocol3.c for PQgetCopyData, confirms my suspicion: it always assumes the first byte to be either k or w, and anything else just bails out.

So, digging through copy-to.js again, I noticed something fishy about this._copyDataChunks: it seems every push you execute on it is just byte concatenation. The code greedily takes as much as possible and passes the whole chunk downstream without respecting the length in the copyData message; well, it kind of respects it, but it only guarantees the chunk to be at least that size, not exactly that size, so sometimes the next XLogData gets chopped off. I have patched your code again to push chunks of exactly the size the copyData message specifies, and it seems OK now, my PgOutputParser works.

Please refer to my patch of copy-to.js to see how I fixed the problem.

No, this is exactly what I was talking about:

pg_recvlogical, via PQgetCopyData, buffers in memory whole copyData messages and then analyzes them.

the way copy-to.js works is done on purpose to specifically avoid buffering whole copyData messages in memory before pushing them.

In your patch, you wait until a whole copyData message has arrived before pushing it so in the case of a very big field, this uses a lot of memory. copy-to.js stopped doing this buffering and extracts parts of copyData messages as they arrive from the network.

So copy-to simply forwards network chunks after removing the COPY meta data. This works well for COPY TO and COPY FROM.

In the current case, this approach sort of breaks for plugins that rely on the end of the copyData message to declare that their XLogData has ended, which seems bad to me because it forces clients to buffer whole copyData messages in memory (just as pg_recvlogical does in PQgetCopyData).

For now I prefer keeping the current approach as the default, but I will add a rowMode: true option (or another name if you can come up with something better) that will force copy-both.js to push whole copyData messages (it is a different patch, but it should have the same effect as yours).

Oh, I see what you meant now. In that case, I think we have run out of options, so your rowMode: true would be a nice compromise and should be sufficient for most general usage. For very large row data, I think custom handling is needed anyway, whether in the PgOutputParser or even downstream at the destination, so I am OK with that.

Looking forward to your copy-both.js release. Thanks.

I committed a first version of copy-both.js; can you test it by cloning the repo directly?

I am interested to see if you can find a nice way to use the fact that the stream is duplex, and if you find corner cases that could be added to the tests. At this stage I only added a very light handler for the messages.

As discussed, you need to add rowMode: true to receive 1 chunk per copyData message. And indeed, each copyData message contains 1 logical decoding message, as per https://www.postgresql.org/docs/10/protocol-replication.html

WAL data is sent as a series of CopyData messages. (This allows other information to be intermixed; in particular the server can send an ErrorResponse message if it encounters a failure after beginning to stream.) **The payload of each CopyData message from server to the client contains a message** of one of the following formats:
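
For anyone following along, usage at this stage looks roughly like the sketch below. The export name `both`, the `replication: 'database'` connection option, and passing the option object as the second argument (as copy-to does) are assumptions to double-check against the README; the slot and publication names are placeholders, and PgOutputParser refers to the parser from the gist above:

const { Client } = require('pg')
const copyBoth = require('pg-copy-streams').both // assumed export name for copy-both.js

// The connection must be opened in logical replication mode.
const client = new Client({ replication: 'database' })
await client.connect()

const stream = client.query(
  copyBoth(
    `START_REPLICATION SLOT xxx LOGICAL 00000000/00000000 (proto_version '1', publication_names 'xxx')`,
    { rowMode: true } // push exactly one chunk per copyData message
  )
)
stream.pipe(new PgOutputParser()) // feedback can be written back via stream.write(...)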

Thanks for supporting this, I have published my pg-subscription-stream using your copy-both. Please check out my repository

Great! I quickly browsed through the code.
I did not see the rowMode: true option in https://github.com/wwindcloud/pg-subscription-stream/blob/master/pg-subscription-stream.js#L21; was that on purpose?

Do you confirm your tests went ok on your use case ? If yes I will publish a new version of the library to npm.

After thinking about it, in the published version I will change the name of the rowMode option to:

  • respectDataFrame: true
  • copyDataFrameAlignment: true
  • or alignOnCopyDataFrame: true

which version of the name is more meaningful to you ?

Yes, tested on my side, OK. I just forgot to copy the rowMode option while copy-and-pasting around; I will update it in the next version after you publish yours.

As for naming, it doesn't really matter so long as the work gets done properly. If I were to choose, I think alignOnCopyDataFrame serves the purpose.

OK, so I changed the option to alignOnCopyDataFrame: true and published version 5.1.0 to npm.

Can you test with a long text field (>64KB) that your parser breaks without the option and works with it?

thanks

OK, I just updated my repo and added alignOnCopyDataFrame; it broke as expected without the option, and now it works on large fields with no problem. So I guess this issue can be closed.

Thanks for your help.

OK, I'm glad to know that it works now. In the coming months I'll try to dig a little more into pg-subscription-stream and open a discussion there if needed.

Happy Logical Decoding !