Performance of retained messages degrades with tens of thousands of topics
zhaowgit opened this issue · 0 comments
Describe the feature request
I am currently using MqttServer.InjectApplicationMessage to push retained messages. Once the number of topics reaches tens of thousands, performance drops significantly: pushing tens of thousands of messages takes several minutes. The more retained messages there are, the slower each push becomes.
Which project is your feature request related to?
- Server
Describe the solution you'd like
I found the cause of the slowdown in the source code of MqttRetainedMessagesManager. On every retained-message update, all retained messages are copied from the Dictionary<string, MqttApplicationMessage> into a new List, which is passed as a parameter to RetainedMessageChangedEvent. The cost of this copy grows with the number of stored messages, and it runs even if nothing subscribes to RetainedMessageChangedEvent. I modified this section to skip the copy, and the time to push tens of thousands of messages dropped from several minutes to about 100 ms. I suggest skipping the copy when RetainedMessageChangedEvent has no handlers, to improve the performance of retained messages.
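To illustrate why the copy dominates, here is a minimal standalone sketch. It is not MQTTnet code: `SnapshotCostDemo` and `Run` are hypothetical names made up for this illustration, and the byte arrays only stand in for messages. It inserts N entries into a dictionary and, optionally, copies all values into a new list after every insert, which is the pattern described above. The number of copied elements is N(N+1)/2, so 20,000 topics already mean roughly 200 million element copies.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class SnapshotCostDemo
{
    // Inserts `count` entries into a dictionary. When `snapshotOnEveryUpdate`
    // is true, all values are copied into a new list after each insert.
    // Returns the total number of elements copied.
    public static long Run(int count, bool snapshotOnEveryUpdate)
    {
        var messages = new Dictionary<string, byte[]>();
        long copied = 0;

        for (var i = 0; i < count; i++)
        {
            messages["topic/" + i] = new byte[] { 1 };

            if (snapshotOnEveryUpdate)
            {
                // O(n) copy on every update, so O(n^2) over all updates.
                var snapshot = new List<byte[]>(messages.Values);
                copied += snapshot.Count;
            }
        }

        return copied;
    }

    public static void Main()
    {
        const int count = 20000;

        var stopwatch = Stopwatch.StartNew();
        var copiedWithSnapshot = Run(count, true);
        stopwatch.Stop();
        Console.WriteLine("With snapshot:    {0} ms, {1} elements copied",
            stopwatch.ElapsedMilliseconds, copiedWithSnapshot);

        stopwatch.Restart();
        var copiedWithoutSnapshot = Run(count, false);
        stopwatch.Stop();
        Console.WriteLine("Without snapshot: {0} ms, {1} elements copied",
            stopwatch.ElapsedMilliseconds, copiedWithoutSnapshot);
    }
}
```

Absolute timings depend on the machine, and the real server does more work per message than this sketch, but the quadratic growth in copied elements matches the behavior I observed.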
Describe alternatives you've considered
Additional context
Example
public async Task UpdateMessage(string clientId, MqttApplicationMessage applicationMessage)
{
    if (applicationMessage == null)
    {
        throw new ArgumentNullException(nameof(applicationMessage));
    }

    try
    {
        List<MqttApplicationMessage> messagesForSave = null;
        var saveIsRequired = false;

        // Added: check once whether RetainedMessageChangedEvent has any handlers.
        var hasHandlers = _eventContainer.RetainedMessageChangedEvent.HasHandlers;

        lock (_messages)
        {
            var payload = applicationMessage.Payload;
            var hasPayload = payload.Length > 0;

            if (!hasPayload)
            {
                // An empty payload clears the retained message for the topic.
                saveIsRequired = _messages.Remove(applicationMessage.Topic);
                _logger.Verbose("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);
            }
            else
            {
                if (!_messages.TryGetValue(applicationMessage.Topic, out var existingMessage))
                {
                    _messages[applicationMessage.Topic] = applicationMessage;
                    saveIsRequired = true;
                }
                else
                {
                    if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel ||
                        !MqttMemoryHelper.SequenceEqual(existingMessage.Payload, payload))
                    {
                        _messages[applicationMessage.Topic] = applicationMessage;
                        saveIsRequired = true;
                    }
                }

                _logger.Verbose("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic);
            }

            // Changed: added "&& hasHandlers" so the full copy is only made when a handler will receive it.
            if (saveIsRequired && hasHandlers)
            {
                messagesForSave = new List<MqttApplicationMessage>(_messages.Values);
            }
        }

        // Changed: added "&& hasHandlers" so the event is only raised when there are handlers.
        if (saveIsRequired && hasHandlers)
        {
            using (await _storageAccessLock.EnterAsync().ConfigureAwait(false))
            {
                var eventArgs = new RetainedMessageChangedEventArgs(clientId, applicationMessage, messagesForSave);
                await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
            }
        }
    }
    catch (Exception exception)
    {
        _logger.Error(exception, "Unhandled exception while handling retained messages.");
    }
}