/wascally

Abstractions to simplify working with wascally wabbitMQ

Primary LanguageJavaScriptMIT LicenseMIT

Wascally

Version npm npm Downloads Dependencies

This is a very opinionated abstraction over amqplib to help simplify certain common tasks and (hopefully) reduce the effort required to use RabbitMQ in your Node services.

Features:

  • Gracefully handle re-connections
  • Automatically re-define all topology on re-connection
  • Automatically re-send any unconfirmed messages on re-connection
  • Support the majority of RabbitMQ's extensions
  • Handle batching of acknowledgements and rejections
  • Topology & configuration via the JSON configuration method (thanks to @JohnDMathis!)

Assumptions & Defaults:

  • Fault-tolerance/resilience over throughput
  • Default to publish confirmation
  • Default to ack mode on consumers
  • Heterogenous services that include statically typed languages
  • JSON as the only serialization provider

Demos

API Reference

This library implements promises for many of the calls via when.js.

Connectivity Events

Wascally emits both generic and specific connectivity events that you can bind to in order to handle various states:

  • Any Connection
  • connected
  • closed
  • failed
  • Specific Connection
  • [connectionName].connected.opened
  • [connectionName].connected.closed
  • [connectionName].connected.failed

The connection object is passed to the event handler for each event. Use the name property of the connection object to determine which connection the generic events fired for.

!IMPORTANT! - wascally handles connectivity for you, mucking about with the connection directly isn't supported (don't do it).

Sending & Receiving Messages

Publish

The publish call returns a promise that is only resolved once the broker has accepted responsibility for the message (see Publisher Acknowledgments for more details). If a configured timeout is reached, or in the rare event that the broker rejects the message, the promise will be rejected. More commonly, the connection to the broker could be lost before the message is confirmed and you end up with a message in "limbo". Wascally keeps a list of unconfirmed messages that have been published in memory only. Once a connection is re-established and the topology is in place, Wascally will prioritize re-sending these messages before sending anything else.

In the event of a disconnect, all publish promises that have not been resolved are rejected. This behavior is a problematic over-simplification and subject to change in a future release.

Publish timeouts can be set per message, per exchange or per connection. The most specific value overrides any set at a higher level. There are no default timeouts set at any level. The timer is started as soon as publish is called and only cancelled once wascally is able to make the publish call on the actual exchange's channel. The timeout is cancelled once publish is called and will not result in a rejected promise due to time spent waiting on a confirmation.

publish( exchangeName, options, [connectionName] )

This syntax uses an options object rather than arguments, here's an example showing all of the available properties:

rabbit.publish( 'exchange.name', {
		routingKey: 'hi',
		type: 'company.project.messages.textMessage',
		correlationId: 'one',
		body: { text: 'hello!' },
		messageId: '100',
		expiresAfter: 1000 // TTL in ms, in this example 1 second
		timestamp: // posix timestamp (long)
		headers: {
			'random': 'application specific value'
		},
		timeout: // ms to wait before cancelling the publish and rejecting the promise
	},
	connectionName: '' // another optional way to provide connection name if needed
);

publish( exchangeName, typeName, messageBody, [routingKey], [correlationId], [connectionName] )

Messages bodies are simple objects. A type specifier is required for the message which will be used to set AMQP's properties.type. If no routing key is provided, the type specifier will be used. A routing key of '' will prevent the type specifier from being used.

// the first 3 arguments are required
// routing key is optional and defaults to the value of typeName
// connectionName is only needed if you have multiple connections to different servers or vhosts

rabbit.publish( 'log.entries', 'company.project.messages.logEntry', {
		date: Date.now(),
		level: logLevel,
		message: message
	}, 'log.' + logLevel, someValueToCorrelateBy );

request( exchangeName, options, [connectionName] )

This works just like a publish except that the promise returned provides the response (or responses) from the other side.

// when multiple responses are provided, all but the last will be provided via the .progress callback.
// the last/only reply will always be provided to the .then callback
rabbit.request( 'request.exchange', {
		// see publish example to see options for the outgoing message
	} )
	.progress( function( reply ) {
		// if multiple replies are provided, all but the last will be sent via the progress callback
	} )
	.then( function( final ) {
		// the last message in a series OR the only reply will be sent to this callback
	} );

handle( typeName, handler, [context] )

Notes:

  • Handle calls should happen before starting subscriptions.
  • The message's routing key will be used if the type is missing or empty on incoming messages

