horse-framework/horse-messaging

TMQ Message store Example

dialtonetech opened this issue · 8 comments

Wonderful project, huge potential well done.

Is it possible to get an example, showing how to store messages in database, and how to store messages until a consumer is active.

You need to manage it manually; saving messages, loading them when queue is ready and removing from queue. However, we have a class with name Database in Twino.MQ.Data Assembly. Database is a ilghtweight data saving object and designed to store messages with high performance.

Database class saves message into a database file (we usually use tdb file extension) as binary. Performance depends your disk, in good SSD disks, you can save 200k message per seconds. But if you want to save messages to a database such as MS-SQL or PostgreSQL, you can use use them too. Just replace database insert and delete code with your SQL database code in sample code below.

Here is an example with some comments. It's used in one of my projects. I edited it for you, removed some codes added some comments.

    public class MailQueueDeliveryHandler : IMessageDeliveryHandler
    {
        private readonly Database _database;

        public MailQueueDeliveryHandler()
        {
            _database = new Database(new DatabaseOptions
                                     {
                                         Filename = "data/mail.tdb",
                                         InstantFlush = false,
                                         AutoFlush = true,
                                         AutoShrink = true,
                                         FlushInterval = TimeSpan.FromSeconds(1),
                                         ShrinkInterval = TimeSpan.FromMinutes(15),
                                         CreateBackupOnShrink = false
                                     });
        }

        public async Task Initialize(ChannelQueue queue)
        {
            //connect to database
            await _database.Open();

            //fill messages from database into queue
            QueueFiller filler = new QueueFiller(queue);
            var dict = await _database.List();
            filler.FillMessage(dict.Values, true);
        }

        public Task<Decision> ReceivedFromProducer(ChannelQueue queue, QueueMessage message, MqClient sender)
        {
            //second parameter of decision tells to MQ Server to save messages
            //MQ Server will call SaveMessage method
            return Task.FromResult(new Decision(true, true, PutBackDecision.No, DeliveryAcknowledgeDecision.None));
        }

        public Task<Decision> BeginSend(ChannelQueue queue, QueueMessage message)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public Task<Decision> CanConsumerReceive(ChannelQueue queue, QueueMessage message, MqClient receiver)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public Task<Decision> ConsumerReceived(ChannelQueue queue, MessageDelivery delivery, MqClient receiver)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public Task<Decision> ConsumerReceiveFailed(ChannelQueue queue, MessageDelivery delivery, MqClient receiver)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public Task<Decision> EndSend(ChannelQueue queue, QueueMessage message)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public async Task<Decision> AcknowledgeReceived(ChannelQueue queue, TmqMessage acknowledgeMessage, MessageDelivery delivery, bool success)
        {
            //if successful ack is received, remove message from database
            if (success)
                await _database.Delete(delivery.Message.Message);

            return Decision.JustAllow();
        }

        public Task<Decision> MessageTimedOut(ChannelQueue queue, QueueMessage message)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public Task<Decision> AcknowledgeTimedOut(ChannelQueue queue, MessageDelivery delivery)
        {
            return Task.FromResult(Decision.JustAllow());
        }

        public Task MessageRemoved(ChannelQueue queue, QueueMessage message)
        {
            //This method is called when delivery cycle doesn't put message back into queue.
            //That method call does not mean, ack is received and it's successful.
            //So, if you don't want to remove it before ack is received, dont delete message at here.
            //delete it in AcknowledgeReceived method.

            return Task.FromResult(Decision.JustAllow());
        }

        public Task<Decision> ExceptionThrown(ChannelQueue queue, QueueMessage message, Exception exception)
        {
            Serilog.Log.Error("MailQueueHandler: " + exception);
            return Task.FromResult(Decision.JustAllow());
        }

        public Task<bool> SaveMessage(ChannelQueue queue, QueueMessage message)
        {
            return _database.Insert(message.Message);
        }
    }

Thank you,

I have managed to get the messages stored, however can't load them on restart.

In the options file, have set the status to pull, however there is no messages in the queue.

How to configure the message server to wait till consumer pull a message off the stack?

Trying to achieve the following,

  1. messages are posted via http get, realtime platform (10-50 messages per second)
  2. saving messages for processing later
  3. want to pull messages from queue and process them.

Think I need to split the producer / consumer and server into three modules.

using the Sample.MQ example as a starting point.

Think I need to split the producer / consumer and server into three modules.

Yes, your producer application just sends HTTP Requests for pushing messages into queues.
Consumer application uses TmqClient (you can see examle in Sample.Tmq project) and we recommend use it with connector class, it makes everything easier and managable.
Server application accepts both HTTP requests and TMQ connections. I'm gonna focus on server application in answers of your questions below.

1. messages are posted via http get, realtime platform (10-50 messages per second)

