
Provide an AMQP connection as NestJS Module. Internally use amqp-connection-manager.

  • Provide an AmqpModule create AmqpConnectionManager async
  • Provide an injectable amqp connection manager at global
  • Provide decorators like @PublishQueue and @SubscribeQueue as method decorator for simple usage


yarn add nestx-amqp


Register Module Async

import { Module } from '@nestjs/common'
import { AmqpModule } from 'nestx-amqp'

  imports: [
      useFactory: () => ({
        urls: ['amqp://guest:guest@localhost:5672?heartbeat=60'],
  controllers: [],
  providers: [],
export class AppModule {}

Inject AmqpConnectionManager

Use Symbol AMQP_CONNECTION for Injection:

Below is an abstract producer code sample.

import { Inject, OnModuleInit } from '@nestjs/common'
import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
import { Options } from 'amqplib'
import { AMQP_CONNECTION } from 'nestx-amqp'

export abstract class SimpleAbstractProducer implements OnModuleInit {
  channel: ChannelWrapper;

  abstract getQueue(): string;
  abstract getQueueOptions(): Options.AssertQueue;

    readonly connectionManager: AmqpConnectionManager
  ) {}

  async onModuleInit() {
    this.channel = this.connectionManager.createChannel({
      json: true,
      setup: (channel) => channel.assertQueue(this.queue),
    await this.channel.waitForConnect();

  async send(message, options?: Options.Publish) {
    await this.channel.sendToQueue(this.queue, message, options);

Advanced Usage with Decorators

Currently, only support direct queue publish and subscribe

Interface Queue

export interface Queue {
  name: string;
  options?: Options.AssertQueue;

export interface RetryOptions {
  maxAttempts: number;

export interface BaseConsumeOptions {
  prefetch: number;
  exceptionQueue?: string;

export type PublishQueueOptions = Options.Publish;
export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;


Provide a MethodDecorator easily publishing message to queue


@PublishQueue(queue: string | Queue, options?: amqplib.Options.Publish)
yourPublishQueueMethod(content:any, options?: amqplib.Options.Publish){}


(You must register and enable AmqpModule)

class TestMessageService {
  queue = 'TEST.QUEUE';

  async testPublishQueue(content) {
    console.log(`call test publish queue with ${JSON.stringify(content)}`);


Provide a MethodDecorator easily consuming message and support simply requeue logic


@SubscribeQueue(nameOrQueue: string | Queue, options?: ConsumeQueueOptions)


You must register and enable AmqpModule

class TestMessageService {
  queue = 'TEST.QUEUE';

  async testSubscribeQueue(content) {
    // do your business handling code
    // save db? send email?
    console.log(`handling content ${JSON.stringify(content)}`);

Interface Exchange

import { Options } from 'amqplib'

 * @desc simply wrap amqp exchange definitions as interface
 * */
export interface Exchange {
  name: string
  type: string | 'direct' | 'fanout' | 'topic' | 'headers'
  options?: Options.AssertExchange

 * @desc wrap amqp.Channel.publish(exchange: string, routingKey: string, content, options?: Publish): boolean
 *       as interface
 * */
export interface PublishExchangeOptions {
  routingKey: string
  options?: Options.Publish


Not Stable

Provide a MethodDecorator easily publishing message to exchange


@PublishExchange(exchange: string | Exchange, options?: PublishExchangeOptions)
yourPublishExchangeMethod(content:any, options?: PublishExchangeOptions){}


No Example for stable usage, you can refer to unit test case (or submit PR)


Provide a MethodDecorator easily spec connection (when you register AmqpModule) with @PublisQueue() and @SubscribeQueue)

Recommend if you want to develop npm package using spec named connection


class AmqpLoggerService {
  queue = 'LOGGER.QUEUE'

  async logSideEffect(content) {
    // just do nothing here and auto send to LOGGER.QUEUE with spec `logger` connection

for more details, you can refer unittest cases.

