/nestx-amqp

Provides an AMQP connection as a NestJS module. Uses amqp-connection-manager internally.

Primary language: TypeScript · License: MIT

nestx-amqp

NPM Github Workflow Status Codecov Semantic-Release

Provides an AMQP connection as a NestJS module. Uses amqp-connection-manager internally.

Features

  • Provides an AMQPModule that creates an AMQPConnectionManager asynchronously
  • Provides a globally injectable AMQP connection manager

Installation

yarn add nestx-amqp

Examples

Register Module Async

import { Module } from '@nestjs/common';
import { AMQPModule } from 'nestx-amqp';


/**
 * Application root module.
 *
 * Registers AMQPModule through `forRootAsync`, using a factory that returns
 * the list of broker URLs to connect to.
 */
@Module({
  imports: [
    AMQPModule.forRootAsync({
      useFactory: () => {
        // Connection URLs handed to the AMQP module.
        const urls = ['amqp://devuser:devuser@localhost:5672?heartbeat=60'];
        return { urls };
      },
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Inject AMQPConnectionManager

Use the AMQP_CONNECTION symbol for injection:

Below is an abstract producer code sample.

import { Inject, OnModuleInit } from '@nestjs/common';
import { AMQP_CONNECTION } from 'nestx-amqp';
import * as amqp from 'amqp-connection-manager';
import { Options } from 'amqplib';

/**
 * Base class for producers that send messages to a single queue.
 *
 * Subclasses supply the queue name and the options used to assert it. The
 * channel is created with `json: true` when the module initializes.
 */
export abstract class SimpleAbstractProducer implements OnModuleInit {
  /** Channel wrapper assigned in `onModuleInit`; not set before that hook runs. */
  channelWrapper!: amqp.ChannelWrapper;

  /** Returns the name of the queue this producer sends to. */
  abstract getQueue(): string;

  /** Returns the options passed to `assertQueue` during channel setup. */
  abstract getQueueOptions(): Options.AssertQueue;

  public constructor(
    @Inject(AMQP_CONNECTION)
    readonly connectionManager: amqp.AmqpConnectionManager
  ) {}

  /**
   * Creates the channel, asserts the queue in its setup callback, and waits
   * until the channel reports it is connected.
   */
  async onModuleInit(): Promise<void> {
    this.channelWrapper = this.connectionManager.createChannel({
      json: true,
      setup: (channel) => {
        // Use the abstract accessors: the class declares no `queue` property,
        // and the subclass-provided options must reach `assertQueue`.
        return channel.assertQueue(this.getQueue(), this.getQueueOptions());
      }
    });
    await this.channelWrapper.waitForConnect();
  }

  /**
   * Sends a message to the queue returned by `getQueue()`.
   *
   * @param message - Payload handed to the channel wrapper's `sendToQueue`.
   * @param options - Optional publish options forwarded unchanged.
   */
  async send(message: unknown, options?: Options.Publish): Promise<void> {
    await this.channelWrapper.sendToQueue(this.getQueue(), message, options);
  }
}