arcus-azure/arcus.messaging

Provide 'aggregation' extension points for MessagePump

Opened this issue · 3 comments

Is your feature request related to a problem? Please describe.

It is not extraordinary that a service which processes messages writes some (custom) metrics to a log-sink (for instance Application Insights). We can currently do that by calling the LogCustomMetric method on the ILogger.
However, due to the design of Arcus.Messaging, this means that we need to call LogCustomMetric for every message that has been processed separately. In fact, this means that the value of the metric will most probably always be 1. (For instance, we processed 1 message of type X with such and such dimensions).

However, logging metrics this way, is not recommended. See here

Describe the solution you'd like

I think it would be nice if the message-pump could offer some kind of extension point where we can hook in and use that extension point for doing additional tasks (like loggin metrics).

On every invocation, the message-pump receives an x number of messages from the underlying messaging system. Every message will be processed separately. It would be nice if some kind of method can be executed after all the messages of this iteration have been processed.
This method would need to pass in some information: for instance the amount of messages that were retrieved and processed (succesfully / not succesfully) during this iteration.

Next to that, it would be even nicer if we could pass in some custom 'state' in that method. State that is collected in a MessageHandler.
For example, suppose we have a messagehandler that processes certain types of messages. Inside that message-handler, I'd like to keep track of some custom things:

 public async Task ProcessMessageAsync(
            OrderMessage message,
            AzureServiceBusMessageContext messageContext,
            MessageCorrelationInfo correlationInfo,
            CancellationToken cancellationToken)
{
      // Do some processing
     var customState = messageContext.State as Dictionary<string,object>;

     if ( processingSucceeded )
     {
          var customMetricData = customState["SucceededCounter"] as MyCustomStateObject;
          customMetricData.Counter += 1;
          customMetricData.Dimensions = new { ["ProcessingResult"] = "Succeeded", ["MessageType"] = "MyCustomMessage" } ;
     }
     else
     {
          var customMetricData = customState["FailedCounter"] as MyCustomStateObject;
          customMetricData.Counter += 1;
          customMetricData.Dimensions = new { ["ProcessingResult"] = "Succeeded", ["MessageType"] = "MyCustomMessage" } ;
     }
 }

Then, in that 'aggregation extension' point or hook, we could do this:

public async Task  AfterMessagePumpIteration( object customState )
{
      var customMetricData = customState as  Dictionary<string,object>;

      foreach( var kvp in customMetricData )
      {
           _logger.LogCustomMetric("OrdersProcessed", kvp.Counter, kvp.Dimensions);
      }
}

Theoretically, the above code would make it possible to log aggregated custom metrics with custom dimensions.

Couldn't this be fixed already with singleton registration in the dependency container, where the same instance is injected in the message handler(s)?
The message pump works with a reactive receival pattern, so there is no batching; hence, we don't know when to start or stop counting for messages. Maybe with a singleton registration, you can have some state for a certain message type/filter, but that would be outside the scope of the messaging library, I believe. 🤔

I think it would be good if we could just discuss this in a separate 30 min meeting.

I think it would be good if we could just discuss this in a separate 30 min meeting.

Yes, we could do that. 👍