/rabbitmq

RabbitMq Wrapper

Primary LanguageGo

rabbitmq Wrapper

RabbitMq Wrapper is the a client API for RabbitMQ.

  • A wrapper over amqp exchanges and queues.
  • In memory retries for consuming messages when an error occured
  • CorrelationId and MessageId structure
  • Exchange Types With Direct, Fanout, Topic, ConsistentHashing
  • Retry policy (immediately , interval)
  • Multiple consumers In a single process
  • Create goroutines and consume messages asynchronously
  • Disable consume messages asynchronously if you want
  • Retry to connect another node When RabbitMq Node is Down or Broken Connection
  • Add stack trace on the message header if the error occurred when the message is consumed
  • Some extra features while publishing message (will be added)

To connect to a RabbitMQ broker...

	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost")

To connect to a RabbitMQ broker with retry policy

  • Consumer retries two times immediately if an error occured

     var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                                               rabbit.RetryCount(2,time.Duration(0)))
    
  • Create goroutines and consume messages asynchronously using PrefetchCount Prefix. Create as number of PrefetchCount as goroutines .

    	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                    		rabbit.PrefetchCount(3))
    

To send a message

    // Added

To consume a message

    onConsumed := func(message rabbit.Message) error {
    
    		var consumeMessage PersonV1
    		var err= json.Unmarshal(message.Payload, &consumeMessage)
    		if err != nil {
    			return err
    		}
    		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
    		return nil
    	}
    

    rabbitClient.AddConsumer("In.Person").
    SubscriberExchange("RoutinKey.*",rabbit.Direct ,"Person").
    HandleConsumer(onConsumed)

To Consume multiple messages

	onConsumed := func(message rabbit.Message) error {

		var consumeMessage PersonV1
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}

	onConsumed2 := func(message rabbit.Message) error {

		var consumeMessage PersonV4
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}
	rabbitClient.AddConsumer("In.Person3").
            SubscriberExchange("",rabbit.Fanout ,"ExchangeNamePerson").
            HandleConsumer(onConsumed)
            
    rabbitClient.AddConsumer("In.Person").
             SubscriberExchange("Person.*",rabbit.Direct ,"PersonV1").
             HandleConsumer(onConsumed2)
             

	rabbitClient.RunConsumers()

To Consume multiple exchange

    rabbitClient.AddConsumer("In.Lines").
    		SubscriberExchange("1", rabbit.ConsistentHashing,"OrderLineAdded").
    		SubscriberExchange("1", rabbit.ConsistentHashing,OrderLineCancelled).
    		WithSingleGoroutine(true).
    		HandleConsumer(onConsumed2)