mxriverlynn/rabbus

Routing is broken

shurik239 opened this issue · 6 comments

Sorry for the long post.
Something very strange.

I make two subscribers with 2 bindings:

Add Binding pub-sub.exchange pub-sub.queue1 pub-sub.key1
Add Binding pub-sub.exchange pub-sub.queue2 pub-sub.key2

I make only one publisher, that publishes message to exchange pub-sub.exchange with routing key pub-sub.key1

Target status: only subscriber1 consumes the message once.

Actual status: both subscribers consume the message. Twice!

Sample code here:

var util = require("util");
var Rabbus = require("rabbus");
var wascally = require("wascally");

wascally
    .configure({     connection: {
        "server": "rabbit",
        "vhost": "/",
        "user": "guest",
        "pass": "guest"
    } })
    .then(function(){

        // Publisher
        function SomePublisher(){
            Rabbus.Publisher.call(this, wascally, {
                exchange: "pub-sub.exchange",
                routingKey: "pub-sub.key1",
                messageType: "pub-sub.messageType"
            });
        }
        util.inherits(SomePublisher, Rabbus.Publisher);
        var publisher = new SomePublisher();

        // Subscriber 1
        function SomeSubscriber1(){
            Rabbus.Subscriber.call(this, wascally, {
                exchange: "pub-sub.exchange",
                queue: "pub-sub.queue1",
                routingKey: "pub-sub.key1",
                messageType: "pub-sub.messageType"
            });
        }
        util.inherits(SomeSubscriber1, Rabbus.Subscriber);
        var sub1 = new SomeSubscriber1();
        sub1.subscribe(function(message, properties, actions, next){
            console.log("hello ", message.place);
            actions.ack();
        });

        // Subscriber 2
        function SomeSubscriber2(){
            Rabbus.Subscriber.call(this, wascally, {
                exchange: "pub-sub.exchange",
                queue: "pub-sub.queue2",
                routingKey: "pub-sub.key2",
                messageType: "pub-sub.messageType"
            });
        }
        util.inherits(SomeSubscriber2, Rabbus.Subscriber);
        var sub2 = new SomeSubscriber2();
        sub2.subscribe(function(message, properties, actions, next){
            console.log("I should not be here, because no pub-sub.key2 was emitted");
            actions.ack();
        });

        // Publish
        setTimeout(function(){
            var message = {
                place: "world"
            };

            publisher.publish(message, function(){
                console.log("published a message");
            });
        }, 1000);

    })
    .catch(function(err){
        setImmediate(function(){ throw err; });
    });

Output:

2016-02-02T13:54:41.085Z [rabbus.consumer] Declaring Queue 'pub-sub.queue1'
2016-02-02T13:54:41.087Z [rabbus.consumer] With Queue Options
2016-02-02T13:54:41.088Z [rabbus.consumer] [{"name":"pub-sub.queue1","autoDelete":false,"durable":true,"noBatch":false,"subscribe":false}]
2016-02-02T13:54:41.091Z [rabbus.consumer] Declaring Exchange 'pub-sub.exchange'
2016-02-02T13:54:41.091Z [rabbus.consumer] With Exchange Options
2016-02-02T13:54:41.091Z [rabbus.consumer] [{"name":"pub-sub.exchange","type":"fanout","autoDelete":false,"durable":true,"persistent":true}]
2016-02-02T13:54:41.099Z [rabbus.consumer] Declaring Queue 'pub-sub.queue2'
2016-02-02T13:54:41.099Z [rabbus.consumer] With Queue Options
2016-02-02T13:54:41.099Z [rabbus.consumer] [{"name":"pub-sub.queue2","autoDelete":false,"durable":true,"noBatch":false,"subscribe":false}]
2016-02-02T13:54:41.101Z [rabbus.consumer] Declaring Exchange 'pub-sub.exchange'
2016-02-02T13:54:41.101Z [rabbus.consumer] With Exchange Options
2016-02-02T13:54:41.102Z [rabbus.consumer] [{"name":"pub-sub.exchange","type":"fanout","autoDelete":false,"durable":true,"persistent":true}]
2016-02-02T13:54:41.125Z [rabbus.consumer] Add Binding pub-sub.exchange pub-sub.queue1 pub-sub.key1
2016-02-02T13:54:41.128Z [rabbus.consumer] Add Binding pub-sub.exchange pub-sub.queue2 pub-sub.key2
2016-02-02T13:54:41.137Z [rabbus.consumer] Listening To Queue pub-sub.queue1
2016-02-02T13:54:41.138Z [rabbus.consumer] Listening To Queue pub-sub.queue2
2016-02-02T13:54:42.102Z [rabbus.producer] Declaring exchange pub-sub.exchange
2016-02-02T13:54:42.107Z [rabbus.producer] With Exchange Options { name: 'pub-sub.exchange',
       type: 'fanout',
       autoDelete: false,
       durable: true,
       persistent: true }
 2016-02-02T13:54:42.108Z [rabbus.producer] Publishing Message, Type: 'pub-sub.messageType', With Routing Key 'pub-sub.key1'
 2016-02-02T13:54:42.108Z [rabbus.producer] With Properties
 2016-02-02T13:54:42.108Z [rabbus.producer] [{"routingKey":"pub-sub.key1","type":"pub-sub.messageType","headers":{}}]
 published a message
 hello  world
 I should not be here, because no pub-sub.key2 was emitted
 hello  world
 I should not be here, because no pub-sub.key2 was emitted

