This service leverages RabbitMQ and generators to create a simple, extensible service for messaging. Functionality is built on top of the rabbit.js module.
// Promise-based quick start: one logging middleware, one controller, then listen.
const rs = require('../index');
const service = rs();

// Middleware: log on the way in, run the rest of the chain, log on the way out.
// The promise returned by next() is returned directly rather than wrapped in a
// new Promise, so a rejection further down the chain propagates to the caller
// instead of leaving this middleware's promise pending forever.
service.use(function (next) {
  console.log('--> 1');
  return next().then(() => {
    console.log('<-- 1');
  });
});

// Controller: `this` is the message event context; `this.message` is the payload.
service.register({
  channel: 'test',
  topic: 'testing.stuff',
  controller: function () {
    return new Promise((resolve) => {
      console.log('RECEIVED: ', this.message);
      resolve();
    });
  }
});

service.listen()
  .then(() => console.log('\n\nListening...'))
  .catch((e) => console.log(e.stack));
// Generator-based quick start: the same service as the promise example, driven by `co`.
const co = require('co');
const rs = require('../index');
const service = rs();

// Middleware: log on the way in, yield to the rest of the chain, log on the way out.
function *traceMiddleware(next) {
  console.log('--> 1');
  yield next();
  console.log('<-- 1');
}

// Controller: `this` is the message event context; `this.message` is the payload.
function *printMessage() {
  console.log('RECEIVED: ', this.message);
}

co(function *() {
  service.use(traceMiddleware);
  service.register({
    channel: 'test',
    topic: 'testing.stuff',
    controller: printMessage
  });
  yield service.listen();
  console.log('\n\nListening...');
}).catch((err) => console.log(err));
// Create a service bound to a specific broker by passing a connection URL.
const rs = require('rabbit-service');
const service = rs('amqp://myhost.foo.com');
- Url string to be deconstructed into host connection parameters
- Configuration object to overwrite the following defaults
{
"controllers": null,
"reconnect": {
"start_delay": 1000,
"max_delay": 60000,
"multiplier": 2
},
"context": {
"protocol": "amqp",
"user": "guest",
"pass": "guest",
"host": "127.0.0.1",
"port": false
},
"socket": {
"listen": "data",
"type": "SUB",
"encoding": "utf8",
"options": {
"routing": "topic"
}
}
}
- Instance of a service
In most cases, `this` will refer to the context of the request, and is shared across all functions called in the request.
{
context: [Context], // rabbit.js client context
channel: [String], // channel the message was sent on
socket: { // socket config after being processed
channel: [String],
topic: [String],
controller: [Function], // controller defined by the user
_controller: [Function], // controller post processed
listen: [String],
type: [String],
encoding: [String],
options: {} // options accepted by the rabbit.js socket connection
},
controller: [String],
message: [String] // message sent to the socket
}
const service = require('rabbit-service')();

// generator
service.use(function *(next) {
  yield next();
});

// promise
service.use(function (next) {
  // Return the chain's promise directly: it settles with the same value as
  // before, and a downstream rejection now propagates instead of being lost.
  return next();
});
- Either
- Generator function
- Function which returns a promise
- Function parameter is added to the middleware chain
- Message event is processed in order that the middleware was added
- Function must call `next()` to progress to the next middleware function
- The context `this` refers to the message event and is shared through the middleware chain
const service = require('rabbit-service')();

// Promise-style controller: logs the received message, then settles.
// `this` is the message event context.
function logMessage() {
  return new Promise((done) => {
    console.log('RECEIVED: ', this.message);
    done();
  });
}

// Register a single socket config...
service.register({
  channel: 'test',
  topic: 'testing.stuff',
  controller: logMessage
});

// ...or an array of configs. A controller may also be given as a string.
service.register([
  {
    channel: 'test',
    topic: 'testing.stuff',
    controller: logMessage
  },
  {
    channel: 'test',
    topic: 'testing.stuff',
    controller: 'test_controller.foo'
  }
]);
- Either
- Socket config
- Array of configs
- Configs will be defaulted against the config default socket
- Configs must have a controller property that is either:
- Generator function
- Function returning a promise
- String specifying a controller
  - Requires: Service config property `controller` to specify a path to a directory
  - If a property chain is used:
    - The leading word (`.` delimited) will be the controller
    - The remaining string is used to traverse subproperties of the controller module
    - Eg: `my_controller.foo[0]`
      - `my_controller` is the file in the controllers directory
      - The handler used will be the function found in the first element `[0]` of the `foo` property
- The config passed will be used to create a socket via rabbit.js
- The `controller` property will be called after all middleware is called
- The context of `this` refers to the message event and is shared through the middleware chain
const service = require('rabbit-service')();

// Generator form: receives the error as its only parameter.
service.catch(function *(err) {
  console.log(err);
});

// Promise form: same contract, returns a promise that settles after logging.
service.catch(function (err) {
  return new Promise((done) => {
    console.log(err);
    done();
  });
});
- Either
- Generator function
- Function returning a promise
- Function should expect a parameter `error`
- Note: `this` is not guaranteed to be the context of a message event. Errors may occur outside the context of a message
- The callback is called any time an error is thrown from the rabbit client
- For middleware/controller only error handling, use:
// Generator form: wrap the rest of the chain in try/catch.
service.use(function *(next) {
  try {
    yield next();
  } catch (e) {
    /* Handle here */
  }
});
// OR
// Promise form: attach the handler to the chain's promise and return it.
// (`yield` is only valid inside a generator, so it cannot be used here.)
service.use(function (next) {
  return next().catch((e) => {
    /* Handle here */
  });
});
const service = require('rabbit-service')();

// Pass a handler directly to service.reconnect().
// NOTE(review): `data` is assumed to be passed to the handler as its first
// argument (the fields read below are attempts, alive and delay) — confirm
// against the library.
service.reconnect(function *(data) {
  if (!data.attempts && !data.alive) console.log('Connection dropped');
  if (data.alive) {
    console.log('Connection recovered');
  } else if (data.attempts) {
    console.log('Reconnect attempt (' + data.attempts + ') failed after (' + data.delay + ')');
  }
});
// OR
// Pass a config object that overrides the reconnect defaults and carries the handler.
service.reconnect({
  start_delay: 5000,
  handler: function *(data) {
    if (!data.attempts && !data.alive) console.log('Connection dropped');
    if (data.alive) {
      console.log('Connection recovered');
    } else if (data.attempts) {
      console.log('Reconnect attempt (' + data.attempts + ') failed after (' + data.delay + ')');
    }
  }
});
- Either
- Generator or function which returns a promise to be treated as the handler
- Config to overwrite the default config, with an optional `handler` function
- Default:
{
start_delay: 1000, // start delay
max_delay: 60000, // maximum delay
multiplier: 2 // amount the delay is multiplied with each failure
}
- If the connection is dropped, a reconnection loop with exponential backoff begins
- the handler function is called after every drop, and reconnection
const service = require('rabbit-service')();

// listen() returns a promise; handle rejection so a failed start is reported
// rather than surfacing as an unhandled rejection.
service.listen()
  .then(() => console.log('listening'))
  .catch((e) => console.log(e.stack));
- Optional config for last minute overrides
- Config matches object passed to `rs(config)`
- Compiles all functions and handlers into a finalized state
- Creates client context
- Connects all sockets
- Registers all event listeners
- NOTE: This function is called on every reconnect attempt
MIT