    AMQP = require('amqp-coffee') # path to this

    # message to publish
    testData = "the data to be published.. I am a string but could be anything"

    amqpConnection = new AMQP {host:'localhost'}, (e, r)->
      if e?
        console.error "Error", e

      amqpConnection.queue {queue: "queueName"}, (e,q)->
        q.declare ()->
          q.bind "amq.direct", "queueName", ()->

            # publish once the binding exists, waiting for the broker's confirm
            amqpConnection.publish "amq.direct", "queueName", testData, {confirm: true}, (err, res)->
              console.log "Message published"

            # prefetchCount puts the consumer in qos mode, so each message must be acked
            consumer = amqpConnection.consume "queueName", {prefetchCount: 2}, (message)->
              console.log message.data.toString()
              message.ack()

            , (e,r)->
              console.log "Consumer setup"
              amqpConnection.publish "amq.direct", "queueName", "message contents", {deliveryMode:2, confirm:true}, (e, r)->
                if !e? then console.log "Message Sent"
- Class: amqp-coffee
- new amqp-coffee([connectionOptions],[callback])
- connection.queue([queueOptions],[callback])
- connection.exchange([exchangeArgs],[callback])
- connection.publish(exchange, routingKey, data, [publishOptions], [callback])
- connection.consume(queueName, options, messageListener, [callback])
- connection.close()
Creates a new amqp Connection. The connection is returned directly and in the callback. The connection extends EventEmitter.
The callback is called on a successful connection, or on an unsuccessful connection when connectionOptions.reconnect is false. If connectionOptions.reconnect is false, you will get an error back in the callback. If no callback is specified, the error will be emitted.
The connectionOptions argument should be an object which specifies:
- host: a string of the hostname OR an array of hostname strings OR an array of hostname objects {host, port}
- port: 5672. An integer of the port to connect to. Not used if host is an object.
- login: "guest"
- password: "guest"
- vhost: '/'
- heartbeat: 10000 # in ms
- reconnect: true
- reconnectDelayTime: 1000 # in ms
- hostRandom: false
- connectTimeout: 30000 # in ms, this is only used if reconnect is false
- clientProperties: {version: clientVersion, platform, product}
- ssl: false
- sslOptions: {} # tls options like cert, key, ca, secureProtocol, passphrase
- temporaryChannelTimeout: 2000 # in ms, temporary channels are used to set up queues, bindings, and exchanges. If you are frequently tearing down and setting up new queues it could make sense to make this longer.
- noDelay: true # disable Nagle's algorithm by default
Host Examples

    host: 'localhost'
    host: {host: 'localhost', port: 15672}
    host: ['localhost','yourhost']
    host: [{host: 'localhost', port:15672}, {host: 'localhost', port:15673}]
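The accepted host forms can all be reduced to one list of {host, port} objects. The sketch below illustrates that idea only; normalizeHosts is a hypothetical helper, not a function exported by amqp-coffee, and it assumes the top-level port (default 5672) applies to any host given as a plain string.

```coffeescript
# Hypothetical helper (not part of amqp-coffee): expand every accepted
# `host` form into a uniform list of {host, port} objects.
normalizeHosts = (host, defaultPort = 5672) ->
  hosts = if Array.isArray(host) then host else [host]
  hosts.map (h) ->
    if typeof h is 'string'
      # plain hostnames fall back to the connection-level port
      {host: h, port: defaultPort}
    else
      # host objects carry their own port when one is given
      {host: h.host, port: h.port ? defaultPort}

console.log normalizeHosts 'localhost'
console.log normalizeHosts ['localhost', {host: 'yourhost', port: 15672}]
```

Normalizing up front means the rest of a connection routine only ever has to deal with one shape.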
Sample Connection

    assert = require('assert')
    AMQP = require('amqp-coffee')

    amqp = new AMQP {host: 'localhost'}, (error, amqpConnection)->
      # the connection passed to the callback is the same object that was returned
      assert(amqp == amqpConnection)
On a connection close, the reconnect process starts if reconnect is true. After the reconnectDelayTime, the hosts are rotated if more than one host is specified. A new connection is then attempted; if the connection is not successful, this process repeats.
After a connection is re-established, all of the channels are reset, which attempts to reopen each channel. Different channel types re-establish their channels differently.
- Publisher channels will only reconnect when a publish is attempted.
- Consumer channels will reconnect and resume consuming. If it was an autoDelete queue, this could fail. Make sure you listen to the ready event on the connection to set up and consume any autoDelete queues again.
- Queue / Exchange channels are recreated on demand.
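The host rotation step of the reconnect process can be pictured with a small simulation. This is a sketch under assumptions, not amqp-coffee's internal code: rotateHosts is a hypothetical name, and the loop stands in for repeated failed connection attempts separated by reconnectDelayTime.

```coffeescript
# Hypothetical sketch of host rotation: move the first host to the end
# of the list so the next attempt targets a different broker.
rotateHosts = (hosts) ->
  return hosts if hosts.length < 2
  hosts.slice(1).concat hosts[0]

# Simulate three consecutive attempts against two hosts.
hosts = [{host: 'localhost', port: 5672}, {host: 'yourhost', port: 5672}]
for attempt in [1..3]
  console.log "attempt #{attempt}: connecting to #{hosts[0].host}"
  hosts = rotateHosts hosts
```

With a single host the list is returned unchanged, which matches the note that rotation only applies when more than one host is specified.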
Emitted when the connection is open successfully. This will be called after each successful reconnect.
Emitted when an open connection leaves the ready state and is closed.
Very rare; only emitted when there is a server version mismatch.
This returns a channel that can be used to declare, bind, unbind, or delete queues. This on its own does NOT declare a queue. When creating a queue class using connection.queue, you can specify options that will be used in all the child methods.
The queueOptions argument should be an object which specifies:
- queue: a string representing the queue name; can also be empty to use an autogenerated queue name
- autoDelete: default: true
- noWait: default: false
- exclusive: default: false. The queue can only be used by the current connection.
- durable: default: false
- passive: default: false. The queue creation will not fail if the queue already exists.
- arguments: default: {}. Pass queue configuration arguments, e.g. 'x-dead-letter-exchange'.
Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.
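The idle-close behaviour of a temporary channel can be modelled in a few lines. This is a minimal sketch, assuming a 2000 ms timeout and a caller-supplied clock value; TemporaryChannel, use, and tick are hypothetical names and do not reflect how amqp-coffee implements its channels.

```coffeescript
# Hypothetical model of a temporary channel that closes after a period
# of inactivity and reopens the next time it is needed.
class TemporaryChannel
  constructor: (@timeoutMs = 2000) ->
    @open = false
    @lastUsed = 0

  # Called for every declare/bind/unbind/delete; reopens on demand.
  use: (now) ->
    @open = true
    @lastUsed = now

  # Called periodically; closes the channel once it has been idle long enough.
  tick: (now) ->
    @open = false if @open and now - @lastUsed >= @timeoutMs
    @open

channel = new TemporaryChannel()
channel.use 0
console.log channel.tick 1000   # true: only idle for 1000 ms
console.log channel.tick 2000   # false: idle for the full 2000 ms
```

Passing the current time in explicitly keeps the sketch deterministic; a real implementation would more likely reset a timer on each use.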
Will take a new set of queueOptions, or use the default. Issues a queueDeclare and waits on queueDeclareOk if a callback is specified.
    amqp = new AMQP ()->
      amqp.queue {queue: 'queueToCreate'}, (err, Queue)->
        Queue.declare (err, res)->
          # the queue is now declared

To use an auto-generated queue name

    amqp = new AMQP ()->
      amqp.queue {queue: ''}, (err, Queue)->
        Queue.declare (err, res)->
          # the broker returns the generated name in the declare response
          queueName = res.queue
The queueDeleteOptions argument should be an object which specifies:
- queue: name of the queue
- ifUnused: default: false
- ifEmpty: default: true
- noWait: default: false
Sets up bindings from an already existing exchange to an already existing queue
Tears down an already existing binding
RabbitMQ specific: re-declares the queue and returns the messageCount from the response.
RabbitMQ specific: re-declares the queue and returns the consumerCount from the response.
This returns a channel that can be used to declare, bind, unbind, or delete exchanges. This on its own does NOT declare an exchange. When creating an exchange class using connection.exchange, you can specify options that will be used in all the child methods.
The exchangeArgs argument should be an object which specifies:
- exchange: a string representing the exchange name
- type: "direct"
- passive: false
- durable: false
- noWait: false
- autoDelete: true
- internal: false
Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.
The exchangeDeleteOptions argument should be an object which specifies:
- exchange: the name of the exchange
- ifUnused: false
- noWait: false
RabbitMQ extension, to bind between exchanges. If sourceExchange is omitted, it defaults to the exchange the method is being called on.
RabbitMQ extension, to bind between exchanges. If sourceExchange is omitted, it defaults to the exchange the method is being called on.
amqp-coffee manages publisher channels and sets them up on the first publish. Confirming is a state a channel must be put in, so one channel is needed for confirmed publishes and one for non-confirmed publishes. They are only created on demand, so you should have a maximum of 2 publishing channels for a single connection.
New in 0.1.20: if you set the mandatory or immediate flag together with the confirm flag, a tracking header, headers.x-seq, is added to that message. It is a numeric representation of that message, just like the sequence number. The header is used to match a message that has failed publishing and come back as a "basicReturn" to an already existing callback. This allows you to publish to a queue that may not exist and get a bounce if it doesn't. Likewise, if a queue is in a bad state, the message will fail routing and come back.
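The x-seq mechanism amounts to keeping a map from a sequence number to the publish callback, then looking that number up again when a message is returned. The sketch below shows the bookkeeping only; ReturnTracker, track, and handleReturn are hypothetical names, and the real library's data structures may differ.

```coffeescript
# Hypothetical sketch of matching a returned message to its publish callback.
class ReturnTracker
  constructor: ->
    @seq = 0
    @pending = new Map()

  # Tag the outgoing headers with x-seq and remember the callback.
  track: (headers, callback) ->
    @seq += 1
    headers['x-seq'] = @seq
    @pending.set @seq, callback
    @seq

  # A returned message carries the same header back, so the original
  # callback can be found and called with the failure.
  handleReturn: (headers, error) ->
    callback = @pending.get headers['x-seq']
    return false unless callback?
    @pending.delete headers['x-seq']
    callback error
    true

tracker = new ReturnTracker()
headers = {}
tracker.track headers, (err) -> console.log "publish failed:", err
tracker.handleReturn headers, 'NO_ROUTE'
```

Removing the entry after the first match ensures a callback is never invoked twice for the same message.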
- exchange: string of the exchange to publish to
- routingKey: string to use to route the message
- data: any type of data. If it is an object it will be converted into JSON automatically and unconverted on consume. Strings are converted into buffers.
- publishOptions: all parameters are passed through as arguments to the publisher.
  - confirm: false
  - mandatory: false
  - immediate: false
  - contentType: 'application/octet-stream'
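The conversion rules for data can be sketched as a pair of functions. This is an illustration under assumptions: encode and decode are hypothetical names, and the 'application/json' content type for objects is assumed here rather than taken from the library.

```coffeescript
# Hypothetical sketch of the data conversion described above.
encode = (data) ->
  if Buffer.isBuffer data
    {body: data, contentType: 'application/octet-stream'}
  else if typeof data is 'string'
    # strings are converted into buffers
    {body: Buffer.from(data), contentType: 'application/octet-stream'}
  else
    # objects are serialized as JSON (content type is an assumption)
    {body: Buffer.from(JSON.stringify data), contentType: 'application/json'}

# Reverse the conversion on the consuming side.
decode = ({body, contentType}) ->
  if contentType is 'application/json' then JSON.parse body.toString() else body

console.log decode encode {hello: 'world'}
console.log decode(encode 'plain text').toString()
```

Carrying the content type alongside the body is what lets the consumer decide whether to parse the payload or hand back the raw buffer.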
Consumers use their own channels and are re-subscribed on reconnect. Returns a consumer object.
- queueName: string of the queue to subscribe to
- options:
  - noLocal: false
  - noAck: true
  - exclusive: false
  - noWait: false
  - prefetchCount: integer. If specified, the consumer will enter qos mode and you will have to ack messages. If specified, noAck will be set to false.
  - consumerTag: optional string. If not specified, one will be generated for you.
- messageListener: a function (message)
- callback: a function that is called once the consume is set up
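The interaction between prefetchCount and noAck can be expressed as a small option-resolution step. This is a sketch only; resolveConsumeOptions is a hypothetical helper, and the defaults are copied from the list above.

```coffeescript
# Hypothetical helper: apply the documented defaults, then force noAck
# off whenever a prefetchCount is given (qos mode requires acks).
resolveConsumeOptions = (options = {}) ->
  defaults = {noLocal: false, noAck: true, exclusive: false, noWait: false}
  resolved = Object.assign {}, defaults, options
  resolved.noAck = false if resolved.prefetchCount?
  resolved

console.log resolveConsumeOptions()
console.log resolveConsumeOptions {prefetchCount: 2}
```

In practice this means a listener for a consumer created with prefetchCount must call message.ack() (or reject/retry), otherwise no further messages are delivered once the prefetch window is full.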
messageListener is a function that gets a message object which has the following attributes:
- data: a getter that returns the data in its parsed form, e.g. a parsed JSON object, a string, or the raw buffer
- raw: the raw buffer that was returned
- properties: headers specified for the message
- size: message body size
- ack(): function; only used when prefetchCount is specified
- reject(): function; only used when prefetchCount is specified
- retry(): function; only used when prefetchCount is specified
    listener = (message)->
      # we will only get 1 message at a time because prefetchCount is set to 1
      console.log "Message Data", message.data
      message.ack()

    amqp = new AMQP ()->
      amqp.queue {queue: 'testing'}, (e, queue)->
        queue.declare ()->
          queue.bind 'amq.direct', 'testing', ()->
            amqp.publish 'amq.direct', 'testing', 'here is one message 1'
            amqp.publish 'amq.direct', 'testing', 'here is one message 2'
            amqp.consume 'testing', {prefetchCount: 1}, listener, ()->
              console.log "Consumer Ready"
Errors will be emitted from the consumer if we cannot consume from that queue anymore. For example, if you're consuming an autoDelete queue and you reconnect, that queue will be gone. The consumer will emit the raw error message, with the code as the message.
The cancel event will be emitted from the consumer if we receive a server-initiated "basic.cancel". For this to happen you must let the server know you are expecting a cancel, which you do by specifying clientProperties on connect: clientProperties: { capabilities: { consumer_cancel_notify: true }}
https://www.rabbitmq.com/consumer-cancel.html
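Written out as a full connection options object, the capability flag looks like the fragment below. The host value is only a placeholder, and whether supplying clientProperties replaces or merges with the default version, platform, and product values is not stated here, so treat that as something to verify.

```coffeescript
# Connection options announcing that this client understands
# server-initiated basic.cancel notifications.
connectionOptions =
  host: 'localhost'          # placeholder host
  clientProperties:
    capabilities:
      consumer_cancel_notify: true

console.log JSON.stringify connectionOptions, null, 2
```

These options would be passed as the first argument when creating the connection; the consumer returned by connection.consume is then where the cancel event is expected to surface.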
Will update the prefetch count of an already existing consumer; can be used to dynamically tune a consumer.
Sends basicCancel and waits on basicCancelOk
consumer.cancel
Calls consumer.cancel, if we're currently consuming. Then calls channel.close and calls the callback as soon as the channel close is sent, NOT when channelCloseOk is returned.
consumer.consume, sets up the consumer with a new consumer tag
An alias for consumer.pause (active == false) and consumer.resume (active == true)
More documentation to come. The tests are a good place to reference.
First of all this was heavily inspired by https://github.com/postwait/node-amqp
Changes from node-amqp
- the ability to share channels intelligently. ( if you are declaring multiple queues and exchanges there is no need to use multiple channels )
- auto channel closing for transient channels ( channels used for declaring and binding if they are inactive )
- consumer reconnecting
- fixed out-of-order channel operations by ensuring things are written in order and not overwriting buffers that may not have been pushed to the network.
- switch away from event emitters for consumer acks
- everything that can be async is async
- native bson support for messages with contentType application/bson
- ability to delete, bind, and unbind a queue without having to know everything about the queue like auto delete etc...
- can get the message and consumer count of a queue
- can turn flow control on and off for a consumer (pause, resume) receiving messages.
- RabbitMQ master queue connection preference. When you connect to an array of hosts that have queues that are highly available (HA), it can talk to the RabbitMQ API and make sure it connects to the master node for that queue. This can give much better performance with consumer acks.