A common interface to multiple message queues libraries. Think of it as the ORM to message queues.
Add this line to your application's Gemfile:
gem 'message_queue'
And then execute:
$ bundle
Or install it yourself as:
$ gem install message_queue
- In memory through Observable
- RabbitMQ through bunny
- Plain text
- JSON through multi_json
- Message pack through msgpack
with_connection
initializes a connection using the specified adapter
and serializer, connects to the message queue, runs a block of code and
disconnects.
MessageQueue.with_connection(:adapter => :bunny, :serializer => :message_pack) do |conn|
producer = conn.new_producer(
:exchange => {
:name => "time",
:type => :topic
},
:message => {
:routing_key => "time.now"
}
)
consumer = conn.new_consumer(
:queue => {
:name => "print_time_now"
},
:exchange => {
:name => "time",
:routing_key => "time.#"
}
)
consumer.subscribe do |delivery_info, metadata, payload|
puts "Received message: #{payload}"
end
producer.publish Time.now.to_s
sleep 1
end
You could maintain a global connection by using the connect
method on
MessageQueue
.
MessageQueue.connect(:adater => :bunny, :serializer => :json)
puts MessageQueue.connected? # => true
producer = MessageQueue.new_producer(
:exchange => {
:name => "time",
:type => :topic
},
:message => {
:routing_key => "time.now"
}
)
consumer = MessageQueue.new_consumer(
:queue => {
:name => "print_time_now"
},
:exchange => {
:name => "time",
:routing_key => "time.#"
}
)
consumer.subscribe do |delivery_info, metadata, payload|
puts "Received message: #{payload}"
end
producer.publish Time.now.to_s
sleep 1
MessageQueue.disconnect
puts MessageQueue.connected? # => false
You could also mix in the MessageQueue::Producible
module and the
MessageQueue::Consumable
module to your producer and consumer
respectively. The consumer needs to implement a process
method which will be passed a MessageQueue::Message
object when it
receives a message.
class Producer
include MessageQueue::Producible
exchange :name => "time", :type => :topic
message :routing_key => "time.now", :mandatory => true
end
class Consumer
include MessageQueue::Consumable
queue :name => "print_time_now"
exchange :name => "time", :routing_key => "time.#"
def process(message)
puts "Received message #{message.payload}"
end
end
MessageQueue.connect(:adater => :bunny, :serializer => :json)
Producer.new.publish(Time.now.to_s)
sleep 1
MessageQueue.disconnect
See examples.
For Rails, message_queue
automatically loads settings from
RAILS_ROOT/config/message_queue.yml
. If the file doesn't exist, it
initializes the queue with in-memory mode.
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request