RabbitMQ publisher/subscriber implementation

Build Status codecov

Installation

go get github.com/ProtocolONE/rabbitmq

Usage

package main

import (
	"fmt"
	"github.com/ProtocolONE/rabbitmq/internal/proto"
	"github.com/ProtocolONE/rabbitmq/pkg"
	"github.com/streadway/amqp"
	"log"
	"math/rand"
)

func main() {
	br1, err := rabbitmq.NewBroker("amqp://127.0.0.1:5672")

	if err != nil {
		log.Fatalln(err)
	}

	topic := "test"

	br1.Opts.ExchangeOpts.Opts = rabbitmq.Opts{rabbitmq.OptAutoDelete: true}
	br1.Opts.QueueOpts.Opts = rabbitmq.Opts{rabbitmq.OptAutoDelete: true}

	br2, err := rabbitmq.NewBroker("amqp://127.0.0.1:5672")

	if err != nil {
		log.Fatalln(err)
	}

	br2.Opts.QueueOpts.Args = amqp.Table{
		"x-dead-letter-exchange":    topic,
		"x-message-ttl":             int32(1 * 1000),
		"x-dead-letter-routing-key": "*",
	}
	br2.Opts.ExchangeOpts.Name = "test.timeout10"

	fn := func(msg *test.One, d amqp.Delivery) (err error) {
		rnd := rand.Intn(100)
		rtr := int32(0)

		if v, ok := d.Headers["x-retry-count"]; ok {
			rtr = v.(int32)
		}

		if rnd > 50 {
			if rtr > 10 {
				log.Printf("[x] Max retries count is ended. Delete message : %s\n", msg.Value)
			} else {
				_ = br2.Publish(d.RoutingKey, msg, amqp.Table{"x-retry-count": rtr + 1})
			}

			log.Printf("[x] Retry (retry number %d) failed message: %s\n", rtr, msg.Value)
			return
		}

		log.Printf("Message successfully processed (retry number %d): %s", rtr, msg.Value)
		return
	}

	err = br1.RegisterSubscriber(topic, fn)

	for i := 0; i < 10; i++ {
		one := &test.One{Value: fmt.Sprintf("%s_%d", topic, i)}
		err = br1.Publish(topic, one, nil)

		if err == nil {
			log.Printf("Message published: %s\n", one.Value)
		}
	}

	err = br1.Subscribe(nil)

	if err != nil {
		log.Fatalln(err)
	}
}