/simple-queue-decorator

A simple interface with RabbitMQ through typescript decorators

Primary LanguageTypeScriptMIT LicenseMIT

Simple Queue Decorator

by @foundernest

npm Build Status

Simple decorator-based wrapper for easy RabbitMQ queuing.

Features

  • Built-in connection recovery to queue.
  • Decorator to register queue listeners.
  • Send/Receive messages through the same connection.
  • Ack and messages retries already set.
  • Automatic creation of queues.
  • JSON serialization/deserialization of messages.
  • Messages priority.
  • Concurrency control.
  • Full Typescript support.
  • Queue ACK for messages sent.

How To

Both JavaScript and Typescript can be used, however, this library takes advantage of Typescript decorators. OnQueue decorator is only available using Typescript.

A running instance of RabbitMQ is needed, a docker-compose file is provided to use along with this library in a dev env (docker-compose up -d rabbitmq). Do not use the given image in production

Init the service:

import * as SimpleQueueDecorator from 'simple-queue-decorator'

await SimpleQueueDecorator.init({
    url: "127.0.0.1",
    user: "guest",
    password: "guest"
})

await SimpleQueueDecorator.close(); // Closes the service

Consume Messages (this can be done before init):

import { OnQueue } from 'simple-queue-decorator'

class MyConsumer {

    @OnQueue('my-queue')
    public static async onMessageReceived(msg: any) {
        console.log("Message Received", msg.foo)
        await doSomethingWithMyMsg(msg) // If this returns a rejected promise, message will be re-queued once
    }
}

It is recommended to use static methods with queue decorator

Send Messages (Service must be initiated beforehand):

import * as SimpleQueueDecorator from 'simple-queue-decorator'

SimpleQueueDecorator.sendMessage('my-queue', {foo: "my message name"})

Messages can also be listened without using the decorator:

import * as SimpleQueueDecorator from 'simple-queue-decorator'


SimpleQueueDecorator.registerQueue("my-queue", async (msg) => {
    console.log("Message Received", msg.foo)
    await doSomethingWithMyMsg(msg)
})

This is preferable if using JavaScript or using dynamic dependencies (e.g. injectables) in the queue callback. RegisterQueue also support an array of queues, the callback will be execute for any message in any of those queues.

Send a message with priority:

import * as SimpleQueueDecorator from 'simple-queue-decorator'

SimpleQueueDecorator.sendMessage('my-queue',
 {foo: "my message name"},
  {
      priority: SimpleQueueDecorator.MessagePriority.HIGH
});

Messages will be ordered and received according to priority. Priority can be LOW, MEDIUM or HIGH, a number ranging from 1 to 10 can also be used. Keep in mind the following:

  • Messages with no priority set are considered lower than any priority (including LOW)
  • When messages are instantly consumed, order may not be guaranteed.

The following options can be passed to init:

  • url: The plain url (amqp protocol) of rabbitMQ.
  • user: RabbitMQ user.
  • password: RabbitMQ password.
  • log: If true, will log internal queue errors, defaults to true.
  • concurrency: The number of messages to be consumed at the same time, defaults to 1.
  • retry: If true, 1 retry per message will be made if the callback returns a rejected promise, defaults to true.
  • maxConnectionAttempts: Maximum number of recconnection attempts on init, if 0, it will attempt indefinitely. Defaults to 0.
  • connectionRetryDelay: Milliseconds to wait before connection attempts. Defaults to 5
  • protocol: Specify the protocol (i.e: amqp, amqps, https, etc)

Development steps

node and npm required, either docker or a running instance of rabbitmq required.

  1. npm install
  2. npm run tsc to compile
  3. (optional) docker-compose up -d rabbitmq to launch rabbitmq
  4. npm test to compile and execute tests (rabbitmq must be running)

Important Notes

This library makes several assumptions on how the messages are going to be consumed, as such, if your needs are different, we recommend directly using amqplib.

  • A single retry will be done before completely dropping a message.
  • Only one listener is attached to each queue.
  • A single connection to be shared between all consumers.
  • Queues are created with persistence.
  • Messages are JSON formatted.
  • By default, messages are consumed 1 at a time.