horse-framework/horse-messaging

How to create a unique queue for each unique client

DoubleDBE opened this issue · 11 comments

Hello,

Is it possible to have a "main" queue that will route each message to a "sub-queue"?
This "sub-queue" will be used for every unique client that will be connected to my application.

Why?
I need to keep track of all messages that were sent, and which client has received them or not.

The scenario:
My application will produce its own messages. These messages needs to be sent to each connected client.

Even when a client has been offline for a while (battery drained, lost wifi connection, etc..), each client needs to receive every message that my client has produced.

Thank you for the information!

Hello,

You can do it with routers. Create a router with name "foo" with Distribute method. Distribute means, each message is sent to all bindings.

Then you need to add Topic Binding to your router.
Topic binding redirects the message to the queues with specified topic name (queues can have topic name). Set topic binding's method Distribute. That means, each message is sent to all queues with specified topic name. If you want to get acknowledge from queue to producer, you can set Interaction property of binding "Response".

After that, you need to publish your messages instead of push.
Instead of client.Queue.Push, use client.Router.Publish.. methods.
The target should be "foo" which is the name of the router.

Do not forget, if some queues are created lately, old messages will not be redirected to your new queues. An example,

Queue 1 is created with topic "x"
Queue 2 is created with topic "x"
Message 1 is produced
Queue 1 and 2 received Message 1
Queue 3 is created with topic "x"
Message 2 is produced
Queue 1, 2 and 3 received the Message 2 (Queue 3 can't receive Message 1)

Server side example code:

//rider is type of HorseRider

IRouter router = rider.Router.Add("foo", RouteMethod.Distribute);
router.AddBinding(new TopicBinding
{
   Name = "identifier-name",
   RouteMethod = RouteMethod.Distribute,
   Interaction = BindingInteraction.Response,
   Target = "x"
});

Client side example code:

await client.Router.Create("foo", RouteMethod.Distribute);
await client.Router.AddBinding("foo", "Topic", "identifier-name", "x", BindingInteraction.Response, RouteMethod.Distribute);

Or you can just use Horse.Jockey administration panel and create your router and and bindings visually.

Do not forget, if some queues are created lately, old messages will not be redirected to your new queues. An example,

Queue 1 is created with topic "x" Queue 2 is created with topic "x" Message 1 is produced Queue 1 and 2 received Message 1 Queue 3 is created with topic "x" Message 2 is produced Queue 1, 2 and 3 received the Message 2 (Queue 3 can't receive Message 1)

Is this possible to overcome by creating a fully custom QueueManager?

Or is there some other way by customization something else that this framework provides?

Thanks!

Yes, it's possible.

  • You should implement IHorseQueueManager interface.
  • You need some IQueueMessageStore implementation for storing messages.
    There are some message stores in horse, you can use them, inherit your custom stores from them (their methods are mostly virtual) or you can can your store from strach by implementing the IQueueMessageStore interface.
  • For clustering operations you need IQueueSynchronizer implementation. I strongly recommend to use horse's default queue synchorizer unless you are storing your messages in different db such as sql server. It's name is DefaultQueueSynchronizer.

You can check Horse.Messaging.Server.Queues.Managers.MemoryQueueManager and Horse.Messaging.Data.Implementation.PersistentQueueManager classes as example.

And it's possible to your multiple queue managers in same server. Example server code.

HorseRider rider = HorseRiderBuilder.Create()
    .ConfigureQueues(cfg =>
    {
        cfg.Options.Type = QueueType.Push;
        cfg.EventHandlers.Add(new QueueEventHandler());

        cfg.UseMemoryQueues(c =>
        {
            c.Options.CommitWhen = CommitWhen.AfterReceived;
            c.Options.Acknowledge = QueueAckDecision.WaitForAcknowledge;
            c.Options.PutBack = PutBackDecision.Regular;
        });

        cfg.UsePersistentQueues(null, c =>
        {
            c.Options.Acknowledge = QueueAckDecision.WaitForAcknowledge;
            c.Options.CommitWhen = CommitWhen.AfterReceived;
        });
        
        cfg.UseCustomQueueManager("custom", builder =>
        {
            //you can use builder variable to get some useful information
            return new YourCustomerManager();
        });
    })
    .Build();

To use your customer queue manager add [QueueManager("custom")] attribute to your models in producer or consumer.
Or you can just use only custom queue manager. In that case you do not need to use that attribute.

If you have multiple queue managers. default names for memory and persistent queue managers, Default for memory and Persistent for persistent queues.

Okay thank you, will try!

Quick question, this piece of code where you put "Topic", is that a predefined value that is recognized somewhere in the framework, or is this something that I can give a random name (eg "my-custom-topic" or whatever)?

await client.Router.AddBinding("foo", "Topic", "identifier-name", "x", BindingInteraction.Response, RouteMethod.Distribute);

If its predefined, maybe it's better to create an enum for that, seems a bit clearer (just a suggestion).

Topic is an identifier name. Because there is a class with name TopicBinding in server assembly. You can type TopicBinding too instead of Topic. Both acceptable.

If you create a class in server assembly with name FooBinding and extends from Binding class.
You will able to use Foo as binding type and that word indicates your custom FooBinding.

It seems like when I use TopicBinding, the response after publish is always NotFound.

However, if I change it to a QueueBinding, the response is always Ok, but nothing really happens beyond that.

Server:

 IRouter router = rider.Router.Add("router", RouteMethod.Distribute);

 //router.AddBinding(new TopicBinding()
 //{
 // RouteMethod = RouteMethod.Distribute,
 // Interaction = BindingInteraction.Response,
 // Name = "main-router",
 // Target = "main-queue"
 //});

 router.AddBinding(new QueueBinding()
 {
 RouteMethod = RouteMethod.Distribute,
 Interaction = BindingInteraction.Response,
 Name = "main-router",
 Target = "main-queue"
 });

Client:

 await client.Router.Create("router", RouteMethod.Distribute);

 await client.Router.AddBinding("router", "TopicBinding", "main-router", "main-queue", BindingInteraction.Response, RouteMethod.Distribute);

Publish:

await client.Router.PublishJson("router", model, true);

Any ideas?

Thanks

And you have a queue with topic "main-queue" ?

you can create like this.

HorseQueue queue = await rider.Queue.Create("queue1");
queue.Topic = "main-queue";

You can also define that topic in client side (producer or consumer) with [QueueTopic("main-queue")] attribute.

By the way, you can use asterisk for topic binding. For example, you can set your topic binding's target "main*". That will find your queues with "main-queue" or "main-foo" topics too.

I have tried your suggestion, but still no success.

It seems to me something is wrong when using TopicBinding, or I'm doing something wrong.

When I change all bindings to a QueueBinding, it works.

Model (i'm using a single console application, so the model is the same in server and client):

[AutoAck]
[AutoNack]
[AutoResponse(AutoResponse.All, NegativeReason.ExceptionMessage)]
[QueueName("queue-name")]
[QueueTopic("topic-name")]
[RouterName("router-name")]
class Model
{
    public string Message { get; set; }
}

Binding creation in server:

var router = new Router(rider, "router-name", RouteMethod.Distribute);

router.AddBinding(new QueueBinding()
{
  RouteMethod = RouteMethod.Distribute,
  Interaction = BindingInteraction.Response,
  Name = "binding-name",
  Target = "queue-name"
});

//router.AddBinding(new TopicBinding()
//{
//    RouteMethod = RouteMethod.Distribute,
//    Interaction = BindingInteraction.Response,
//    Name = "binding-name",
//    Target = "queue-name"
//});

rider.Router.Add(router);

Queue creation in server:

var q = await rider.Queue.Create("queue-name");
q.Topic = "topic-name";

Binding creation in loopback client (producer):

await client.Router.Create("router-name", RouteMethod.Distribute);

await client.Router.AddBinding("router-name", nameof(QueueBinding), "binding-name", "topic-name", BindingInteraction.Response, RouteMethod.Distribute);

Publish in loopback client (producer):

var model = new Model()
{
  Message = "test"
};

var routerResult = await client.Router.PublishJson("router-name", model, true);

Binding creation in actual client that will receive the messages:

await client.Router.Create("router-name", RouteMethod.Distribute);

await client.Router.AddBinding("router-name", nameof(QueueBinding), "binding-name", "topic-name", BindingInteraction.Response, RouteMethod.Distribute);

Also, I noticed something strange.

When I change nameof(QueueBinding) to nameof(TopicBinding) in the loopback client and in the actual client, it works too.

Server binding creation:

router.AddBinding(new QueueBinding()
{
  RouteMethod = RouteMethod.Distribute,
  Interaction = BindingInteraction.Response,
  Name = "binding-name",
  Target = "queue-name"
});

Binding creation in loopback client (producer):

await client.Router.AddBinding(
        "router-name",
         nameof(TopicBinding),
        "binding-name",
        "topic-name",
         BindingInteraction.Response,
         RouteMethod.Distribute);

Binding creation in actual client that will receive the messages:

await client.Router.AddBinding(
        "router-name",
         nameof(TopicBinding),
        "binding-name",
        "topic-name",
         BindingInteraction.Response,
         RouteMethod.Distribute);

Hi,

Firstly, thanks, you found a bug in topic bindings. I just fixed it and published version 6.3.15.

In your examples, I guess you think you need to add binding in both side (client and server). Only one side is enough. If you add binding in server side, you do not need to add same binding from clients.

I saw a mistake in your code, I think there is something misunderstood. Target property of the binding does not represent name of the queue everytime. In your code:

//router.AddBinding(new TopicBinding()
//{
//    RouteMethod = RouteMethod.Distribute,
//    Interaction = BindingInteraction.Response,
//    Name = "binding-name",
//    Target = "queue-name"
//});

is pointless. Because it's topic binding. It finds your queues by topics. But the target is "queue-name". There is no queue with topic "queue-name". It's the name of the queue, not topic.

In your last post,

await client.Router.AddBinding(
        "router-name",
         nameof(TopicBinding),
        "binding-name",
        "topic-name",
         BindingInteraction.Response,
         RouteMethod.Distribute);

that code should work, after bug fix.
But in your last post there are multiple bindings.
Message is received because of QueueBinding not TopicBinding.

Hi there,

thank you for the fix, it works!