Message handlers are registered to handle a message based on the typeName. Calling handle will return a reference to the handler that can later be removed. The message that is passed to the handler is the raw Rabbit payload. The body property contains the message body published. The message has ack, nack (requeue the message) and reject (don't requeue the message) methods control what Rabbit does with the message.

Explicit Error Handling

In this example, any possible error is caught in an explicit try/catch:

var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
	try {
		// do something meaningful?
		console.log( message.body );
		message.ack();
	} catch( err ) {
		message.nack();
	}
} );

handler.remove();

Automatically Nack On Error

This example shows how to have wascally wrap all handlers with a try catch that:

  • nacks the message on error
  • console.log that an error has occurred in a handle
// after this call, any new callbacks attached via handle will be wrapped in a try/catch
// that nacks the message on an error
rabbit.nackOnError();

var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
	console.log( message.body );
	message.ack();
} );

handler.remove();

// after this call, new callbacks attached via handle will *not* be wrapped in a try/catch
rabbit.ignoreHandlerErrors();

Late-bound Error Handling

Provide a strategy for handling errors to multiple handles or attach an error handler after the fact.

var handler = rabbit.handle( 'company.project.messages.logEntry', function( message ) {
	console.log( message.body );
	message.ack();
} );

handler.catch( function( err, msg ) {
	// do something with the error & message
	msg.nack();
} );

!!! IMPORTANT !!!

Failure to handle errors will result in silent failures and lost messages.

Unhandled Messages

In previous versions, if a subscription was started in ack mode (the default) without a handler to process the message, the message would get lost in limbo until the connection (or channel) was closed and then the messages would be returned to the queue. This is very confusing and undesirable behavior. To help protect against this, the new default behavior is that any message received that doesn't have any elligible handlers will get nack'd and sent back to the queue immediately.

This is still problematic because it can create churn on the client and server as the message will be redelivered indefinitely.

To change this behavior, use one of the following calls:

Note: only one of these strategies can be activated at a time

onUnhandled( handler )

rabbit.onUnhandled( function( message ) {
	 // handle the message here
} );

nackUnhandled() - default

Sends all unhandled messages back to the queue.

rabbit.nackUnhandled();

rejectUnhandled()

Rejects unhandled messages so that will will not be requeued. DO NOT use this unless there are dead letter exchanges for all queues.

rabbit.rejectUnhandled();

startSubscription( queueName, [connectionName] )

Recommendation: set handlers for anticipated types up before starting subscriptions.

Starts a consumer on the queue specified. connectionName is optional and only required if subscribing to a queue on a connection other than the default one.

Message Format

The following structure shows and briefly explains the format of the message that is passed to the handle callback:

{
	// metadata specific to routing & delivery
	fields: {
		consumerTag: "", // identifies the consumer to rabbit
		deliveryTag: #, // identifies the message delivered for rabbit
		redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
		exchange: "" // name of exchange the message was published to,
		routingKey: "" // the routing key (if any) used when published
	},
	properties:{
		contentType: "application/json", // wascally's default
		contentEncoding: "utf8", // wascally's default
		headers: {}, // any user provided headers
		correlationId: "", // the correlation id if provided
		replyTo: "", // the reply queue would go here
		messageId: "", // message id if provided
		type: "", // the type of the message published
		appId: "" // not used by wascally
	},
	content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
	body: , // this could be an object, string, etc - whatever was published
	type: "" // this also contains the type of the message published
}

Message API

Wascally defaults to (and assumes) queues are in ack mode. It batches ack and nack operations in order to improve total throughput. Ack/Nack calls do not take effect immediately.

message.ack()

Enqueues the message for acknowledgement.

message.nack()

Enqueues the message for rejection. This will re-enqueue the message.

message.reject()

Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.

message.reply( message, [more], [replyType] )

Acknowledges the messages and sends the message back to the requestor. The message is only the body of the reply. Providing true to more will cause the message to get sent to the .progress callback of the request promise so that you can send multiple replies. The replyType argument sets the type of the reply message. (important when messaging with statically typed languages)

Queues in noBatch mode

Wascally now supports the ability to put queues into non-batching behavior. This causes ack, nack and reject calls to take place against the channel immediately. This feature is ideal when processing messages are long-running and consumer limits are in place. Be aware that this feature does have a significant impact on message throughput.

Reply Queues

By default, wascally creates a unique reply queue for each connection which is automatically subscribed to and deleted on connection close. This can be modified or turned off altogether.

Changing the behavior is done by passing one of three values to the replyQueue property on the connection hash:

!!! IMPORTANT !!! wascally cannot prevent queue naming collisions across services instances or connections when using the first two options.

Custom Name

Only changes the name of the reply queue that wascally creates - autoDelete and subscribe will be set to true.

