Use this library to make rpc calls over rabbit MQ using amqp protocol
go get -u github.com/raghuP9/amqp
go get -u github.com/raghuP9/amqp/cmd/amqpctl
$GOPATH/bin/amqpctl --help
$GOPATH/bin/amqpctl --server rabbitmq.example.com --port 5672 --username <username> --password <password> producer
$GOPATH/bin/amqpctl --server rabbitmq.example.com --port 5672 --username <username> --password <password> consumer
GoDoc Link
package main
import (
"github.com/raghuP9/amqp/blob/master/pkg/rpc/rmq"
)
func main() {
client, err := rmq.GetRMQClient(
true, // to connect securely i.e. using amqps or else set to false
"rabbitmq-username", // rabbitmq username
"rabbitmq-password", // rabbitmq password
"myrabbitmq.server.com", // rabbitmq server URL
"1234", // rabbitmq server port
"/" // vhost
)
}
err := client.ExchangeDeclare(
"exchange-name",
rmq.DefaultExchangeDeclareOpts(),
rmq.DefaultConnectOpts(),
)
err := client.ExchangeDelete(
"exchange-name", // Exchange name
true, // IfUnused: Remove exchange if no queue bound to this exchange
false, // NoWait: Do not wait for deletion confirmation from rabbitmq server
)
err := client.QueueDeclare(
"queue-name",
rmq.DefaultDeclareQueueOpts(),
rmq.DefaultConnectOpts(),
)
err := client.QueueBind(
"exchange-name",
"queue-name",
"routing-key",
rmq.DefaultQueueBindOpts(),
rmq.DefaultConnectOpts(),
)
err := client.QueueDelete(
"queue-name",
rmq.DefaultQueueDeleteOpts(),
rmq.DefaultConnectOpts(),
)
err := client.QueuePurge(
"queue-name",
false, // NoWait: do not wait for confirmation from rabbitmq server and return
rmq.DefaultConnectOpts(),
)
import "github.com/streadway/amqp"
func doSomething() {
err := client.Publish(
amqp.Publishing{
Body: []byte(c.String("message")),
DeliveryMode: amqp.Persistent,
ContentType: "plain/text",
Timestamp: time.Now(),
},
"exchange-name",
"routing-key",
rmq.DefaultPublishOpts(),
rmq.DefaultConnectOpts(),
)
}
import "github.com/streadway/amqp"
func handler(msg amqp.Delivery) (amqp.Publishing, error) {
...
}
func doSomething() {
err := client.Subscribe(
context.TODO(), // When passing context with cancel func, calling cancel() will return from Subscribe function
"queue-name", // Name of the queue
rmq.DefaultSubscribeOpts(), // Provide options such as correlation ID, listen indefinitely on the queue, reconnect if disconnected, publish response from handler function
rmq.DefaultChannelOpts(), // Set Qos e.g. do not pick another message from queue unless previous message is processed.
rmq.DefaultConnectOpts(),
handler, // handler function that takes received message, processes it and returns response message, can be anonymous fn.
)
}