If you already handle HTTP requests with ASP.NET MVC or another HTTP server, you can skip to read answer of Q1.
In Sample.MQ project, we used TwinoServer object. It's for accepting and managing TCP connections. with UseMqServer method, it accepts connections for Messaging Queue server. But at same time, TwinoServer can handle both HTTP and MQ. So you can look to Sample.Mvc example too. TwinoServer's UseMvc method supports HTTP method too and it's usage very similar to ASP.NET MVC. You can handle HTTP request with controllers. In same project you will already have MqServer object. So you can push messages into queues programmatically in controller action methods. Here is an example;

        TwinoServer twinoServer = new TwinoServer(serverOptions);
        MqServer server = builder.CreateServer();

        TwinoMvc mvc = new TwinoMvc();
        mvc.Init(m =>
        {
            //you can use ioc etc.
            //m.Services.AddTransient
        });
        
        //maybe you need CORS
        CorsMiddleware cors = new CorsMiddleware();
        cors.AllowAll();

        mvc.Use(app =>
        {
            app.UseMiddleware(cors);
        });
        
        twinoServer.UseMvc(mvc, HttpOptions.CreateDefault());
        twinoServer.UseMqServer(server);
        twinoServer.Start();

2. saving messages for processing later

If you only want to load them from any source, you can use QueueFiller object to fill messages into your queue. QueueFiller fills all messages you into queue. You can create a queue filler;

  QueueFiller filler = new QueueFiller(queueObject);

filler has 4 methods, FillMessages, FillJson, FillData, FillString. Use which one you need.

If you have problems to find queueObject, use MqServer object's find channel methods to find the channel, then use find queue object in channel.

3. want to pull messages from queue and process them.

If you want to use with pull algorithm, your queue status should be Pull. In Pull status, producers push messages into queue and they are not sent to consumers immediately, they store in queue. when a consumer sends pull requests, it pulls messages from queue.
Here a quick example code.

        PullContainer container = await client.Queues.Pull(new PullRequest
                                                           {
                                                               Channel = "channel-name",
                                                               QueueId = 123, //queue id value (content type)
                                                               Count = 1,     //how many messages will be pulled
                                                           });

        foreach (TmqMessage message in container.ReceivedMessages)
        {
            //todo: process them
        }

Here are more useful properties of PullRequest object to customize the request. And PullContainer object has some more properties about the process.

You can use second parameter for Pull method if you want to process each message asap they received instead of processing them after all received. It's recommended if your messages is large and receiving them via network takes longer.

Hi,

I have been trying the new (Latest) version, with status set to pull

Server Code below

TwinoMQ mq = TwinoMqBuilder.Create()
.AddPersistentQueues(q => q.KeepLastBackup())
.UsePersistentDeliveryHandler(DeleteWhen.AfterAcknowledgeReceived, ProducerAckDecision.AfterReceived)
.AddOptions(o => o.Status = QueueStatus.Pull)
.Build();

        TwinoServer server = new TwinoServer();
       
        server.UseTwinoMQ(mq);
        mq.LoadPersistentQueues();
        
        server.Start(26222);
        return server.BlockWhileRunningAsync();

message are stored in a TDB and are getting received by the consumer

TmqStickyConnector connector = new TmqStickyConnector(TimeSpan.FromSeconds(2));
connector.AddHost("tmq://localhost:26222");
connector.ContentSerializer = new NewtonsoftContentSerializer();
connector.Observer.RegisterConsumer();
connector.Connected += (c) => { _ = connector.GetClient().Queues.Subscribe("model-a", true); };
connector.Run();

however when starting the server again, all the messages are loaded into the queue.

could you please assist why messages that are consumed are still loading when the server starts up.

.UsePersistentDeliveryHandler(DeleteWhen.AfterAcknowledgeReceived, ProducerAckDecision.AfterReceived)

You are sending acknowledge message from consumer?
If you are using a consumer object implements IQueueConsumer interface, you need to add AutoAck attribute.

Your message delete decision seems "AfterAcknowledgeReceived". Do you send back ack message to server?

Hi again,

It seems you are working on sample projects.
There is no Acknowledge attribute on ModelA class in producer project.
I reproduced and got same problem. Then I added [Acknowledge(QueueAckDecision.JustRequest)] to ModelA class in producer project, the problem is fixed.

The problem is here, persistent delivery handler removes messages from disk after acknowledge is received. But queue is not tracking acknowledge messages because it's disabled by default. Your acknowledge message isn't being tracked and persistent delivery handler's delete message operation isn't triggered.

Yes, there is a usage bug here that causes a misunderstanding. Persistent delivery handler initialization should throw an exception if it's delete operation is triggered by acknowledge and queue's acknowledge option is disabled.

Now, you can fix your problem by adding the attribute and you can be sure, in minor next version (in a few days), an exception will be added to persistent delivery handler initialization to warn. You will not able to start the server with incorrect options.

Thanks for your information about the issue.