/outbox

Transactional outbox based on polling publisher for PostgreSQL.

Primary language: Go. License: Apache License 2.0 (Apache-2.0).

Golang simple transactional outbox

Transactional outbox based on polling publisher for PostgreSQL.

Features:

  • Persist messages
  • Publish messages in batches
  • Publish messages in a worker pool
  • Manage the delay between batch publications
  • Create a custom publisher
  • Create a custom repository
  • Use a custom outbox table
  • Publish in partitions

Drivers:

  • pgx
  • gorm

Basic initialization with pgx Repository

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/vsvp21/outbox/v5"
)

// Publisher is a demo publisher that only prints the messages it receives.
type Publisher struct{}

// Publish JSON-encodes message.Payload and prints it, together with topic, to
// standard output. The exchange argument is accepted but not used by this demo
// implementation. It returns the json.Marshal error when the payload cannot be
// encoded, and nil otherwise.
func (p Publisher) Publish(exchange, topic string, message outbox.Message) error {
	payload, err := json.Marshal(message.Payload)
	if err != nil {
		return err
	}

	// End every record with a newline so that consecutive messages do not run
	// together on a single output line.
	fmt.Printf("published message to topic: %s, payload: %s\n", topic, string(payload))

	return nil
}

// main connects to PostgreSQL through a pgx connection pool, wraps the pool in
// an outbox repository, and runs a relay that publishes through Publisher.
// Any connection or relay error terminates the program via log.Fatal.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c, err := pgxpool.New(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
	if err != nil {
		log.Fatal(err)
	}
	// Release the pooled connections once the relay has stopped and main returns.
	defer c.Close()

	r := outbox.NewRepository(outbox.NewPGXAdapter(c))

	// NOTE(review): the meaning of the 1_000 and time.Millisecond arguments is
	// defined by outbox.NewRelay and is not visible here — confirm against the
	// library before tuning them.
	relay := outbox.NewRelay(r, Publisher{}, 1_000, time.Millisecond)
	if err = relay.Run(ctx, outbox.BatchSize(100)); err != nil {
		log.Fatal(err)
	}
}

Basic initialization with gorm Repository

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"

	"github.com/vsvp21/outbox/v5"
)

// Publisher is a demo publisher that only prints the messages it receives.
type Publisher struct{}

// Publish JSON-encodes message.Payload and prints it, together with topic, to
// standard output. The exchange argument is accepted but not used by this demo
// implementation. It returns the json.Marshal error when the payload cannot be
// encoded, and nil otherwise.
func (p Publisher) Publish(exchange, topic string, message outbox.Message) error {
	payload, err := json.Marshal(message.Payload)
	if err != nil {
		return err
	}

	// End every record with a newline so that consecutive messages do not run
	// together on a single output line.
	fmt.Printf("published message to topic: %s, payload: %s\n", topic, string(payload))

	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	db, err := gorm.Open(postgres.New(postgres.Config{
		DSN: "host=127.0.0.1 user=db_user password=secretsecret dbname=test_db port=5432 sslmode=disable",
	}), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}

	r := outbox.NewRepository(outbox.NewGORMAdapter(db))

	relay := outbox.NewRelay(r, Publisher{}, 1_000, time.Millisecond)
	if err = relay.Run(ctx, outbox.BatchSize(100)); err != nil {
		log.Fatal(err)
	}
}

Custom outbox table:

package main

import "github.com/vsvp21/outbox/v5"

// main shows how to select a custom outbox table by assigning the
// package-level outbox.TableName variable.
func main() {
	// Your code ...
	// Use the table named "custom" for the outbox.
	// NOTE(review): presumably this has to be set before any repository or
	// persister is used — confirm against the library.
	outbox.TableName = "custom"
	// Your code ...
}

Pgx Persister

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/vsvp21/outbox/v5"
)

// main creates a pgx connection pool, builds a pgx-backed outbox persister on
// top of it, and runs a callback through PersistInTx. Connection and
// persistence errors terminate the program via log.Fatal.
func main() {
	// Share one context between connecting and persisting.
	ctx := context.TODO()

	db, err := pgxpool.New(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
	if err != nil {
		log.Fatal(err)
	}

	p := outbox.NewPgxPersister(db)

	// The callback receives the pgx.Tx and returns the messages to hand back
	// to the persister.
	// NOTE(review): assumes PersistInTx returns a single error — confirm
	// against the library's signature.
	err = p.PersistInTx(ctx, func(tx pgx.Tx) ([]outbox.Message, error) {
		// SQL Queries
		return []outbox.Message{}, nil
	})
	if err != nil {
		// Surface the failure instead of silently dropping it.
		log.Fatal(err)
	}
}

Gorm Persister

package main

import (
	"log"

	"github.com/vsvp21/outbox/v5"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// main opens a GORM connection to PostgreSQL, builds a GORM-backed outbox
// persister on top of it, and runs a callback through PersistInTx. Connection
// and persistence errors terminate the program via log.Fatal.
func main() {
	c := postgres.Config{
		DSN: "host=127.0.0.1 user=db_user password=secretsecret dbname=test_db port=5432 sslmode=disable",
	}

	db, err := gorm.Open(postgres.New(c))
	if err != nil {
		log.Fatal(err)
	}

	p := outbox.NewGormPersister(db)

	// The callback receives the *gorm.DB and returns the messages to hand back
	// to the persister.
	// NOTE(review): assumes PersistInTx returns a single error — confirm
	// against the library's signature.
	err = p.PersistInTx(func(tx *gorm.DB) ([]outbox.Message, error) {
		// SQL Queries
		return []outbox.Message{}, nil
	})
	if err != nil {
		// Surface the failure instead of silently dropping it.
		log.Fatal(err)
	}
}