/gobroker

golang wrapper for all (to-be) kinds of message brokers

Primary language: Go · License: MIT

Build Status GitHub release GitHub license

gobroker

wrapper for all (to-be) kinds of message brokers (go v1.16.x)

Supported message brokers & patterns

PubSub

  • RabbitMQ (fanout)
  • Google Cloud Pub/Sub
  • NSQ

Intentions & Features

  • Generic terms & functions to use message brokers
  • Auto reconnection
  • Limit & requeue messages*
  • Concurrent subscribers
  • Support for mockgen unit-testing

Install

# go get
$ go get github.com/febytanzil/gobroker

Usage

Complete examples are provided in the examples folder/package.

RabbitMQ

// Create a RabbitMQ publisher. RabbitMQAMQP receives the AMQP connection URL
// and a second string argument (presumably the vhost name — confirm).
p := pubsub.NewPublisher(gobroker.RabbitMQ, pubsub.RabbitMQAMQP("amqp://guest:guest@localhost:5672/", "vhost"))

// Publish a message to the "test.fanout" topic.
// NOTE(review): t is not defined in this snippet — presumably a time value
// taken from the full example; confirm.
p.Publish("test.fanout", "msg"+t.String())
// Register the RabbitMQ subscriber(s) and run them. The handler below listens
// on the same "test.fanout" topic the publisher above publishes to, and is
// built with the same RabbitMQAMQP connection arguments.
s := pubsub.NewSubscriber(gobroker.RabbitMQ, []*pubsub.SubHandler{
    {
        Name:        "test.consumer",
        Topic:       "test.fanout",
        // testRMQ is invoked with each received *gobroker.Message.
        Handler:     testRMQ,
        // NOTE(review): MaxRequeue presumably caps how many times a failed
        // message is requeued (see Notes on the requeue limiter); the exact
        // semantics of Concurrent and MaxInFlight are not shown here —
        // confirm against pubsub.SubHandler.
        MaxRequeue:  10,
        Concurrent:  2,
        MaxInFlight: 3,
    },
}, pubsub.RabbitMQAMQP("amqp://guest:guest@localhost:5672/", "vhost"))

s.Start()

Google

// Create a Google Cloud Pub/Sub publisher. GoogleJSONFile receives three
// strings: here a GCP project ID, a cluster name, and the path to a
// credentials JSON file.
p := pubsub.NewPublisher(gobroker.Google, pubsub.GoogleJSONFile("gcp-project-id", "cluster-name", "/path/to/google/application/credentials/cred.json"))

// Publish a message to the "test" topic.
// NOTE(review): t is not defined in this snippet — presumably a time value
// taken from the full example; confirm.
// NOTE(review): the publisher uses topic "test" while the subscriber below
// listens on "test-topic" — confirm whether these are meant to match.
p.Publish("test", "msg"+t.String())
// Register the Google subscriber(s) and run them, using the same
// GoogleJSONFile arguments as the publisher above.
s := pubsub.NewSubscriber(gobroker.Google, []*pubsub.SubHandler{
        {
            Name:        "consumer-test",
            Topic:       "test-topic",
            // testGoogle is invoked with each received *gobroker.Message.
            Handler:     testGoogle,
            // NOTE(review): MaxRequeue presumably caps how many times a failed
            // message is requeued (see Notes on the requeue limiter); the exact
            // semantics of Concurrent, Timeout and MaxInFlight are not shown
            // here — confirm against pubsub.SubHandler.
            MaxRequeue:  10,
            Concurrent:  3,
            Timeout:     10 * time.Minute,
            MaxInFlight: 1,
        },
    },
    pubsub.GoogleJSONFile("gcp-project-id", "cluster-name", "/path/to/google/application/credentials/cred.json"))
		
s.Start()

Creating subscriber/consumer

// subscriber function format
// return nil will ack the message as success
// return error will requeue based on config

// testRMQ is the handler used by the RabbitMQ subscriber example. It decodes
// the message body into a string with gobroker.StdJSONCodec and logs it.
//
// Returning nil acks the message; returning an error requeues it based on the
// subscriber config. A body that cannot be decoded is returned as an error
// rather than being logged as an empty string and acked.
func testRMQ(msg *gobroker.Message) error {
    var encoded string

    // NOTE(review): assumes Decode returns an error, as is conventional for
    // Go codecs — confirm against the gobroker codec definition.
    if err := gobroker.StdJSONCodec.Decode(msg.Body, &encoded); err != nil {
        return err
    }
    log.Println("consume rabbitmq:", encoded)

    return nil
}
// testGoogle is the handler used by the Google Pub/Sub subscriber example. It
// decodes the message body into a string with gobroker.StdJSONCodec and logs
// it.
//
// It never returns nil: after logging, it returns an error carrying the
// decoded body, so every message is requeued based on the subscriber config.
// A body that cannot be decoded returns the decode error instead of being
// logged as an empty string.
func testGoogle(msg *gobroker.Message) error {
    var encoded string

    // NOTE(review): assumes Decode returns an error, as is conventional for
    // Go codecs — confirm against the gobroker codec definition.
    if err := gobroker.StdJSONCodec.Decode(msg.Body, &encoded); err != nil {
        return err
    }
    log.Println("consume google pubsub", encoded)

    return errors.New("requeue msg body: " + encoded)
}

Notes

To make the requeue limit possible, the requeue behavior in both RabbitMQ and Google Pub/Sub is changed: the message is republished to the topic with an additional header that contains the requeue counter.

Contributing

Please use a fork to create a pull request

Contributors