hi @shurik239 - the pub/sub pattern is intended to work this way: all queues bound to the exchange will receive a copy of the message that is published through the exchange.

see this for reference http://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html
and https://www.rabbitmq.com/tutorials/tutorial-three-python.html

the default configuration for the pub/sub objects in Rabbus is to use a fanout exchange type from RabbitMQ. Fanout ignores all routing keys, as it knows that all bound queues will receive a copy of the message.

If you wish to have a single message sent to a single consumer queue, use at the send/receive object instead of the pub/sub objects.

Thank you for clarification. Of course you are right.
But, please, be aware, that the subscribers in my sample code are called twice. That ist not desire behaviour, i guess.

And, I have sliently adjusted my sample code, so that queues are bound to the different exchanges:

Add Binding pub-sub.exchange1 pub-sub.queue1 pub-sub.key1
Add Binding pub-sub.exchange2 pub-sub.queue2 pub-sub.key2

second subscriber is still consuming message.

var util = require("util");
var Rabbus = require("rabbus");
var wascally = require("wascally");

wascally
    .configure({     connection: {
        "server": "rabbit",
        "vhost": "/",
        "user": "guest",
        "pass": "guest"
    } })
    .then(function(){

        // Publisher
        function SomePublisher(){
            Rabbus.Publisher.call(this, wascally, {
                exchange: "pub-sub.exchange1",
                routingKey: "pub-sub.key1",
                messageType: "pub-sub.messageType"
            });
        }
        util.inherits(SomePublisher, Rabbus.Publisher);
        var publisher = new SomePublisher();

        // Subscriber 1
        function SomeSubscriber1(){
            Rabbus.Subscriber.call(this, wascally, {
                exchange: "pub-sub.exchange1",
                queue: "pub-sub.queue1",
                routingKey: "pub-sub.key1",
                messageType: "pub-sub.messageType"
            });
        }
        util.inherits(SomeSubscriber1, Rabbus.Subscriber);
        var sub1 = new SomeSubscriber1();
        sub1.subscribe(function(message, properties, actions, next){
            console.log("hello ", message.place);
            actions.ack();
        });

        // Subscriber 2
        function SomeSubscriber2(){
            Rabbus.Subscriber.call(this, wascally, {
                exchange: "pub-sub.exchange2",
                queue: "pub-sub.queue2",
                routingKey: "pub-sub.key2",
                messageType: "pub-sub.messageType"
            });
        }
        util.inherits(SomeSubscriber2, Rabbus.Subscriber);
        var sub2 = new SomeSubscriber2();
        sub2.subscribe(function(message, properties, actions, next){
            console.log("I should not be here, because no pub-sub.key2 was emitted");
            actions.ack();
        });

        // Publish
        setTimeout(function(){
            var message = {
                place: "world"
            };

            publisher.publish(message, function(){
                console.log("published a message");
            });
        }, 1000);

    })
    .catch(function(err){
        setImmediate(function(){ throw err; });
    });