rabbit.addConnection( {
	// ...
	replyQueue: 'myOwnQueue'
} );

Custom Behavior

To take full control of the queue name and behavior, provide a queue definition in place of the name.

wascally provides no defaults - it will only use the definition provided

rabbit.addConnection( {
	// ...
	replyQueue: {
		name: 'myOwnQueue',
		subscribe: 'true',
		durable: true
	}
} );

No Automatic Reply Queue

Only pick this option if request/response isn't in use or when providing a custom overall strategy

rabbit.addConnection( {
	// ...
	replyQueue: false
} );

Managing Connections

addConnection ( options )

The call returns a promise that can be used to determine when the connection to the server has been established.

Options is a hash that can contain the following:

  • name String the name of this connection. Defaults to "default" when not supplied.
  • server String the IP address or DNS name of the RabbitMQ server. Defaults to "localhost"
  • port String the TCP/IP port on which RabbitMQ is listening. Defaults to 5672
  • vhost String the named vhost to use in RabbitMQ. Defaults to the root vhost, '%2f' ("/")
  • protocol String the connection protocol to use. Defaults to 'amqp://'
  • user String the username used for authentication / authorization with this connection. Defaults to 'guest'
  • pass String the password for the specified user. Defaults to 'guest'
  • timeout number how long to wait for a connection to be established. No default value
  • heartbeat number how often the client and server check to see if they can still reach each other, specified in seconds. Defaults to 30 (seconds)
  • replyQueue String the name of the reply queue to use (see above)
  • publishTimeout number the default timeout in milliseconds for a publish call

Note that the "default" connection (by name) is used when any method is called without a connection name supplied.

rabbit.addConnection( {
	user: 'someUser',
	pass: 'sup3rs3cr3t',
	server: 'my-rqm.server',
	port: 5672,
	timeout: 2000,
	vhost: '%2f',
	heartbeat: 10
} );

Managing Topology

addExchange( exchangeName, exchangeType, [options], [connectionName] )

The call returns a promise that can be used to determine when the exchange has been created on the server.

Valid exchangeTypes:

  • 'direct'
  • 'fanout'
  • 'topic'

Options is a hash that can contain the following:

  • autoDelete true|false delete when consumer count goes to 0
  • durable true|false survive broker restarts
  • persistent true|false a.k.a. persistent delivery, messages saved to disk
  • alternate 'alt.exchange' define an alternate exchange
  • publishTimeout 2^32 timeout in milliseconds for publish calls to this exchange

addQueue( queueName, [options], [connectionName] )

The call returns a promise that can be used to determine when the queue has been created on the server.

Options is a hash that can contain the following:

  • autoDelete true|false delete when consumer count goes to 0
  • durable true|false survive broker restarts
  • exclusive true|false limits queue to the current connection only (danger)
  • subscribe true|false auto-start the subscription
  • limit 2^16 max number of unacked messages allowed for consumer
  • noAck true|false the server will remove messages from the queue as soon as they are delivered
  • noBatch true|false causes ack, nack & reject to take place immediately
  • queueLimit 2^32 max number of ready messages a queue can hold
  • messageTtl 2^32 time in ms before a message expires on the queue
  • expires 2^32 time in ms before a queue with 0 consumers expires
  • deadLetter 'dlx.exchange' the exchange to dead-letter messages to
  • maxPriority 2^8 the highest priority this queue supports

bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )

Binds the target exchange to the source exchange. Messages flow from source to target.

bindQueue( sourceExchange, targetQueue, [routingKeys], [connectionName] )

Binds the target queue to the source exchange. Messages flow from source to target.

Configuration via JSON

Note: setting subscribe to true will result in subscriptions starting immediately upon queue creation.

This example shows most of the available options described above.

	var settings = {
		connection: {
			user: 'guest',
			pass: 'guest',
			server: '127.0.0.1',
			// server: '127.0.0.1, 194.66.82.11',
			// server: ['127.0.0.1', '194.66.82.11'],
			port: 5672,
			timeout: 2000,
			vhost: '%2fmyhost'
			},
		exchanges:[
			{ name: 'config-ex.1', type: 'fanout', publishTimeout: 1000 },
			{ name: 'config-ex.2', type: 'topic', alternate: 'alternate-ex.2', persistent: true },
			{ name: 'dead-letter-ex.2', type: 'fanout' }
			],
		queues:[
			{ name:'config-q.1', limit: 100, queueLimit: 1000 },
			{ name:'config-q.2', subscribe: true, deadLetter: 'dead-letter-ex.2' }
			],
		bindings:[
			{ exchange: 'config-ex.1', target: 'config-q.1', keys: [ 'bob','fred' ] },
			{ exchange: 'config-ex.2', target: 'config-q.2', keys: 'test1' }
		]
	};

