Internal error occurred during message handling. Please check your implementation. Error: The receiver for "xxx/xxx/xxx" is already receiving messages.
paulbijancoch opened this issue · 3 comments
paulbijancoch commented
Dear @abdomohamed first of all thank you for open sourcing this.
Unfortunately I get an error when having more than one subscription.
My implementation is
import {
ServiceBusPubSub,
IServiceBusOptions,
} from '@talema/graphql-azure-servicebus-subscriptions';
const options: IServiceBusOptions = {
connectionString: process.env.SERVICEBUS_CONNECTION_STRING,
topicName: process.env.SERVICEBUS_TOPIC,
subscriptionName: process.env.SERVICEBUS_SUBSCRIPTION_NAME,
filterEnabled: false,
messageLabelKeyName: null
};
export const pubsub = new ServiceBusPubSub(options);
Do you have any idea what's going wrong there? Or did you even came across this error before?
The full error log is this:
Internal error occurred during message handling. Please check your implementation. Error: The receiver for "my-topic/Subscriptions/my-subscription" is already receiving messages.
at ServiceBusReceiverImpl._throwIfAlreadyReceiving (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:326:21)
at ServiceBusReceiverImpl.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:504:10)
at ServiceBusPubSub.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@talema/graphql-azure-servicebus-subscriptions/src/ServiceBusPubSub.ts:169:35)
at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:113:32
at Array.map (<anonymous>)
at PubSubAsyncIterator.subscribeAll (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:112:41)
at PubSubAsyncIterator.<anonymous> (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:55:65)
at step (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:33:23)
at Object.next (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:14:53)
at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:8:71
Internal error occurred during message handling. Please check your implementation. Error: The receiver for "my-topic/Subscriptions/microservice-gateway-ws" is already receiving messages.
at ServiceBusReceiverImpl._throwIfAlreadyReceiving (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:326:21)
at ServiceBusReceiverImpl.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@azure/service-bus/src/receivers/receiver.ts:504:10)
at ServiceBusPubSub.subscribe (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/@talema/graphql-azure-servicebus-subscriptions/src/ServiceBusPubSub.ts:169:35)
at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:113:32
at Array.map (<anonymous>)
at PubSubAsyncIterator.subscribeAll (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:112:41)
at PubSubAsyncIterator.<anonymous> (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/src/pubsub-async-iterator.ts:55:65)
at step (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:33:23)
at Object.next (/Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:14:53)
at /Users/reventwork/Code/microservice-playground/microservice-gateway-ws/node_modules/graphql-subscriptions/dist/pubsub-async-iterator.js:8:71
I'd be happy to extend the project with you.
Best regards
paulbijancoch commented
If it might help. I changed the code to:
import {
ProcessErrorArgs,
ServiceBusClient,
ServiceBusMessage,
ServiceBusReceivedMessage,
ServiceBusReceiver,
ServiceBusSender,
delay,
isServiceBusError,
} from '@azure/service-bus';
import { Subject, Subscription, filter, tap, map } from 'rxjs';
import { PubSubEngine } from 'graphql-subscriptions';
import { IEventBody, IEvent, IEventResult } from '@reventwork.com/streaming';
/**
* Represents configuration needed to wire up PubSub engine with the ServiceBus topic
* @property {string} connectionString - The ServiceBus connection string. This would be the Shared Access Policy connection string.
* @property {string} topicName - This would be the topic where all the events will be published.
* @property {string} subscriptionName - This would be the ServiceBus topic subscription name.
* @property {boolean} filterEnabled - This would be used to enable a channeling concept for the connected clients, so they would receive only the events they've asked to subscribe to.
* All the published events will be pushed to the same configured topic, This means all clients would be receive all the events unless this field set as false.
* To enable message channeling when the message get published, the ServiceBusPubSub would applicationAttribute called eventName, this would be the eventName from the publish method.
*/
export interface IServiceBusOptions {
connectionString: string;
topicName: string;
subscriptionName: string;
}
/**
* An override for the in-memory PubSubEngine which connects to the Azure ServiceBus.
*/
export class ServiceBusPubSub extends PubSubEngine {
publish(eventName: string, payload: any): Promise<void> {
throw new Error('Method not implemented.');
}
private client: ServiceBusClient;
private handlerMap = new Map<number, Subscription>();
private options: IServiceBusOptions;
private eventNameKey: string = 'sub.eventName';
private subject: Subject<IEventResult>;
constructor(options: IServiceBusOptions, client?: ServiceBusClient) {
super();
this.options = options;
this.client = client || new ServiceBusClient(this.options.connectionString);
this.subject = new Subject<IEventResult>();
const subscription = this.client
.createReceiver(this.options.topicName, this.options.subscriptionName)
.subscribe({
processMessage: async (message: ServiceBusReceivedMessage) => {
this.subject.next({
...message,
});
},
processError: async (args) => {
console.log(
`Error from source ${args.errorSource} occurred: `,
args.error
);
if (isServiceBusError(args.error)) {
switch (args.error.code) {
case 'MessagingEntityDisabled':
case 'MessagingEntityNotFound':
case 'UnauthorizedAccess':
console.log(
`An unrecoverable error occurred. Stopping processing. ${args.error.code}`,
args.error
);
await subscription.close();
break;
case 'MessageLockLost':
console.log(`Message lock lost for message`, args.error);
break;
case 'ServiceBusy':
await delay(1000);
break;
}
}
},
});
}
/**
* Subscribe to a specific event updates. The subscribe method would create a ServiceBusReceiver to listen to all the published events.
* The method internally would filter out all the received events that are not meant for this subscriber.
* @property {eventName | string} - published event name
* @property {onMessage | Function} - client handler for processing received events.
* @returns {Promise<number>} - returns the created identifier for the created subscription. It would be used to dispose/close any resources while unsubscribing.
*/
async subscribe(eventName: string, onMessage: Function): Promise<number> {
const id = Date.now() * Math.random();
this.handlerMap.set(
id,
this.subject
.pipe(
filter(
(e) =>
(eventName && e.body.name === eventName) ||
!eventName ||
eventName === '*'
),
map((e) => e.body),
tap((e) => e)
)
.subscribe((event) => onMessage(event))
);
return id;
}
/**
* Unsubscribe method would close open connection with the ServiceBus for a specific event handler.
* @property {subId} - It's a unique identifier for each subscribed client.
*/
async unsubscribe(subId: number) {
const info = this.handlerMap.get(subId) || undefined;
if (info === undefined) return;
info.unsubscribe();
this.handlerMap.delete(subId);
}
}
but beware: publish is not filled out.
abdomohamed commented
Hey @paulbijancoch, for some reason, I just saw this message. Sure, I'd appreciate any help :) can you please send a PR with the change you're proposing, so it's easier for me to follow your proposal?
paulbijancoch commented
@abdomohamed take a look at #3