- Topology declaration API
- Consumer listener API
- Message handler API
- Consumers router
- Middleware support
- Common middlewares implementation
- Graceful shutdown
- Automatic reconnection
Carrot exposes a nice API for dealing with AMQP connections, such as declaring topologies (exchanges, queues, ...) and declaring consumers on one or more queues.
Check out the examples for more information.
Carrot uses three main components for its API:
- Topology declarator, to declare the AMQP topology from the application that uses it
- Message handlers, to define functions able to handle incoming messages from consumers
- Consumer listeners, to receive messages from one or more queues
Carrot allows to define a topology by exposing an expressive API backed by
topology.Declarer
interface.
The current supported topologies are:
- Queues, found in
topology/queue
package - Exchanges, found in
topology/exchange
package
Topology declaration is optional, and can be controlled with carrot.WithTopology
:
carrot.WithTopology(topology.All(
exchange.Declare("messages"),
queue.Declare("consumer.message.received",
queue.BindTo("messages", "message.published"),
),
queue.Declare("consumer.message.deleted",
queue.BindTo("messages", "message.deleted"),
),
)),
When specified, Carrot will open a dedicated AMQP channel to declare the topology, before listening to messages.
Carrot can also be used exclusively for topology declaration:
conn, err := amqp.Dial("amqp://guest:guest@rabbit:5672")
if err != nil {
panic(err)
}
// The carrot.Closer handle returned is useless if only topology declaration
// is used.
_, err := carrot.Run(conn,
// Declare your topology here
carrot.WithTopology(topology.All(
// topology.All is used to declare more than one topology
// in a single transaction.
exchange.Declare("messages"),
queue.Declare("consumer.message.received",
queue.BindTo("messages", "message.published"),
),
queue.Declare("consumer.message.deleted",
queue.BindTo("messages", "message.deleted"),
),
)),
)
Carrot defines an interface for handling incoming messages (amqp.Delivery
)
in handler.Handler
interface:
type Handler interface {
Handle(context.Context, amqp.Delivery) error
}
Message handlers are fallible, so they can return an error.
Error handling can be specified at Consumer Listeners level.
You can specify a message handler for all incoming messages by using carrot.WithHandler
:
carrot.WithHandler(handler.Func(func(context.Context, amqp.Delivery) error {
// Handle messages here!
return nil
}))
Carrot also exposes a Router
interface and implementation
to support:
- Multiple listeners with their own message handlers
- Middleware support
An example of how a Router
setup might look like:
// Router implements the handler.Handler interface.
router.New().Group(func(r router.Router) {
// This is how you set middlewares.
r.Use(LogMessages(logger))
r.Use(middleware.Timeout(50 * time.Millisecond))
r.Use(SimulateWork(100*time.Millisecond, logger))
// This is how you bind an handler function to a specific queue.
// In order for it to work, you must register these queues
// in the listener.
r.Bind("consumer.message.received", handler.Func(Acknowledger))
r.Bind("consumer.message.deleted", handler.Func(Acknowledger))
// You can also specify additional middlewares only for one queue:
r.With(AuthenticateUser).
Bind("consumer.message.created", handler.Func(Acknowledger))
})
As the name says, Listeners listens for incoming messages on a specific queue.
Carrot defines a listener.Listener
interface to
represent these components:
type Listener interface {
Listen(Connection, Channel, handler.Handler) (Closer, error)
}
so that the listener can:
- Start listening to incoming
amqp.Delivery
from aChannel
- Serving these messages using the provided
handler.Handler
- Hand out a
Closer
handler to close the listener/server goroutine and/or wait for its closing
An example of how to define Listeners:
// WithListener specifies the listener.Listener to start.
carrot.WithListener(listener.Sink(
// listener.Sink allows to listen to messages coming from one or more consumers,
// and pilots closing the child listeners.
consumer.Listen("consumer.message.deleted"),
listener.UseDedicatedChannel(
// By default, carrot uses a single amqp.Channel to establish
// consumer listeners. But we can tell carrot to use a dedicated
// amqp.Channel for certain consumers.
consumer.Listen("consumer.message.received"),
),
))
Let's put all the pieces together now!
conn, err := amqp.Dial("amqp://guest:guest@rabbit:5672")
if err != nil {
panic(err)
}
closer, err := carrot.Run(conn,
// First, declare your topology...
carrot.WithTopology(topology.All(
exchange.Declare("messages"),
queue.Declare("consumer.message.received",
queue.BindTo("messages", "message.published"),
),
queue.Declare("consumer.message.deleted",
queue.BindTo("messages", "message.deleted"),
),
)),
// Second, declare the consumers to receive messages from...
carrot.WithListener(listener.Sink(
consumer.Listen("consumer.message.deleted"),
listener.UseDedicatedChannel(
consumer.Listen("consumer.message.received"),
),
))
// Lastly, specify an handler function that will receive the messages
// coming from the specified consumers.
carrot.WithHandler(router.New().Group(func(r router.Router) {
r.Use(LogMessages(logger))
r.Use(middleware.Timeout(50 * time.Millisecond))
r.Use(SimulateWork(100*time.Millisecond, logger))
r.Bind("consumer.message.received", handler.Func(Acknowledger))
r.Bind("consumer.message.deleted", handler.Func(Acknowledger))
})),
)
if err != nil {
panic(err)
}
// Wait on the main goroutine until the consumer has exited:
err := <-closer.Closed()
log.Fatalf("Consumers closed (error %s)", err)
This project is licensed under the MIT license.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in go-carrot
by you, shall be licensed as MIT, without any additional terms or conditions.