A simple pub/sub system that uses reliable task queues for delivery. It uses relyq (simple Redis-backed reliable task queues) to establish reliable messaging. It also provides a pub/sub interface for reliable messaging by adding a discovery service backed by any database or service using a modular system.
There are a few redis clients for Go but this package uses redigo
Every messaging system has different tradeoffs and this one is no different. Redis lists are not the fastest method of transport, so there will be latency. Also, this system is built to not drop messages on the floor, but the tradeoff is increased latency and possibility of overloading redis itself. Intentionally, it is possible to keep receiving messages in your queue when a subscriber goes offline without unsubscribing. Please be aware of these tradeoffs when choosing a messaging system.
For a less reliable alternatives (but faster and safer for non-long-term use-cases), see nats, for the all-encompasing messaging system: rabbitmq, and for the do-it-yourselfers: zeromq.
go get github.com/Rafflecopter/golang-messageq/messageq
import (
"github.com/garyburd/redigo/redis"
"github.com/Rafflecopter/golang-messageq/messageq"
"time"
)
func CreateMessageQ(pool *redis.Pool) *messageq.MessageQueue {
cfg := &messageq.Config{
RelyQConfig: &messageq.RelyQConfig{
Prefix: "my-relyq", // Required
Delimiter: ":", // Defaults to :
IdField: "id", // ID field for tasks
UseDoneQueue: false, // Whether to keep list of "done" tasks (default false)
KeepDoneTasks: false, // Whether to keep the backend storage of "done" tasks (default false)
},
// The cache decay time for listing a channels subscribers
SubscriberListDecay: 5 * time.Minute,
}
// This must be the same on all nodes of a pub/sub network!
discoveryPrefix := "my-discovery-prefix"
return messageq.NewRedis(pool, cfg, discoveryPrefix)
}
Create your data types:
type MyMessage struct {
messageq.StructuredMessage
OtherFields string
}
q := CreateMessageQ(redisPool)
var mymsg *MyMessage
if messageChannel, err := q.Subscribe("some-channel", mymsg); err != nil {
panic(err)
} else {
go func() {
for message := range messageChannel {
mymessage := message.(*MyMessage)
// Do something with your messages
}
}()
}
if err := q.Publish("another-channel", messageq.Message{"A":"Message"}); err != nil {
panic(err)
}
// Eventually
if err := q.Close(); err != nil {
panic(err)
}
go test
The messageq system can use any of the following backends, which are subclasses of the master type, so each represents a fully functional messageq system type. It is very important that all message queues on the network share the same discovery backend.
Right now this is the only backend implemented. The Redis backend is the primary suggested one, because of its proximity to the queues. It is very important that all message queues on the network share the same discoveryPrefix
. There's an easy creation shortcut.
mq := messageq.NewRedis(pool, cfg, discoveryPrefix)
See LICENSE file.