by @foundernest
Simple decorator-based wrapper for easy RabbitMQ queuing.
- Built-in connection recovery to queue.
- Decorator to register queue listeners.
- Send/Receive messages through the same connection.
- Ack and messages retries already set.
- Automatic creation of queues.
- JSON serialization/deserialization of messages.
- Messages priority.
- Concurrency control.
- Full Typescript support.
- Queue ACK for messages sent.
Both JavaScript and Typescript can be used, however, this library takes advantage of Typescript decorators. OnQueue
decorator is only available using Typescript.
A running instance of RabbitMQ is needed, a docker-compose file is provided to use along with this library in a dev env (
docker-compose up -d rabbitmq
). Do not use the given image in production
Init the service:
import * as SimpleQueueDecorator from 'simple-queue-decorator'
await SimpleQueueDecorator.init({
url: "127.0.0.1",
user: "guest",
password: "guest"
})
await SimpleQueueDecorator.close(); // Closes the service
Consume Messages (this can be done before init
):
import { OnQueue } from 'simple-queue-decorator'
class MyConsumer {
@OnQueue('my-queue')
public static async onMessageReceived(msg: any) {
console.log("Message Received", msg.foo)
await doSomethingWithMyMsg(msg) // If this returns a rejected promise, message will be re-queued once
}
}
It is recommended to use static methods with queue decorator
Send Messages (Service must be initiated beforehand):
import * as SimpleQueueDecorator from 'simple-queue-decorator'
SimpleQueueDecorator.sendMessage('my-queue', {foo: "my message name"})
Messages can also be listened without using the decorator:
import * as SimpleQueueDecorator from 'simple-queue-decorator'
SimpleQueueDecorator.registerQueue("my-queue", async (msg) => {
console.log("Message Received", msg.foo)
await doSomethingWithMyMsg(msg)
})
This is preferable if using JavaScript or using dynamic dependencies (e.g. injectables) in the queue callback. RegisterQueue also support an array of queues, the callback will be execute for any message in any of those queues.
Send a message with priority:
import * as SimpleQueueDecorator from 'simple-queue-decorator'
SimpleQueueDecorator.sendMessage('my-queue',
{foo: "my message name"},
{
priority: SimpleQueueDecorator.MessagePriority.HIGH
});
Messages will be ordered and received according to priority. Priority can be LOW
, MEDIUM
or HIGH
, a number ranging from 1 to 10 can also be used. Keep in mind the following:
- Messages with no priority set are considered lower than any priority (including
LOW
) - When messages are instantly consumed, order may not be guaranteed.
The following options can be passed to init
:
- url: The plain url (amqp protocol) of rabbitMQ.
- user: RabbitMQ user.
- password: RabbitMQ password.
- log: If true, will log internal queue errors, defaults to true.
- concurrency: The number of messages to be consumed at the same time, defaults to 1.
- retry: If true, 1 retry per message will be made if the callback returns a rejected promise, defaults to true.
- maxConnectionAttempts: Maximum number of recconnection attempts on
init
, if 0, it will attempt indefinitely. Defaults to 0. - connectionRetryDelay: Milliseconds to wait before connection attempts. Defaults to 5
- protocol: Specify the protocol (i.e: amqp, amqps, https, etc)
node and npm required, either docker or a running instance of rabbitmq required.
npm install
npm run tsc
to compile- (optional)
docker-compose up -d rabbitmq
to launch rabbitmq npm test
to compile and execute tests (rabbitmq must be running)
This library makes several assumptions on how the messages are going to be consumed, as such, if your needs are different, we recommend directly using amqplib.
- A single retry will be done before completely dropping a message.
- Only one listener is attached to each queue.
- A single connection to be shared between all consumers.
- Queues are created with persistence.
- Messages are JSON formatted.
- By default, messages are consumed 1 at a time.