package main
import (
"context"
"fmt"
"log"
"time"
"github.com/jackc/pgx"
"github.com/kyleconroy/pgoutput"
)
// main opens a replication connection to the "opsdash" database as user
// "replicant", then runs a pgoutput logical replication stream for
// subscription "sub2" / publication "pub2", logging every relation, insert,
// update and delete message passed to the handler.
//
// The process exits via log.Fatal if the connection cannot be established or
// if the replication stream returns an error.
func main() {
	ctx := context.Background()
	config := pgx.ConnConfig{Database: "opsdash", User: "replicant"}
	conn, err := pgx.ReplicationConnect(config)
	if err != nil {
		log.Fatal(err)
	}

	// Slot creation is left disabled; uncomment to create the "sub2" slot.
	// NOTE(review): as written this aborts when creation fails, which
	// presumably includes the case where the slot already exists - confirm.
	// if err := conn.CreateReplicationSlot("sub2", "pgoutput"); err != nil {
	// 	log.Fatalf("Failed to create replication slot: %v", err)
	// }

	// set collects the Relation messages seen so far; dump relies on it to
	// turn raw tuples into named values.
	set := pgoutput.NewRelationSet()

	// dump decodes row against the relation identified by relation and logs
	// each column's name, Go type and value.
	dump := func(relation uint32, row []pgoutput.Tuple) error {
		values, err := set.Values(relation, row)
		if err != nil {
			// %w keeps the underlying error inspectable via errors.Is/As.
			return fmt.Errorf("error parsing values: %w", err)
		}
		for name, value := range values {
			val := value.Get()
			log.Printf("%s (%T): %#v", name, val, val)
		}
		return nil
	}

	// handler logs the kind of each replication message. Relation messages
	// are recorded in set; row-carrying messages are decoded and logged by
	// dump. Any other message type is ignored.
	handler := func(m pgoutput.Message) error {
		switch v := m.(type) {
		case pgoutput.Relation:
			log.Print("RELATION")
			set.Add(v)
		case pgoutput.Insert:
			log.Print("INSERT")
			return dump(v.RelationID, v.Row)
		case pgoutput.Update:
			log.Print("UPDATE")
			return dump(v.RelationID, v.Row)
		case pgoutput.Delete:
			log.Print("DELETE")
			return dump(v.RelationID, v.Row)
		}
		return nil
	}

	replication := pgoutput.LogicalReplication{
		Subscription:  "sub2",
		Publication:   "pub2",
		WaitTimeout:   time.Second * 10,
		StatusTimeout: time.Second * 10,
		Handler:       handler,
	}

	err = replication.Start(ctx, conn)

	// Close explicitly rather than with defer: log.Fatal below calls os.Exit,
	// which would skip deferred calls and leave the connection unclosed.
	if cerr := conn.Close(); cerr != nil {
		log.Printf("closing replication connection: %v", cerr)
	}
	if err != nil {
		log.Fatal(err)
	}
}