
Go AWSMQ is a library to connect the AWSMQ in an effective way. This is strongly influenced from the article https://www.ribice.ba/golang-rabbitmq-client/

Primary LanguageGo


GO-AWS-MQ is a name given to the rabbitMQ client. It is a service to connect any RabbitMQ or AWSMQ services. It will try to reconnect the server automatically in case of any client failure.

It is worth noting that routing-based keys will not be applicable for the default exchange. It is always good to have your private "exchange" and routing-based message enqueuing.

Sometimes, there could be a scenario related to the negative acknowledgment of the messages. It may be required to process it later. So it is a good idea to keep the dead-letter exchange and the queue.

Following are the examples which demonstrate the sending and processing of events.

Stream Example

package main

import (


func main() {
	logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
	//(StreamQueue, PushQueue, addr string, l zerolog.Logger, done chan os.Signal
	sigs := make(chan os.Signal, 1)

	client := awsmq.New(
		"AWSMQ-Test-Exchange",                //ExchangeName
		"AWSMQ-Test-Exchange-DeadLetter",     //DeadLetterExchangeName
		"direct",                             //TypeofExchange
		"AWSMQ-Test-Queue-DeadLetter",        //DeadLetterQueueName
		"AWSMQ",                              //Routingkey
		"",                                   //PushQueue
		"AWSMQ-Test-Queue-Input",             //StreamQueue
		"amqp://user:bitnami@localhost:5672", //AMQP URL
		logger,                               //Logger
		sigs,                                 //Signal

	for {
		bctx := context.Background()
		ctx, _ := context.WithCancel(bctx)
		Stream(ctx, client)


// Stream should be handled in a different way if we are making this as a lib.
func Stream(cancelCtx context.Context, c *awsmq.Client) error {
	for {
		if c.IsConnected {
		time.Sleep(1 * time.Second)
	err := c.Channel.Qos(1, 0, false)
	if err != nil {
		return err

	var connectionDropped bool
	for i := 1; i <= c.Threads; i++ {
		msgs, err := c.Channel.Consume(
			awsmq.ConsumerName(i), // Consumer
			false,                 // Auto-Ack
			false,                 // Exclusive
			false,                 // No-local
			false,                 // No-Wait
			nil,                   // Args
		if err != nil {
			return err
		go func() {
			defer c.Wg.Done()
			for {
				select {
				case <-cancelCtx.Done():
				case msg, ok := <-msgs:
					if !ok {
						connectionDropped = true
					parseEvent(msg, c, cancelCtx)



	if connectionDropped {
		return errors.New("DIsConnected from AWSMQ, trying to reconnect")

	return nil

func parseEvent(msg amqp.Delivery, c *awsmq.Client, ctx context.Context) {
	//msg.Ack(true) To test positive acknowldegement
	msg.Nack(false, false) // To move data to deadletter queue.


Push Example:

package main

import (


func main() {
	logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
	//(StreamQueue, PushQueue, addr string, l zerolog.Logger, done chan os.Signal
	sigs := make(chan os.Signal, 1)

	client := awsmq.New(
		"AWSMQ-Test-Exchange",                //ExchangeName
		"AWSMQ-Test-Exchange-DeadLetter",     //DeadLetterExchangeName
		"direct",                             //TypeofExchange
		"AWSMQ-Test-Queue-DeadLetter",        //DeadLetterQueueName
		"AWSMQ",                              //Routingkey
		"AWSMQ-Test-Queue-Input",             //PushQueue
		"",                                   //StreamQueue
		"amqp://user:bitnami@localhost:5672", //AMQP URL
		logger,                               //Logger
		sigs,                                 //Signal

	for i := 0; i < 200; i++ {
		client.Push([]byte(`This is a testing.`))
		time.Sleep(5 * time.Second)


Handling DeadLetter

package main

import (


func main() {
	logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
	//(StreamQueue, PushQueue, addr string, l zerolog.Logger, done chan os.Signal
	sigs := make(chan os.Signal, 1)

	client := awsmq.New(
		"AWSMQ-Test-Exchange-DeadLetter",     //ExchangeName
		"",                                   //DeadLetterExchangeName
		"direct",                             //TypeofExchange
		"",                                   //DeadLetterQueueName
		"AWSMQ",                              //Routingkey
		"",                                   //PushQueue
		"AWSMQ-Test-Queue-DeadLetter",        //StreamQueue
		"amqp://user:bitnami@localhost:5672", //AMQP URL
		logger,                               //Logger
		sigs,                                 //Signal

	for {
		bctx := context.Background()
		ctx, _ := context.WithCancel(bctx)
		Stream(ctx, client)


// Stream should be handled in a different way if we are making this as a lib.
func Stream(cancelCtx context.Context, c *awsmq.Client) error {
	for {
		if c.IsConnected {
		time.Sleep(1 * time.Second)

	err := c.Channel.Qos(1, 0, false)
	if err != nil {
		return err

	var connectionDropped bool
	for i := 1; i <= c.Threads; i++ {
		msgs, err := c.Channel.Consume(
			awsmq.ConsumerName(i), // Consumer
			false,                 // Auto-Ack
			false,                 // Exclusive
			false,                 // No-local
			false,                 // No-Wait
			nil,                   // Args
		if err != nil {
			return err
		go func() {
			defer c.Wg.Done()
			for {
				select {
				case <-cancelCtx.Done():
				case msg, ok := <-msgs:
					if !ok {
						connectionDropped = true
					parseEvent(msg, c, cancelCtx)



	if connectionDropped {
		return errors.New("DIsConnected from AWSMQ, trying to reconnect")

	return nil

func parseEvent(msg amqp.Delivery, c *awsmq.Client, ctx context.Context) {
	msg.Ack(true) //To test positive acknowldegement
	//msg.Nack(false, false) // To move data to deadletter queue.