2016-02-02T14:51:55.669Z [rabbus.consumer] Declaring Queue 'pub-sub.queue1'
2016-02-02T14:51:55.671Z [rabbus.consumer] With Queue Options
2016-02-02T14:51:55.671Z [rabbus.consumer] [{"name":"pub-sub.queue1","autoDelete":false,"durable":true,"noBatch":false,"subscribe":false}]
2016-02-02T14:51:55.674Z [rabbus.consumer] Declaring Exchange 'pub-sub.exchange1'
2016-02-02T14:51:55.674Z [rabbus.consumer] With Exchange Options
2016-02-02T14:51:55.674Z [rabbus.consumer] [{"name":"pub-sub.exchange1","type":"fanout","autoDelete":false,"durable":true,"persistent":true}]
2016-02-02T14:51:55.682Z [rabbus.consumer] Declaring Queue 'pub-sub.queue2'
2016-02-02T14:51:55.682Z [rabbus.consumer] With Queue Options
2016-02-02T14:51:55.682Z [rabbus.consumer] [{"name":"pub-sub.queue2","autoDelete":false,"durable":true,"noBatch":false,"subscribe":false}]
2016-02-02T14:51:55.685Z [rabbus.consumer] Declaring Exchange 'pub-sub.exchange2'
2016-02-02T14:51:55.685Z [rabbus.consumer] With Exchange Options
2016-02-02T14:51:55.685Z [rabbus.consumer] [{"name":"pub-sub.exchange2","type":"fanout","autoDelete":false,"durable":true,"persistent":true}]
2016-02-02T14:51:55.718Z [rabbus.consumer] Add Binding pub-sub.exchange1 pub-sub.queue1 pub-sub.key1
2016-02-02T14:51:55.722Z [rabbus.consumer] Add Binding pub-sub.exchange2 pub-sub.queue2 pub-sub.key2
2016-02-02T14:51:55.728Z [rabbus.consumer] Listening To Queue pub-sub.queue1
2016-02-02T14:51:55.732Z [rabbus.consumer] Listening To Queue pub-sub.queue2
2016-02-02T14:51:56.690Z [rabbus.producer] Declaring exchange pub-sub.exchange1
2016-02-02T14:51:56.694Z [rabbus.producer] With Exchange Options { name: 'pub-sub.exchange1',
      type: 'fanout',
      autoDelete: false,
      durable: true,
      persistent: true }
2016-02-02T14:51:56.696Z [rabbus.producer] Publishing Message, Type: 'pub-sub.messageType', With Routing Key 'pub-sub.key1'
2016-02-02T14:51:56.696Z [rabbus.producer] With Properties
2016-02-02T14:51:56.696Z [rabbus.producer] [{"routingKey":"pub-sub.key1","type":"pub-sub.messageType","headers":{}}]
published a message
hello  world
I should not be here, because no pub-sub.key2 was emitted

that's definitely a problem.

off-hand, i would guess this is related to the cause of #15 - bad setup in the middleware, basically. i'm working on a couple of ideas to fix this already, and I'll be sure to test out the scenario you provided here.

hmm, i don't think that 15 is related after all. it looks like this is caused by the way wascally uses messageType internally.

i changed your code so that the subscribers were defined with different messageType properties, and it worked as expected - only the first subscriber received the message.

        // Subscriber 1
        function SomeSubscriber1(){
            Rabbus.Subscriber.call(this, wascally, {
                exchange: "pub-sub.exchange1",
                queue: "pub-sub.queue1",
                routingKey: "pub-sub.key1",
                messageType: "pub-sub.key1" // <=========== different messageType, here
            });
        }

        // Subscriber 2
        function SomeSubscriber2(){
            Rabbus.Subscriber.call(this, wascally, {
                exchange: "pub-sub.exchange2",
                queue: "pub-sub.queue2",
                routingKey: "pub-sub.key2",
                messageType: "pub-sub.key2"  // <======= different messageType here
            });
        }

inside of wascally, it uses a library called Postal.js to provide in-memory message handling based on the messageType provided. when both subscriber1 and subscriber2 provide the same message type within the same node process, they are both subscribed to Postal.js' internal handler for that message type.

RabbitMQ in this case is only sending the message to pub-sub.queue1... but Wascally is using the messageType to distribute the message to both consumers.

The solution I would suggest is to keep the messageType the same as the routing key. This is typically what I do. Issue #14 talks about this as well, and there will likely be a change in Rabbus to default the messageType to the same thing as the routing key (unless messageType is explicitly provided).

that means - I have to avoid to make two or more consumer with the same routing key/messageType in the same node process. Regadless of what binding was made. It's actuall not a big problem for me.

perhaps a change would be good, to have the message type default to the routing key, if no message type is supplied? that way you wouldn't have to think about the message type, most of the time.

I agree - it would be the solution to avoid this kind of issues.

Ok, I'll make that change to default the messageType and will track that change in ticket #14