To establish a connection with all settings in place and ready to go call configure:

	var rabbit = require( 'wascally' );

	rabbit.configure( settings ).done( function() {
		// ready to go!
	} );

Closing Connections

Wascally will attempt to resolve all outstanding publishes and recieved messages (ack/nack/reject) before closing the channels and connection. If you would like to defer certain actions until after everything has been safely resolved, then use the promise returned from either close call.

!!! CAUTION !!! - using reset is dangerous. All topology associated with the connection will be removed meaning wasclly will not be able to re-establish it all should you decide to reconnect.

close( [connectionName], [reset] )

Closes the connection, optionall resetting all previously defined topology for the connection. The connectionName uses default if one is not provided.

closeAll( [reset] )

Closes all connections, optionally resetting the topology for all of them.

AMQPS, SSL/TLS Support

Providing the following configuration options setting the related environment varibles will cause wascally to attempt connecting via AMQPS. For more details about which settings perform what role, refer to the amqplib's page on SSL.

	connection: { 		// sample connection hash
		caPath: '', 	// comma delimited paths to CA files. RABBIT_CA
		certPath: '', 	// path to cert file. RABBIT_CERT
		keyPath: '',	// path to key file. RABBIT_KEY
		passphrase: '', // passphrase associated with cert/pfx. RABBIT_PASSPHRASE
		pfxPath: ''		// path to pfx file. RABBIT_PFX
	}

Channel Prefetch Limits

Wascally mostly hides the notion of a channel behind the scenes, but still allows you to specify channel options such as the channel prefetch limit. Rather than specifying this on a channel object, however, it is specified as a limit on a queue defintion.

queues: [{
  // ...

  limit: 5
}]

// or

rabbit.addQueue("some.q", {
  // ...

  limit: 5
});

This queue configuration will set a prefetch limit of 5 on the channel that is used for consuming this queue.

Note: The queue limit is not the same as the queueLimit option - the latter of which sets the maximum number of messages allowed in the queue.

Additional Learning Resources

Watch Me Code

Thanks to Derick Bailey's input, the API and documentation for wascally have improved a lot. You can learn from Derick's hands-on experience in his Watch Me Code series.

RabbitMQ In Action

Alvaro Vidella and Jason Williams literally wrote the book on RabbitMQ.

Enterprise Integration Patterns

Gregor Hophe and Bobby Woolf's definitive work on messaging. The site provides basic descriptions of the patterns and the book goes into a lot of detail.

I can't recommend this book highly enough; understanding the patterns will provide you with the conceptual tools need to be successful.

Contributing

PRs with insufficient coverage, broken tests or deviation from the style will not be accepted.

Behavior & Integration Tests

PRs should include modified or additional test coverage in both integration and behavioral specs. Integration tests assume RabbitMQ is running on localhost with guest/guest credentials and the consistent hash exchange plugin enabled. You can enable the plugin with the following command:

rabbitmq-plugins enable rabbitmq_consistent_hash_exchange

Running gulp will run both sets after every file change and display a coverage summary. To view a detailed report, run gulp coverage once to bring up the browser.

Vagrant

Wascally now provides a sample Vagrantfile that will set up a virtual machine that runs RabbitMQ. Under the hood, it uses the official RabbitMQ Docker image. It will forward RabbitMQ's default ports to localhost.

First, you will need to copy the sample file to a usable file:

$ cp Vagrantfile.sample Vagrantfile

Adjust any necessary settings. Then, from the root of the project, run:

$ vagrant up

This will create your box. Right now, it supports the virtualbox and vmware_fusion providers. To access the box, run:

$ vagrant ssh

Once inside, you can view the RabbitMQ logs by executing:

$ docker logs rabbitmq

When the Vagrant box is running, RabbitMQ can be accessed at localhost:5672 and the management console at http://localhost:15672.

Click here for more information on Vagrant, Docker, and official RabbitMQ Docker image.

To run tests using Vagrant:

Execute from the host machine:

$ vagrant up
$ gulp

Style

This project has both an .editorconfig and .esformatter file to help keep adherance to style simple. Please also take advantage of the .jshintrc file and avoid linter warnings.

Roadmap

  • additional test coverage
  • support RabbitMQ backpressure mechanisms
  • (configurable) limits & behavior when publishing during connectivity issues
  • ability to capture/log unpublished messages on shutdown
  • add support for Rabbit's HTTP API
  • enable better cluster utilization by spreading connections out over all nodes in cluster