abdomohamed/graphql-azure-servicebus-subscriptions

Internal error occurred during message handling. Please check your implementation. Error: The receiver for "xxx/xxx/xxx" is already receiving messages.

paulbijancoch opened this issue · 3 comments

Dear @abdomohamed, first of all, thank you for open-sourcing this.

Unfortunately, I get an error as soon as there is more than one subscription.

My implementation is:

import {
  ServiceBusPubSub,
  IServiceBusOptions,
} from '@talema/graphql-azure-servicebus-subscriptions';

const options: IServiceBusOptions = {
  connectionString: process.env.SERVICEBUS_CONNECTION_STRING,
  topicName: process.env.SERVICEBUS_TOPIC,
  subscriptionName: process.env.SERVICEBUS_SUBSCRIPTION_NAME,
  filterEnabled: false,
  messageLabelKeyName: null
};
export const pubsub = new ServiceBusPubSub(options);

Do you have any idea what's going wrong there? Or have you come across this error before?

The full error log is this:

Internal error occurred during message handling. Please check your implementation. Error: The receiver for "my-topic/Subscriptions/my-subscription" is already receiving messages.
    at ServiceBusReceiverImpl._throwIfAlreadyReceiving (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:326:21)
    at ServiceBusReceiverImpl.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:504:10)
    at ServiceBusPubSub.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@talema/graphql-azure-servicebus-subscriptions/src/ServiceBusPubSub.ts:169:35)
    at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:113:32
    at Array.map (<anonymous>)
    at PubSubAsyncIterator.subscribeAll (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:112:41)
    at PubSubAsyncIterator.<anonymous> (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:55:65)
    at step (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:33:23)
    at Object.next (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:14:53)
    at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:8:71
Internal error occurred during message handling. Please check your implementation. Error: The receiver for "my-topic/Subscriptions/microservice-gateway-ws" is already receiving messages.
    at ServiceBusReceiverImpl._throwIfAlreadyReceiving (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:326:21)
    at ServiceBusReceiverImpl.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:504:10)
    at ServiceBusPubSub.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@talema/graphql-azure-servicebus-subscriptions/src/ServiceBusPubSub.ts:169:35)
    at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:113:32
    at Array.map (<anonymous>)
    at PubSubAsyncIterator.subscribeAll (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:112:41)
    at PubSubAsyncIterator.<anonymous> (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:55:65)
    at step (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:33:23)
    at Object.next (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:14:53)
    at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:8:71
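
Reading the trace, each GraphQL subscription goes through PubSubAsyncIterator.subscribeAll into ServiceBusPubSub.subscribe, which then calls subscribe on the receiver, and the second call is rejected by _throwIfAlreadyReceiving. As far as I can tell, that means a receiver only accepts one active subscribe call at a time. Here is a minimal sketch of that guard. FakeReceiver is a hypothetical stand-in I wrote for illustration, not the real @azure/service-bus class:

```typescript
// Hypothetical stand-in for a Service Bus receiver (NOT the SDK implementation).
// It only models the "one active subscribe call per receiver" guard seen in the trace.
class FakeReceiver {
  private entityPath: string;
  private handler: ((body: string) => void) | undefined;

  constructor(entityPath: string) {
    this.entityPath = entityPath;
  }

  subscribe(handler: (body: string) => void): { close(): void } {
    if (this.handler !== undefined) {
      // Same message shape as the error in the log above.
      throw new Error(
        `The receiver for "${this.entityPath}" is already receiving messages.`
      );
    }
    this.handler = handler;
    return {
      close: () => {
        this.handler = undefined;
      },
    };
  }
}

const receiver = new FakeReceiver('my-topic/Subscriptions/my-subscription');

// First GraphQL subscription: accepted.
receiver.subscribe(() => {});

// Second GraphQL subscription on the same receiver: rejected.
let secondCallError = '';
try {
  receiver.subscribe(() => {});
} catch (err) {
  secondCallError = (err as Error).message;
}
console.log(secondCallError);
```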

I'd be happy to extend the project with you.

Best regards

In case it helps, I changed the code to:

import {
  ProcessErrorArgs,
  ServiceBusClient,
  ServiceBusMessage,
  ServiceBusReceivedMessage,
  ServiceBusReceiver,
  ServiceBusSender,
  delay,
  isServiceBusError,
} from '@azure/service-bus';
import { Subject, Subscription, filter, tap, map } from 'rxjs';
import { PubSubEngine } from 'graphql-subscriptions';
import { IEventBody, IEvent, IEventResult } from '@reventwork.com/streaming';

/**
 * Represents configuration needed to wire up PubSub engine with the ServiceBus topic
 * @property {string} connectionString - The ServiceBus connection string. This would be the Shared Access Policy connection string.
 * @property {string} topicName - This would be the topic where all the events will be published.
 * @property {string} subscriptionName - This would be the ServiceBus topic subscription name.
 * @property {boolean} filterEnabled - This would be used to enable a channeling concept for the connected clients, so they would receive only the events they've asked to subscribe to.
 *                                     All the published events will be pushed to the same configured topic, which means all clients would receive all the events unless filtering is enabled.
 *                                     To enable message channeling when a message gets published, the ServiceBusPubSub would set an applicationAttribute called eventName, taken from the eventName passed to the publish method.
 */
export interface IServiceBusOptions {
  connectionString: string;
  topicName: string;
  subscriptionName: string;
}

/**
 * An override for the in-memory PubSubEngine which connects to the Azure ServiceBus.
 */

export class ServiceBusPubSub extends PubSubEngine {
  publish(eventName: string, payload: any): Promise<void> {
    throw new Error('Method not implemented.');
  }

  private client: ServiceBusClient;
  private handlerMap = new Map<number, Subscription>();
  private options: IServiceBusOptions;
  private eventNameKey: string = 'sub.eventName';
  private subject: Subject<IEventResult>;

  constructor(options: IServiceBusOptions, client?: ServiceBusClient) {
    super();
    this.options = options;
    this.client = client || new ServiceBusClient(this.options.connectionString);

    this.subject = new Subject<IEventResult>();

    const subscription = this.client
      .createReceiver(this.options.topicName, this.options.subscriptionName)
      .subscribe({
        processMessage: async (message: ServiceBusReceivedMessage) => {
          this.subject.next({
            ...message,
          });
        },
        processError: async (args) => {
          console.log(
            `Error from source ${args.errorSource} occurred: `,
            args.error
          );

          if (isServiceBusError(args.error)) {
            switch (args.error.code) {
              case 'MessagingEntityDisabled':
              case 'MessagingEntityNotFound':
              case 'UnauthorizedAccess':
                console.log(
                  `An unrecoverable error occurred. Stopping processing. ${args.error.code}`,
                  args.error
                );
                await subscription.close();
                break;
              case 'MessageLockLost':
                console.log(`Message lock lost for message`, args.error);
                break;
              case 'ServiceBusy':
                await delay(1000);
                break;
            }
          }
        },
      });
  }

  /**
   * Subscribe to updates for a specific event. The ServiceBusReceiver is created once in the constructor; this method only attaches a filtered handler to the shared subject.
   * The filter drops all received events that are not meant for this subscriber.
   * @param {string} eventName - published event name; an empty name or '*' matches every event.
   * @param {Function} onMessage - client handler for processing received events.
   * @returns {Promise<number>} - the identifier of the created subscription. It is used to dispose of the handler when unsubscribing.
   */
  async subscribe(eventName: string, onMessage: Function): Promise<number> {
    const id = Date.now() * Math.random();
    this.handlerMap.set(
      id,
      this.subject
        .pipe(
          filter(
            (e) =>
              (eventName && e.body.name === eventName) ||
              !eventName ||
              eventName === '*'
          ),
          map((e) => e.body),
          tap((e) => e)
        )
        .subscribe((event) => onMessage(event))
    );
    return id;
  }

  /**
   * Unsubscribe detaches a specific event handler from the shared message stream. The underlying ServiceBus receiver stays open for the remaining subscribers.
   * @param {number} subId - the unique identifier returned by subscribe for that handler.
   */
  async unsubscribe(subId: number) {
    const info = this.handlerMap.get(subId);
    if (info === undefined) return;
    info.unsubscribe();
    this.handlerMap.delete(subId);
  }
}
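
The core idea of the change above is that the receiver is subscribed exactly once, and every GraphQL subscription only adds a filtered handler on top of it. Here is a dependency-free sketch of that fan-out, in case it makes the proposal easier to follow. FanOut, EventBody and the event names are hypothetical; FanOut stands in for the RxJS Subject, and dispatch plays the role of the receiver's processMessage callback:

```typescript
// Hypothetical event shape; the class above only relies on body.name.
type EventBody = { name: string; data: unknown };
type Handler = (body: EventBody) => void;

class FanOut {
  private handlers = new Map<number, { eventName: string; onMessage: Handler }>();
  private nextId = 1;

  // Registers a handler and returns its id, like subscribe() in the class above.
  subscribe(eventName: string, onMessage: Handler): number {
    const id = this.nextId++;
    this.handlers.set(id, { eventName, onMessage });
    return id;
  }

  // Removes one handler; the shared source keeps running for the others.
  unsubscribe(id: number): void {
    this.handlers.delete(id);
  }

  // Called once per incoming message by the single shared receiver.
  dispatch(body: EventBody): void {
    this.handlers.forEach(({ eventName, onMessage }) => {
      // Same matching rule as the filter() above: empty name or '*' matches everything.
      if (!eventName || eventName === '*' || eventName === body.name) {
        onMessage(body);
      }
    });
  }
}

const fanOut = new FanOut();
const seen: string[] = [];

const first = fanOut.subscribe('orderCreated', (e) => seen.push(`first:${e.name}`));
fanOut.subscribe('*', (e) => seen.push(`all:${e.name}`));

fanOut.dispatch({ name: 'orderCreated', data: 1 });
fanOut.dispatch({ name: 'orderShipped', data: 2 });
fanOut.unsubscribe(first);
fanOut.dispatch({ name: 'orderCreated', data: 3 });

console.log(seen);
```

The sketch uses an incrementing counter for ids instead of Date.now() * Math.random(); that is only a simplification for the example, not something the code above depends on.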

But beware: publish is not implemented yet.
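
For completeness, here is a rough sketch of what publish could look like. It is untested against a real Service Bus namespace. SenderLike and OutgoingMessage are hypothetical structural stand-ins so the example runs without the SDK; in the real class the sender would presumably come from client.createSender(topicName). The body carries a name field because the subscribe filter above reads e.body.name, and the payload field name is my assumption:

```typescript
// Structural stand-ins for the SDK pieces this sketch touches (assumptions, not SDK imports).
interface OutgoingMessage {
  body: { name: string; payload: unknown };
  applicationProperties: Record<string, string>;
}

interface SenderLike {
  sendMessages(message: OutgoingMessage): Promise<void>;
}

// Same key as eventNameKey in the class above.
const EVENT_NAME_KEY = 'sub.eventName';

// Wraps the payload so that subscribers can filter on body.name,
// and also tags the message with the event name as an application property.
async function publish(
  sender: SenderLike,
  eventName: string,
  payload: unknown
): Promise<void> {
  await sender.sendMessages({
    body: { name: eventName, payload },
    applicationProperties: { [EVENT_NAME_KEY]: eventName },
  });
}

// Fake sender that just records what would have been sent.
const sent: OutgoingMessage[] = [];
const fakeSender: SenderLike = {
  async sendMessages(message: OutgoingMessage): Promise<void> {
    sent.push(message);
  },
};

publish(fakeSender, 'orderCreated', { id: 42 }).then(() => {
  console.log(sent[0].body.name, sent[0].applicationProperties[EVENT_NAME_KEY]);
});
```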

Hey @paulbijancoch, for some reason I only just saw this message. Sure, I'd appreciate any help :) Could you please send a PR with the change you're proposing, so that it's easier for me to follow your proposal?

@abdomohamed take a look at #3