How to resume replication?
zhanghaiyang9999 opened this issue · 9 comments
I created a publication and a replication slot, then added and updated some table records.
Later I exited my program, but table records were still being updated or added.
How can I resume the replication from where it was when my program last exited?
For example, I replicated 5 records while my program was running. After my program exited, another 5 new records were added to the PostgreSQL tables. How can I continue replicating the new records, numbers 6 to 10, when my program runs again?
Should I call the pglogrepl.SendStandbyStatusUpdate method, or some other method?
@jackc Thanks!
First, your program should send a status update message to Postgres, like this:
err = pglogrepl.SendStandbyStatusUpdate(ctx, c.conn,
	pglogrepl.StandbyStatusUpdate{WALFlushPosition: c.lastWriteWALPos})
The lastWriteWALPos can be fetched from CommitMessage.CommitLSN:
// CommitMessage is a commit message.
type CommitMessage struct {
	baseMessage
	// Flags currently unused (must be 0).
	Flags uint8
	// CommitLSN is the LSN of the commit.
	CommitLSN LSN
	// TransactionEndLSN is the end LSN of the transaction.
	TransactionEndLSN LSN
	// CommitTime is the commit timestamp of the transaction
	CommitTime time.Time
}
Then, if your program records the last CommitLSN sent to Postgres, it can start replication with this as the restartLSN:
err = pglogrepl.StartReplication(ctx, c.conn,
	c.slotName, c.restartLSN,
	pglogrepl.StartReplicationOptions{
		Mode: pglogrepl.LogicalReplication,
		PluginArgs: []string{
			"proto_version '1'",
			fmt.Sprintf("publication_names '%s'", c.pubName),
		},
	},
)
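To make the "record the last CommitLSN" step concrete, here is a minimal sketch of keeping that position and turning it into text that can be stored and read back on the next start. The `LSN` type, `checkpoint`, `formatLSN` and `parseLSN` are hypothetical stand-ins written for this example so it has no dependencies; pglogrepl has its own `LSN` type with `ParseLSN` and `String`, which a real program would likely use instead.

```go
package main

import "fmt"

// LSN is a local stand-in for a 64-bit WAL position (an assumption for this
// sketch; pglogrepl defines its own LSN type).
type LSN uint64

// formatLSN renders an LSN in PostgreSQL's "high/low" hexadecimal text form.
func formatLSN(lsn LSN) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

// parseLSN converts the "high/low" text form back into an LSN.
func parseLSN(s string) (LSN, error) {
	var hi, lo uint32
	if _, err := fmt.Sscanf(s, "%X/%X", &hi, &lo); err != nil {
		return 0, fmt.Errorf("parse LSN %q: %w", s, err)
	}
	return LSN(hi)<<32 | LSN(lo), nil
}

// checkpoint remembers the last commit position the program has fully handled.
type checkpoint struct {
	lastCommit LSN
}

// onCommit would be called once the changes of a commit have been applied;
// commitLSN plays the role of CommitMessage.CommitLSN.
func (c *checkpoint) onCommit(commitLSN LSN) {
	c.lastCommit = commitLSN
}

func main() {
	var cp checkpoint
	cp.onCommit(0x1337E38ED)
	cp.onCommit(0x1337EBA02)

	// The text form is what would be stored and later passed back as restartLSN.
	stored := formatLSN(cp.lastCommit)
	restart, err := parseLSN(stored)
	if err != nil {
		panic(err)
	}
	fmt.Println("stored:", stored, "restart from:", formatLSN(restart))
}
```

Where the text form is kept (a file, a table in the target system) is left open here; the thread does not prescribe one.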
Thanks @diabloneo! I will try this. Thanks again!
Hi @diabloneo, I want to use clientXLogPos as the restartLSN:
clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
but the value of clientXLogPos goes from larger to smaller and then from smaller to larger again, like this:
clientXLogPos: 1/337E38ED
clientXLogPos: 0/4C
clientXLogPos: 0/181
clientXLogPos: 1/337EBA02
Can clientXLogPos be used as the restartLSN when resuming replication? Thanks!
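One defensive option, which nobody in this thread confirms as the right fix, is to never let the tracked position move backwards, so that a small value such as 0/4C cannot overwrite a larger one already seen. The sketch below replays the four positions reported above through such a guard. `LSN`, `advance` and `formatLSN` are hypothetical names for this example, not part of pglogrepl, and the sketch does not explain why the small values appear in the first place.

```go
package main

import "fmt"

// LSN is a local stand-in for a 64-bit WAL position (an assumption for this sketch).
type LSN uint64

// formatLSN renders an LSN in PostgreSQL's "high/low" hexadecimal text form.
func formatLSN(lsn LSN) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

// advance returns candidate only when it is ahead of current, so the tracked
// position is monotonically non-decreasing.
func advance(current, candidate LSN) LSN {
	if candidate > current {
		return candidate
	}
	return current
}

func main() {
	// The positions from the report above, written as 64-bit values.
	observed := []LSN{0x1337E38ED, 0x4C, 0x181, 0x1337EBA02}

	var tracked LSN
	for _, pos := range observed {
		tracked = advance(tracked, pos)
		fmt.Printf("observed %s, tracked %s\n", formatLSN(pos), formatLSN(tracked))
	}
}
```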
Hi @diabloneo
If I use CommitMessage.CommitLSN as the restart point for replication, there is an issue.
For example, say I update some records (10000 of them) in one commit (for example, with the SQL: update foo set a=123 where id > 0 and id <=10000). How can I resume the replication from a record in the middle, such as id=5000?
Why use CommitMessage.CommitLSN as lastWriteWALPos rather than xld.WALStart + pglogrepl.LSN(len(xld.WALData))?
What is the difference between these two values?
Well, it's actually wrong. I had the same question and ran into issues with it. You should commit pglogrepl.PrimaryKeepaliveMessage.ServerWALEnd, which is not the end of the WAL file but the current pointer of the logical decoder.
More (my investigation): https://stackoverflow.com/questions/71016200/proper-standby-status-update-in-streaming-replication-protocol
If you don't, you will not keep up with Postgres when lots of updates happen to a different database on the same instance, resulting in one of the following:
- you use up all the space on the DB server and go offline,
- or the slot is lost and unusable.
If you need to commit more often than PrimaryKeepaliveMessage arrives, use pglogrepl.CommitMessage.CommitLSN.
Just to add what I did recently, in the hope that it helps. I don't keep track of transaction boundaries as I probably should, because the target system is for OLAP and does not care about them. I keep WALStart in each record as PgLSN to track the position the replication is up to, and once a record is written to the target system, its PgLSN+1 (as curLSN below) is used in StandbyStatusUpdate() to update the slot's confirmed_flush_lsn on the server.
When you resume replication from where you left off last time, you should find Max(PgLSN)+1 in the target system and use it as curLSN to filter out what you receive from the slot (use the condition WALStart < curLSN), as PG seems to search for a transaction boundary when you start_replication, and it repeats the records of the whole transaction, based on the confirmed_flush_lsn of the slot, if that position is not on a commit. If you don't keep track of the LSNs on the target system (or in a third place), you need to be prepared for duplicates when starting replication.
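The filtering described above can be sketched roughly as follows. The `change` struct, `resumePoint` and `skipReplayed` are hypothetical names invented for this example; in a real program the applied LSNs would come from the target system and the changes from the replication stream.

```go
package main

import "fmt"

// LSN is a local stand-in for a 64-bit WAL position (an assumption for this sketch).
type LSN uint64

// change is a hypothetical stand-in for one decoded row change together with
// the WALStart of the message that carried it.
type change struct {
	WALStart LSN
	Payload  string
}

// resumePoint computes Max(PgLSN)+1 over the LSNs already stored in the
// target system. It returns 0 when nothing has been stored yet.
func resumePoint(applied []LSN) LSN {
	if len(applied) == 0 {
		return 0
	}
	max := applied[0]
	for _, lsn := range applied[1:] {
		if lsn > max {
			max = lsn
		}
	}
	return max + 1
}

// skipReplayed drops every change with WALStart < curLSN, i.e. changes that
// the server replays on resume but the target system already holds.
func skipReplayed(in []change, curLSN LSN) []change {
	out := make([]change, 0, len(in))
	for _, c := range in {
		if c.WALStart < curLSN {
			continue
		}
		out = append(out, c)
	}
	return out
}

func main() {
	// LSNs of records already written to the target before the program exited.
	applied := []LSN{0x100, 0x200, 0x300}
	curLSN := resumePoint(applied)

	// On resume the server repeats the whole transaction, then sends new data.
	replayed := []change{
		{0x100, "row 1"}, {0x200, "row 2"}, {0x300, "row 3"}, {0x400, "row 4"},
	}
	for _, c := range skipReplayed(replayed, curLSN) {
		fmt.Printf("apply %s (WALStart %X)\n", c.Payload, uint64(c.WALStart))
	}
}
```

The filter relies on every stored record carrying its own LSN, which is the bookkeeping the comment above describes; without it, duplicates have to be handled some other way.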
@ubombi do you have a minimal example of using it?
After some investigation I found a way of resuming replication.
The docs make it clear: use confirmed_flush_lsn when using the START_REPLICATION SLOT command. confirmed_flush_lsn is "The address (LSN) up to which the logical slot's consumer has confirmed receiving data. Data older than this is not available anymore. NULL for physical slots." (link to docs) So something like this would give you the LSN that the server has confirmed:
SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1
The confirmed LSN is committed using the SendStandbyStatusUpdate function, thus xld.WALStart + pglogrepl.LSN(len(xld.WALData)) is OK to use.
Furthermore, the Postgres source code says that if you specify 0 as the LSN, it uses the confirmed_flush_lsn by default. In summary, resuming the replication is as easy as using the following function:
err = pglogrepl.StartReplication(context.Background(), conn, slotName, pglogrepl.LSN(0), pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
@soasada Thanks for the insight, but I want to add that, depending on your use case, 0/0 is not always safe to use. The issue is that if your system writes an update from the primary and then the process dies before sending the standby status message, Postgres will have an incorrect view of your system's state. From my experimentation, it also seems possible for Postgres to receive a standby message but not process it, e.g. if the standby disconnects soon after the standby message is sent.
So for these reasons, I recommend not relying on the 0/0 behavior, but storing the last flushed WAL position locally and always providing it with the START_REPLICATION command.
I wrote a deep dive on this issue and others I encountered while integrating with this library, hopefully it helps somebody:
https://www.dolthub.com/blog/2024-03-08-postgres-logical-replication/
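As a rough illustration of "storing the last flushed WAL position locally", the sketch below keeps the position in a small file and reads it back at startup. The file name, `saveLSN` and `loadLSN` are hypothetical choices for this example, and a plain file is an assumption: a system that needs the position to change atomically with the replicated data would probably store it alongside that data instead.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// LSN is a local stand-in for a 64-bit WAL position (an assumption for this sketch).
type LSN uint64

// saveLSN writes the position to a temporary file and renames it into place,
// so a crash mid-write does not leave a truncated position behind.
func saveLSN(path string, lsn LSN) error {
	tmp := path + ".tmp"
	text := strconv.FormatUint(uint64(lsn), 16)
	if err := os.WriteFile(tmp, []byte(text), 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// loadLSN returns the stored position. The boolean is false when no position
// has been stored yet, which the caller has to treat as a first run.
func loadLSN(path string) (LSN, bool, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return 0, false, nil
	}
	if err != nil {
		return 0, false, err
	}
	v, err := strconv.ParseUint(strings.TrimSpace(string(data)), 16, 64)
	if err != nil {
		return 0, false, err
	}
	return LSN(v), true, nil
}

func main() {
	dir, err := os.MkdirTemp("", "lsn-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "last_flushed.lsn")

	// After data has been durably written to the target, record its position.
	if err := saveLSN(path, 0x1337EBA02); err != nil {
		panic(err)
	}

	// On the next start, this value is what would be passed to START_REPLICATION.
	lsn, ok, err := loadLSN(path)
	if err != nil {
		panic(err)
	}
	fmt.Printf("found=%v start position=%X/%X\n", ok, uint32(lsn>>32), uint32(lsn))
}
```

The ordering matters more than the storage medium: the position is saved only after the corresponding data is safely written, so a crash can at worst cause some changes to be received again rather than skipped.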