Stiffstream/so5extra

[idea] A mbox that delivers only the last instance of a message


eao197 commented

Sometimes an agent handles many messages of the same type, for example a message msg_incoming_data that carries a portion of fresh data to process. A problem arises when the agent handles messages more slowly than they are generated. A very simple form of overload control is sometimes suitable for such a case: discard all msg_incoming_data messages that are currently in flight and process only the last one.

Such a case can easily be handled by a special mbox that delivers only the most recently sent message. Something like:

class demo final : public so_5::agent_t
{
  struct msg_incoming_data final : public so_5::message_t {
    const std::string m_data;
    msg_incoming_data(std::string data) : m_data{std::move(data)} {}
  };

  const so_5::mbox_t m_dest;
public:
  demo(context_t ctx)
    : so_5::agent_t{std::move(ctx)}
    , m_dest{so_5::extra::mboxes::last_msg_only::make<msg_incoming_data>(so_direct_mbox())}
  {}
  ...
  void so_evt_start() override {
    so_5::send<msg_incoming_data>(m_dest, "One");
    so_5::send<msg_incoming_data>(m_dest, "Two");
    so_5::send<msg_incoming_data>(m_dest, "Three");
  }
};

In this case only the msg_incoming_data with the value "Three" has to be delivered; the two previous messages will be discarded.

eao197 commented

This mbox can be implemented this way:

  • the mbox stores the last sent message inside itself (the message instance is not pushed into the event queue);
  • a special message envelope is pushed to the queue (but only if there is no previously pushed envelope that has not been processed yet);
  • when the envelope is extracted from the event queue and the event handler is called, the message instance stored inside the mbox is used;
  • when a new message is sent to the mbox while the previously pushed envelope has not been processed yet, the new message instance replaces the old one stored in the mbox.
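
The steps above can be sketched without SObjectizer at all. This is only a rough single-threaded model, not the proposed so5extra implementation: event_queue, last_msg_slot and run_demo are hypothetical names invented for the illustration, the "envelope" is modelled as a plain callback, and the locking that a real mbox would need is omitted.

```cpp
#include <functional>
#include <optional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an agent's event queue: a FIFO of callbacks.
using event_queue = std::queue<std::function<void()>>;

// Hypothetical "last message only" slot; not a SObjectizer/so5extra class.
// Not thread-safe: a real mbox would have to protect this state with a lock.
template <typename Msg>
class last_msg_slot {
  std::optional<Msg> m_last;        // latest message, kept outside the queue
  bool m_envelope_queued = false;   // true while an envelope awaits processing
  event_queue & m_queue;
  std::function<void(const Msg &)> m_handler;

public:
  last_msg_slot(event_queue & queue, std::function<void(const Msg &)> handler)
    : m_queue{queue}, m_handler{std::move(handler)} {}

  void send(Msg msg) {
    m_last = std::move(msg);        // a newer message replaces the stored one
    if (!m_envelope_queued) {       // push at most one envelope at a time
      m_envelope_queued = true;
      m_queue.push([this] { on_envelope(); });
    }
  }

private:
  // Called when the envelope is extracted from the event queue.
  void on_envelope() {
    std::optional<Msg> msg;
    msg.swap(m_last);               // take the stored message, leave the slot empty
    m_envelope_queued = false;      // the next send() may push a new envelope
    if (msg) m_handler(*msg);
  }
};

// Sends three messages before the "agent" gets a chance to run,
// then drains the queue and returns what the handler actually saw.
std::vector<std::string> run_demo() {
  event_queue queue;
  std::vector<std::string> handled;
  last_msg_slot<std::string> slot{
    queue, [&handled](const std::string & data) { handled.push_back(data); }};

  slot.send("One");
  slot.send("Two");
  slot.send("Three");

  while (!queue.empty()) {
    auto event = std::move(queue.front());
    queue.pop();
    event();
  }
  return handled;
}
```

In this model run_demo() returns a vector that holds only "Three": the first send() pushes the single envelope, and the next two only overwrite the stored message.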

NOTE. It's an open question how this scheme will work with MPMC mboxes when there are several subscribers that process messages at different speeds.