RabbitMQ module for the Hapiness framework.
RabbitMQ is a server that implements the AMQP 0-9-1 protocol.
Getting started with AMQP concepts
The module uses amqp.node to connect to RabbitMQ and is built around the channel API that library provides.
- How this module works
- Message and routing
- Using your module inside Hapiness application
- Contributing
- Change History
- Maintainers
- License
With this module you can configure your AMQP stack easily, in the way you prefer.
We provide three decorators, @Exchange, @Queue and @Message, that let you get started quickly.
Key | Type | Infos |
---|---|---|
connection | object | Connection Object |
Key | Type | Default | Infos |
---|---|---|---|
uri | string | undefined | other values are ignored if set |
host | string | localhost | - |
port | number | 5672 | - |
login | string | undefined | - |
password | string | undefined | - |
params | object | undefined | Parameters to include in querystring, like: { heartBeat: 30 } |
retry.delay | number | 5000 | Delay in ms to wait after trying to reconnect |
retry.maximum_attempts | number | -1 | Maximum reconnection attempts, -1 for Infinity |
default_prefetch | number | 10 | Default prefetch used when creating new channels |
This module supports only one connection at a time.
By default the module will retry to connect after a connection error. This behaviour is configurable.
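The retry options from the table can be read as follows. This is a minimal sketch, not the module's own code: the `ConnectionOptions` and `RetryOptions` interfaces and the `resolveRetry` and `shouldRetry` helpers are hypothetical, and the nesting of `delay` and `maximum_attempts` under a `retry` object is an assumption based on the `retry.delay` key notation.

```typescript
// Hypothetical types mirroring the option table above; not the module's typings.
interface RetryOptions {
    delay: number;            // milliseconds between reconnection attempts
    maximum_attempts: number; // -1 means retry forever
}

interface ConnectionOptions {
    uri?: string;
    host?: string;
    port?: number;
    login?: string;
    password?: string;
    params?: { [key: string]: string | number };
    retry?: Partial<RetryOptions>; // assumed nesting, see lead-in
    default_prefetch?: number;
}

// Default values copied from the table.
const DEFAULT_RETRY: RetryOptions = { delay: 5000, maximum_attempts: -1 };

// Merge user-supplied retry options over the defaults.
function resolveRetry(options: ConnectionOptions): RetryOptions {
    return { ...DEFAULT_RETRY, ...(options.retry || {}) };
}

// Decide whether another attempt is allowed after `attemptsMade` failed attempts.
function shouldRetry(attemptsMade: number, retry: RetryOptions): boolean {
    return retry.maximum_attempts === -1 || attemptsMade < retry.maximum_attempts;
}

const retry = resolveRetry({ host: 'localhost', retry: { maximum_attempts: 3 } });
console.log(retry.delay, shouldRetry(2, retry), shouldRetry(3, retry));
```

With the defaults, `maximum_attempts` stays at -1, so `shouldRetry` never returns false and reconnection is attempted indefinitely.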
When the connection is ready the extension will find all classes with decorators and do all the work to get everything ready.
Each connection can open several channels. Every operation on RabbitMQ occurs through channels.
You can create them easily with the ChannelService.
Exchanges need a name and a type.
- name: string
- type: ExchangeType (ExchangeType.Direct, ExchangeType.Topic, ExchangeType.Fanout)
- options: Object, optional, see exchange assert options
Queues only require a name.
- name: string
- binds: Array<Bind>, optional
- options: Object, optional, see queue assert options
Each message sent on RabbitMQ is consumed by a queue.
You can decide to receive all the messages in your queue's onMessage method. That's a good option if only one type of message arrives on the queue. You can also call your own dispatcher there.
It's also possible to receive plenty of different messages on the same queue. Creating one class to handle each message is then a better choice.
This module allows you to link a RabbitMessage to your custom message class. We provide a message router that loads the matching message class when a new message is received. If no message class is found, the onMessage method on your queue is used as a fallback. If you did not provide this method, an error is thrown.
- queue: the queue class where the message is consumed
- exchange: the exchange class
- routingKey: string or regex to match the routingKey of the message
- filter: a simple one level object with keys and values. Keys are the path on the RabbitMQ message and values could be a string, number, boolean or RegExp.
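To illustrate how the routingKey and filter options could select a message class, here is a minimal sketch. It is not the module's router: the `MessageCriteria` and `IncomingMessage` shapes and the `matchesCriteria` helper are hypothetical, and reading filter keys as dotted paths (for example `content.action`) is an assumption about what "path on the RabbitMQ message" means.

```typescript
// Hypothetical shapes, for illustration only.
type FilterValue = string | number | boolean | RegExp;

interface MessageCriteria {
    routingKey?: string | RegExp;
    filter?: { [path: string]: FilterValue };
}

interface IncomingMessage {
    fields: { routingKey: string };
    content: any;
}

// Resolve a dotted path such as 'content.action' against an object.
function getPath(source: any, path: string): any {
    return path.split('.').reduce(
        (value: any, key: string) => (value === null || value === undefined ? undefined : value[key]),
        source
    );
}

// A RegExp is tested against string values; anything else is compared strictly.
function matchesValue(expected: FilterValue | undefined, actual: any): boolean {
    if (expected === undefined) {
        return true;
    }
    if (expected instanceof RegExp) {
        return typeof actual === 'string' && expected.test(actual);
    }
    return expected === actual;
}

// A message matches when the routingKey matches and every filter entry matches.
function matchesCriteria(criteria: MessageCriteria, message: IncomingMessage): boolean {
    if (!matchesValue(criteria.routingKey, message.fields.routingKey)) {
        return false;
    }
    const filter = criteria.filter || {};
    return Object.keys(filter).every(path => matchesValue(filter[path], getPath(message, path)));
}

const incoming: IncomingMessage = {
    fields: { routingKey: 'user.edited' },
    content: { action: 'edited', version: 2 }
};
console.log(matchesCriteria({ routingKey: /^user\./, filter: { 'content.action': 'edited' } }, incoming));
```

A router built on this idea would try each declared message class in turn and fall back to the queue's onMessage method when none matches.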
$ npm install --save @hapiness/core @hapiness/rabbitmq rxjs
or
$ yarn add @hapiness/core @hapiness/rabbitmq rxjs
"dependencies": {
"@hapiness/core": "^1.3.0",
"@hapiness/rabbitmq": "^1.2.3",
"rxjs": "^5.5.6",
//...
}
//...
This module provides a Hapiness extension for RabbitMQ.
To use it, simply register it during the bootstrap step of your project and provide the RabbitMQExt with its config:
import { Hapiness, HapinessModule, OnStart } from '@hapiness/core';
import { RabbitMQExt, RabbitMQModule } from '@hapiness/rabbitmq';
@HapinessModule({
version: '1.0.0',
providers: [],
declarations: [],
imports: [RabbitMQModule]
})
class MyApp implements OnStart {
constructor() {}
onStart() {}
}
Hapiness
.bootstrap(
MyApp,
[
/* ... */
RabbitMQExt.setConfig(
{
connection: {
host: 'localhost',
port: 5672,
vhost: 'my_vhost',
login: 'xxx',
password: 'xxxx'
}
}
)
]
)
.catch(err => {
/* ... */
});
@Exchange({
name: 'user.exchange',
type: ExchangeType.Topic,
// See options available: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
options: {
durable: true,
autoDelete: false
}
})
export class UserExchange implements ExchangeInterface {}
@Queue({
name: 'user.queue',
// See options available: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
options: {
durable: true
},
binds: [{
exchange: UserExchange,
pattern: 'user.*'
}]
})
export class UserQueue implements QueueInterface {
// Inject your services
constructor(private _myService: MyService) {}
// Optional
// Do some action when the queue is asserted
onAsserted() {
this._myService.foo();
}
// When a message is consumed, it arrives here if the router has found
// no matching message class
onMessage(message: RabbitMessage, ch: ChannelInterface) {
return Observable.of({ ack: true });
}
}
@Message({
queue: UserQueue,
exchange: UserExchange,
routingKey: 'user.edited'
})
export class UserEditedMessage implements MessageInterface {
// No super() call here: the class implements an interface and extends nothing
constructor(private _myService: MyService) {}
// Executed when a message is consumed and dispatched here
onMessage(message: RabbitMessage, ch: ChannelInterface): Observable<MessageResult> {
this._myService.foo();
// You can return an object to let the consumer know what to do with your message:
// acknowledge it, reject it or do nothing
return Observable.of({ ack: true });
}
}
This configuration will create:
- One exchange of type topic named user.exchange.
- One durable queue named user.queue.
  - It will bind this queue to the previously created exchange with the routingKey user.*
- It will dispatch all messages which are sent to the exchange, have the routingKey user.edited and are consumed by the previously created queue to the new message we created.
- All other messages sent to the exchange with a routingKey matching the pattern user.* or sent directly to the queue will be consumed by the onMessage() method defined in the queue.
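The binding pattern user.* relies on AMQP topic matching, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The matching is done by the RabbitMQ server, not by this module; the hypothetical `topicMatches` helper below is only a sketch to show which routing keys a pattern would accept.

```typescript
// Illustrative re-implementation of AMQP topic matching (the broker does this for you).
function topicMatches(pattern: string, routingKey: string): boolean {
    const patternWords = pattern.split('.');
    const keyWords = routingKey === '' ? [] : routingKey.split('.');

    const match = (i: number, j: number): boolean => {
        if (i === patternWords.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j === keyWords.length;
        }
        if (patternWords[i] === '#') {
            // '#' may absorb zero or more words.
            for (let next = j; next <= keyWords.length; next++) {
                if (match(i + 1, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j === keyWords.length) {
            return false;
        }
        // '*' matches exactly one word; any other word must be equal.
        return (patternWords[i] === '*' || patternWords[i] === keyWords[j]) && match(i + 1, j + 1);
    };

    return match(0, 0);
}

console.log(topicMatches('user.*', 'user.edited'));     // true
console.log(topicMatches('user.*', 'user.edited.now')); // false
```

With the binding above, user.edited and user.created both reach user.queue, while a key such as user.profile.edited does not, because `*` covers a single word only.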
You need to include RabbitMQModule in imports and all your decorated classes in declarations.
@HapinessModule({
version: '1.0.0',
declarations: [
MyQueue,
MyExchange,
MyMessage,
...
],
providers: [
MyService
],
exports: [],
imports: [RabbitMQModule]
})
You need to inject the extension in bootstrap using setConfig to instantiate the module.
Hapiness.bootstrap(RabbitMQModuleTest, [
RabbitMQExt.setConfig({
connection: {
host: '....',
login: '....',
password: '....'
}
})
]).catch(err => done(err));
Once the extension is loaded and RabbitMQ is connected, you can use the services in your app.
We provide three services: ConnectionService, ChannelService and MessageService.
To send messages you can also use the sendMessage() utility provided.
class FooProvider {
// Holds the manager returned by upsert() so foo() can reuse it
private _myChannelManager: ChannelManager;
constructor(private _channelService: ChannelService, private _messageService: MessageService) {}
bar(): void {
// Upsert a channel by specifying a key to identify it (one key per channel).
// The function returns an Observable of a ChannelManager instance.
this._channelService.upsert('publish')
.subscribe(channelManager => {
this._myChannelManager = channelManager;
});
}
foo() {
// Use the created channel:
// use the manager to retrieve the channel instance...
const ch = this._myChannelManager.getChannel();
// ... or retrieve it with the shortcut getChannel and your key:
// const ch = this._channelService.getChannel('publish');
// Use any function from amqp.node
ch.sendToQueue(...);
// Or use the helpers, assumed here to be methods of the injected MessageService
this._messageService.sendToQueue(ch, { foo: 'bar' }, UserQueue);
this._messageService.publish(ch, { foo: 'bar' }, UserExchange, { routingKey: 'foo.bar' });
}
}
To set up your development environment:
- clone the repo to your workspace,
- in the shell, cd to the main folder,
- run npm install or yarn install,
- run npm run test or yarn run test.
  - It will lint the code and execute all tests.
  - The test coverage report can be viewed from ./coverage/lcov-report/index.html.
- v1.7.3 (2019-12-16)
- v1.7.2 was not correctly published, nothing new in this version
- v1.7.2 (2019-12-16)
- Handle all errors when sending a message
- Fix scope of "this" when sending message
- v1.7.1 (2019-12-13)
- Handle channel closed error when sending a message to add a custom code on the thrown error
- v1.7.0 (2019-02-27)
- Add method to cancel consuming queue
- Refactor consume queue to allow easier consume/cancel
- Add a QueueStore to fetch all the queues manager instances
- v1.6.2 (2018-11-22)
- Create DI with providers for queues and exchanges
- v1.6.1 (2018-11-14)
- force_json_decode is now true by default
- v1.6.0 (2018-10-31)
- Add assert option in Exchange and Queue decorator to allow to disable assert during bootstrap
- Add check option in Exchange and Queue decorator to verify existence during bootstrap
- v1.5.1 (2018-09-24)
- Fix reconnection error: use once instead of on and rebind event correctly
- v1.5.0 (2018-08-24)
- Add possibility to provide a custom MessageRouter
- v1.4.3 (2018-08-20)
- Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
- v1.4.2 (2018-06-11)
- Do not retry to connect if closing server
- v1.4.1 (2018-05-31)
- Fix channel creation after reconnection
- v1.4.0 (2018-04-24)
- Refactor channel management to handle connection errors
- v1.3.0 (2018-03-27)
- Add shutdown (SIGTERM/SIGINT) support
- v1.2.3 (2018-02-05)
- Latest packages' versions.
- Fix typings
- Documentation.
- v1.2.2 (2017-12-20)
- Latest packages' versions.
- Fix queue dispatching in routing messages
- Documentation.
- v1.2.1 (2017-11-23)
- Latest packages' versions.
- Fix routing messages
- Documentation.
- v1.2.0 (2017-11-20)
- Latest packages' versions.
- Update Module + Tests related to latest core version.
- Documentation.
- Change packaging process.
- v1.1.2 (2017-11-02)
- Fix decorators prefetch
- v1.1.1 (2017-10-31)
- Fix queue binding
- v1.1.0 (2017-10-27)
- Allow to define queue binds without pattern
- Allow to define queue bind pattern as array
- Add default prefetch that is used for each channel creation if not specified in create() method first argument
- Rename decodeContent to decodeJSONContent and change logic to not throw if content is not JSON, add force argument to try to decode if headers.json boolean is missing
- Add force_json_decode option in queue decorator to force JSON decoding of all messages consumed
- Rework dispatcher logic (1)
- Add channel option for queue to allow using different channel for each queue with a different prefetch
- Export a global event object for connection and queueManager events
- Correct logic behind message routing
- Add checks and throw if messages do not have all required properties
- If the message has a filter property and it does not match discard the class from the selection
- Update tests
- Update documentation
- v1.0.0 (2017-10-23)
- Publish all features of the module
- Tests
- Documentation
Julien Fauville | Antoine Gomez | Sébastien Ritz | Nicolas Jessel |
Copyright (c) 2017 Hapiness Licensed under the MIT license.