/go-liftbridge

Go client for Liftbridge. https://github.com/liftbridge-io/liftbridge

Primary LanguageGoApache License 2.0Apache-2.0

go-liftbridge CircleCI GoDoc

Go client for Liftbridge, a system that provides lightweight, fault-tolerant message streams for NATS.

Liftbridge provides the following high-level features:

  • Log-based API for NATS
  • Replicated for fault-tolerance
  • Horizontally scalable
  • Wildcard subscription support
  • At-least-once delivery support and message replay
  • Message key-value support
  • Log compaction by key

Installation

$ go get github.com/liftbridge-io/go-liftbridge

Basic Usage

package main

import (
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge"
	"github.com/liftbridge-io/liftbridge-grpc/go"
	"golang.org/x/net/context"
)

func main() {
	// Create Liftbridge client.
	addrs := []string{"localhost:9292", "localhost:9293", "localhost:9294"}
	client, err := lift.Connect(addrs)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Create a stream attached to the NATS subject "foo".
	var (
        	subject = "foo"
        	name    = "foo-stream"
	)
	if err := client.CreateStream(context.Background(), subject, name); err != nil {
		if err != lift.ErrStreamExists {
			panic(err)
		}
	}
	
	// Publish a message to "foo".
	if _, err := client.Publish(context.Background(), "foo", []byte("hello")); err != nil {
		panic(err)
	}

	// Subscribe to the stream starting from the beginning.
	ctx := context.Background()
	if err := client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Offset, string(msg.Value))
	}, lift.StartAtEarliestReceived()); err != nil {
		panic(err)
	}

	<-ctx.Done()
}

Create Stream

Streams are a durable message log attached to a NATS subject. They record messages published to the subject for consumption.

Streams have a few key properties: a subject, which is the corresponding NATS subject, a name, which is a human-readable identifier for the stream, and a replication factor, which is the number of nodes the stream should be replicated to for redundancy. Optionally, there is a group which is the name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages will be balanced among them.

// Create a stream attached to the NATS subject "foo.*" that is replicated to
// all the brokers in the cluster. ErrStreamExists is returned if a stream with
// the given name already exists for the subject.
client.CreateStream(context.Background(), "foo.*", "my-stream", lift.MaxReplication())

Subscription Start/Replay Options

Subscriptions are how Liftbridge streams are consumed. Clients can choose where to start consuming messages from in a stream. This is controlled using options passed to Subscribe.

// Subscribe starting with new messages only.
client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
})

// Subscribe starting with the most recently published value.
client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtLatestReceived())

// Subscribe starting with the oldest published value.
client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtEarliestReceived())

// Subscribe starting at a specific offset.
client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtOffset(42))

// Subscribe starting at a specific time.
client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtTime(time.Now()))

// Subscribe starting at a specific amount of time in the past.
client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtTimeDelta(time.Minute))

Publishing

A publish API is provided to make it easy to write messages to streams. This includes a number of options for decorating messages with metadata like a message key and headers as well as configuring acking behavior from the server.

Keys are used by Liftbridge's log compaction. When enabled, Liftbridge streams will retain only the last message for a given key.

// Publish a message with a key and header set.
client.Publish(context.Background(), "foo", []byte("hello"),
	lift.Key([]byte("key"),
	lift.Header("foo", []byte("bar")),
)

An AckPolicy tells the server when to send an ack. If a deadline is provided to Publish, it will block up to this amount of time waiting for the ack.

ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
client.Publish(ctx, "foo", []byte("hello"),
	lift.AckPolicyAll(), // Wait for all stream replicas to get the message
)

ctx, _ = context.WithTimeout(context.Background(), 2*time.Second)
client.Publish(ctx, "foo", []byte("hello"),
	lift.AckPolicyLeader(), // Wait for just the stream leader to get the message
)

client.Publish(context.Background(), "foo", []byte("hello"),
	lift.AckPolicyNone(), // Don't send an ack
)

Publishing Directly with NATS

Since Liftbridge is an extension of NATS, a NATS client can also be used to publish messages. This means existing NATS publishers do not need any changes for messages to be consumed in Liftbridge.

package main

import "github.com/nats-io/go-nats"

func main() {
	// Connect to NATS.
	nc, _ := nats.Connect(nats.DefaultURL)

	// Publish a message.
	nc.Publish("foo.bar", []byte("Hello, world!")) 
	nc.Flush()
}

As shown with the Publish API above, Liftbridge allows publishers to add metadata to messages, including a key, ack inbox, correlation ID, and ack policy. The message key can be used for stream compaction in Liftbridge. Acks are used to guarantee Liftbridge has recorded a message to ensure at-least-once delivery. The ack inbox determines a NATS subject to publish an acknowledgement to once Liftbridge has committed the message. The correlation id is used to correlate an ack back to the original message. The ack policy determines when Liftbridge acknowledges the message: when the stream leader has stored the message, when all replicas have stored it, or no ack at all.

This additional metadata is sent using a message envelope which is a protobuf. The Publish API handles this for you, but this client library also provides APIs to make it easy to create envelopes and deal with acks yourself using a NATS client directly.

var (
	ackInbox = "foo.acks"
	cid      = "some-random-id"
)

// Create a message envelope to publish.
msg := lift.NewMessage([]byte("Hello, world!"),
	lift.Key([]byte("foo")), // Key to set on the message
	lift.AckInbox(ackInbox), // Send ack to this NATS subject
	lift.AckPolicyAll(),     // Send ack once message is fully replicated
	lift.CorrelationID(cid), // Set the ID which will be sent on the ack
)

// Setup a NATS subscription for acks.
sub, _ := nc.SubscribeSync(ackInbox)

// Publish the message.
nc.Publish("foo.bar", msg)

// Wait for ack from Liftbridge.
resp, _ := sub.NextMsg(5*time.Second)
ack, _ := lift.UnmarshalAck(resp.Data)
if ack.CorrelationId == cid {
	fmt.Println("message acked!")
}

Partitioning

Liftbridge streams are partitioned to allow for increased parallelism. By default, a stream consists of a single partition, but the number of partitions can be configured when the stream is created.

Creating Partitioned Streams

// Create stream with three partitions.
client.CreateStream(context.Background(), "bar", "bar-stream", lift.Partitions(3))

Each partition maps to a NATS subject derived from the base stream subject. For example, the partitions for a stream with three partitions attached to the subject "bar" map to the NATS subjects "bar", "bar.1", and "bar.2", respectively.

Publishing to Stream Partitions

By default, clients will publish to the base partition, but this can be configured by providing a Partitioner.

// Publish to partition based on message key hash.
client.Publish(context.Background(), "bar", []byte("hello"),
	lift.Key([]byte("key")),
	lift.PartitionByKey(),
)

// Publish to partitions in a round-robin fashion.
client.Publish(context.Background(), "bar", []byte("hello"),
	lift.Key([]byte("key")),
	lift.PartitionByRoundRobin(),
)

// Publish to a specific partition.
client.Publish(context.Background(), "bar", []byte("hello"),
	lift.Key([]byte("key")),
	lift.ToPartition(1),
)

A custom Partitioner implementation can also be provided.

Subscribing to Stream Partitions

Like publishing, clients will subscribe to the base partition by default. However, a specific partition to consume from can be specified at subscribe time.

// Subscribe to a specific partition.
client.Subscribe(ctx, "bar-stream", func(msg *proto.Message, err error) {
	fmt.Println(msg.Offset, string(msg.Value))
}, lift.Partition(1))