rabbitmq-schema
A schema definition module for RabbitMQ topologies (rabbitmq-schema package) extended with the "x-lvc" ("Last Value Cach") exchange type.
The last value exchange acts like a direct exchange (binding keys are compared for equality with routing keys); but, it also keeps track of the last value that was published with each routing key, and when a queue is bound, it automatically enqueues the last value for the binding key.
The rabbitmq-lvc-exchange plugin to RabbitMQ needs to be enabled to use x-lvc exchanges.
Installation
npm install rabbitmq-schema-lvc
Usage Summary
var RabbitSchema = require('rabbitmq-schema-lvc')
// Validate your rabbit topology by instantiating a RabbitSchema
var schema = new RabbitSchema({
exchange: 'exchange0', // exchange name
type: 'direct', // exchange type
options: {}, // optional, assert or check exchange options
bindings: [{
routingPattern: 'foo.bar.key', // required for direct and topic exchanges
destination: {
queue: 'queue0', // queue name
messageSchema: {} // json-schema to validate messages w/, http://json-schema.org
},
args: {} // optional, binding args
}]
})
// Get all exchanges in the topology,
// returned exchanges are RabbitSchema instances
var exchanges = schema.getExchanges()
var exchange = exchange[0]
exchange.exchange // name
exchange.options // exchange options (for assert or check)
exchange.getDirectBindings() // direct child bindings, explained below
exchange.getQueues() // child queues
exchanges.getExchanges() // self and child exchanges
// Get all queues in the topology,
// returned queues are RabbitSchema instances
var queues = schema.getQueues()
var queue = queues[0]
queue.queue // name
queue.options // queue options
queue.messageSchema // queue message "json-schema", http://json-schema.org
// Get all bindings in the topology,
// returned bindings contain `source` and `destination` RabbitSchema instances
var bindings = schema.getBindings()
var binding = bindings[0]
binding.source // source exchange, RabbitSchema
binding.destination // destination exchange or queue, RabbitSchema
binding.routingPattern // routing pattern for direct or topic queue
// Get direct bindings in the topology,
// returned bindings contain `source` and `destination` RabbitSchema instances
var directBindings = schema.getDirectBindings() // [ /* queue0 */ ]
// Get a queue by name (in entire topology),
// returns queue RabbitSchema
var queue = schema.getQueueByName('queue0')
// Get an exchange by name (in entire topology),
// returns exchange RabbitSchema
var queue = schema.getExchangeByName('exchange0')
// Validate a message by exchange and routingKey
// will find all queues the message makes it to
// and validate the message against all the `messageSchema`s
schema.validateMessage('exchange0', 'foo.bar.key', { foo: 1 })
// Also works with direct queues, by providing queue name
schema.validateMessage('queue0', { foo: 1 })
Usage
Validate RabbitMQ Elements
Validate a Queue
- Requires queue name and messageSchema
var RabbitSchema = require('rabbitmq-schema-lvc')
var queueSchema = new RabbitSchema({
queue: 'queue-name', // queue name, required
messageSchema: { // "json-schema" for validating queue messages
type: 'object'
// ...other properties supported by json-schema draft-04...
// http://json-schema.org
},
options: {} // queue options, optional
})
Validate a Direct Exchange
- Requires exchange name, type, and atleast one binding
- Requires all bindings have
routingPattern
var RabbitSchema = require('rabbitmq-schema-lvc')
var directExchangeSchema = new RabbitSchema({
exchange: 'direct-exchange-name', // exchange name, required
type: 'direct', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
routingPattern: 'foo.bar', // direct exchanges require bindings w/ a routingPattern
destination: queueSchema, // all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
})
Validate a Topic Exchange
- Requires exchange name, type, and atleast one binding
- Requires all bindings have
routingPattern
- Also works with arrays of parallel, nested, connected topologies (see
altSchema
in example below)
var RabbitSchema = require('rabbitmq-schema-lvc')
// validate a direct exchange topology
var topicExchangeSchema = new RabbitSchema({
exchange: 'topic-exchange-name', // exchange name, required
type: 'topic', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
routingPattern: 'foo.*', // topic exchanges require bindings w/ a routingPattern
destination: queueSchema // all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
})
Validate a Fanout Exchange
- Requires exchange name, type, and atleast one binding
var RabbitSchema = require('rabbitmq-schema-lvc')
var fanoutExchangeSchema = new RabbitSchema({
exchange: 'fanout-exchange-name', // exchange name, required
type: 'fanout', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
destination: queueSchema, // all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
})
Validate an LVC Exchange
- Requires exchange name, type, and atleast one binding
- Requires all bindings have
routingPattern
var RabbitSchema = require('rabbitmq-schema-lvc')
var lvcExchangeSchema = new RabbitSchema({
exchange: 'lvc-exch-name', // exchange name, required
type: 'x-lvc', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
routingPattern: 'foo.bar', // direct exchanges require bindings w/ a routingPattern
destination: queueSchema, // all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
})
Validate full RabbitMQ Topology
- Validates all elements for required properties and types
- Validates topology to not be circular
- Validates topology for duplicates
Validate a full Topology
var RabbitSchema = require('rabbitmq-schema-lvc')
var foobarQueue = {
queue: 'foobar-name', // queue name, required
messageSchema: { // "json-schema" for validating queue messages
type: 'object',
properties: {
foo: { type: 'string' }
},
required: ['foo']
// ...other properties supported by json-schema draft-04...
// http://json-schema.org
}
}
var fooquxQueue = {
queue: 'fooqux-name', // queue name, required
messageSchema: { // "json-schema" for validating queue messages
type: 'string'
// ...other properties supported by json-schema draft-04...
// http://json-schema.org
},
options: {} // queue options, optional
}
var foobarExchange = {
exchange: 'foobar-direct-exchange', // exchange name, required
type: 'direct', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
routingPattern: 'foo.bar', // direct exchanges require bindings w/ a routingPattern
destination: foobarQueue, // all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
}
var fooquxExchange = {
exchange: 'fooqux-direct-exchange', // exchange name, required
type: 'direct', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
routingPattern: 'foo.qux', // direct exchanges require bindings w/ a routingPattern
destination: fooquxQueue, // all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
}
var foostarExchange = {
exchange: 'foostar-topic-exchange', // exchange name, required
type: 'topic', // required to be direct, topic, fanout or x-vlc
bindings: [{ // atleast 1 binding is required for every exchange
routingPattern: 'foo.*', // topic exchanges require bindings w/ a routingPattern
destination: foobarExchange,// all bindings require a destination (queue or exchange schema)
args: {} // binding args, optional
}],
options: {} // exchange options, optional
}
var fullSchema = new RabbitSchema(foostarExchange) // full nested, connected topology
// also works with parallel, nested, disconnected topologies
var altSchema = new RabbitSchema([
{
exchange: 'some-exchange',
type: 'direct',
bindings: [{
routingPattern: 'foo.bar',
destination: foobarQueue
}]
},
{
exchange: 'some-exchange3',
type: 'topic',
bindings: [{
routingPattern: 'foo.*',
destination: fooquxQueue
}]
}
])
Validate a message (by traversing topology or direct queue)
Traverse topology to destination queue(s) example
// uses `fullSchema` from full topology snippet above
var exchangeName = 'foostar-topic-exchange'
var routingKey = 'foo.bar'
var message = {}
fullSchema.validateMessage(exchangeName, routingKey, message)
// foostarExchange -> foobarExchange -> foobarQueue (requires an object w/ property `foo`)
// throws SchemaValidationError "'json' should have required property 'foo'"
Throw an error if the message does not reach any destination queues
Traverse exchange to correct queue example1
// uses `fullSchema` from full topology snippet above
var exchangeName = 'foostar-topic-exchange'
var routingKey = 'no.way.out'
var message = {}
fullSchema.validateMessage(exchangeName, routingKey, message)
// foostarExchange -> _nowhere_
// throws Error "message did not reach any queues"
Direct queue example
// uses `fullSchema` from full topology snippet above
var queueName = 'fooqux-queue'
var message = {}
fullSchema.validateMessage(queueName, message)
// fooquxQueue (requires an object w/ property `foo`)
// throws SchemaValidationError "'json' should be a 'string'"
Throws an error if the queue/exchange does not exist
// uses `fullSchema` from full topology snippet above
var queueName = 'nonexistant-queue'
var message = {}
fullSchema.validateMessage(queueName, message)
// throws Error "queue with name 'nonexistant-queue' does not exist"
var exchangeName = 'nonexistant-exchange'
var routingKey = 'foo.bar'
fullSchema.validateMessage(exchangeName, routingKey, message)
// throws Error "exchange with name 'nonexistant-exchange' does not exist"
List Exchanges (w/ bindings) and Queues
rabbitSchema.getExchanges()
returns all exchanges found in schemarabbitSchema.getQueues()
returns all queues in schemarabbitSchema.getBindings()
returns all exchange bindings in schema (including an added source property)- Each item in the lists are schemas themselves
- Easily list all exchanges and queues for assertions or checks
var amqplib = require('amqplib')
var callbackCount = require('callback-count')
amqplib.connect('amqp://192.168.99.100:32771', function (err, conn) {
conn.createChannel(function (channel) {
// connected to rabbitmq, assert all the things
assertExchanges(channel, fullSchema, function (err) {
if (err) { throw err }
assertQueues(channel, fullSchema, function (err) {
if (err) { throw err }
assertBindings(channel, fullSchema, function (err) {
if (err) { throw err }
})
})
})
})
})
// assert all exchanges in the schema
function assertExchanges (channel, fullSchema, cb) {
var count = callbackCount(exchanges.length, cb)
var exchanges = fullSchema.getExchanges()
exchanges.forEach(function (exchange) {
var name = exchange.exchange
var type = exchange.type
var opts = exchange.options
channel.assertExchange(name, type, opts, count.next)
})
}
// assert all queues in the schema
function assertQueues (channel, fullSchema, cb) {
var count = callbackCount(queues.length, cb)
var queues = fullSchema.getQueues()
queues.forEach(function (queue) {
var name = queue.queue
var opts = exchange.options
channel.assertQueue(name, opts, count.next)
})
}
// assert all bindings in the schema
function assertBindings (channel, fullSchema, cb) {
var count = callbackCount(exchanges.length, cb)
var bindings = fullSchema.getBindings()
bindings.forEach(function (binding) {
// source is always an exchange
var srcName = bindings.source.exchange
var destName
if (bindings.destination.exchange) {
// destination is an exchange
destName = binding.destination.exchange
channel.bindExchange(destName, srcName, binding.routingPattern, binding.args)
} else {
// destination is a queue
destName = binding.destination.queue
channel.bindQueue(destName, srcName, binding.routingPattern, binding.args)
}
})
}
Get a Queue or Exchange by Name
// uses `fullSchema` from full topology snippet above
var foobarQueue = fullSchema.getQueueByName('foobar-name')
var foobarExchange = fullSchema.getExchangeByName('foobar-name')
License
MIT