php-mqtt/client

Subscribers get congested

jage0435 opened this issue · 8 comments

I have more than 200 devices, and my Laravel service receives more than 15 messages per second. Recently I found that the subscriber accumulates messages, so the data delay keeps getting worse. My question: is the message-received event handled synchronously?
(The average processing time per message is about 1.2 seconds.)

If you are not implementing any async logic yourself, the whole client works synchronously - that's right. I therefore advise you to use an intermediary queue to schedule the processing, which can then be done by (multiple) other processes. In other words, your subscribe() callbacks should push the received message (incl. the topic and whatever other information you need) to a queue, which is processed by a bunch of worker processes to parallelize the work.
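A minimal sketch of that pattern, assuming the figures from this issue (15 messages per second, about 1.2 seconds each, so roughly 18 workers just to keep pace). The SplQueue is only an in-memory stand-in for a real queue such as Redis or a Laravel queue, and requiredWorkers(), $onMessage and the topic names are hypothetical; the callback is invoked directly here instead of through the client:

```php
<?php
declare(strict_types=1);

// Workers needed just to keep pace: arrival rate x processing time.
function requiredWorkers(int $messagesPerSecond, int $processingMs): int
{
    return (int) ceil($messagesPerSecond * $processingMs / 1000);
}

// In-memory stand-in for a real queue backend (assumption for this sketch).
$queue = new SplQueue();

// The subscribe() callback only enqueues and returns immediately,
// so the synchronous client loop is never blocked by slow processing.
$onMessage = function (string $topic, string $message) use ($queue): void {
    $queue->enqueue([
        'topic' => $topic,
        'payload' => $message,
        'receivedAt' => microtime(true),
    ]);
};

// With the real client, the closure would be passed to subscribe(), e.g.:
// $mqtt->subscribe('devices/#', $onMessage, 0);
// Two deliveries are simulated here by calling it directly.
$onMessage('devices/1/data', '{"temp":21.5}');
$onMessage('devices/2/data', '{"temp":19.0}');

// A separate worker process would pop jobs and do the slow processing.
$processed = [];
while (!$queue->isEmpty()) {
    $job = $queue->dequeue();
    $processed[] = $job['topic'];
}
```

The point of the design is that the callback does no real work: anything that takes longer than a few milliseconds belongs in the workers.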

Thanks for your answer, this repository is amazing! :)

@jage0435 Actually, there is another option: shared subscriptions. If your broker supports them, you can run multiple workers in parallel, each with its own MQTT subscription. See #75 for more details. This would distribute incoming messages across multiple processes.
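As a rough illustration, a shared subscription is requested through a topic filter of the form $share/<group>/<filter>, assuming the broker accepts such filters from this client. The helper sharedFilter() and the group and topic names below are hypothetical:

```php
<?php
declare(strict_types=1);

// Builds a shared-subscription filter: $share/<group>/<topicFilter>.
function sharedFilter(string $group, string $topicFilter): string
{
    // The group (share) name must not be empty or contain "/", "+" or "#".
    if ($group === '' || strpbrk($group, '/+#') !== false) {
        throw new InvalidArgumentException('Invalid share group name.');
    }

    // Single quotes matter: "$share" in double quotes would be read as a PHP variable.
    return '$share/' . $group . '/' . $topicFilter;
}

$filter = sharedFilter('workers', 'devices/+/data');

// Every worker process would subscribe with the same filter, e.g.:
// $mqtt->subscribe($filter, $onMessage, 0);
```

All workers that use the same group name form one group, and the broker hands each message to only one of them.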

This is what I want! But I found that the broker keeps the old shared subscription when I restart the MQTT client, so my client receives fewer and fewer messages until I restart the broker. When I restart my client, the will message is published, so I'm sure the connection is not kept open. I also used the MQTTX desktop client to check whether this is a broker problem, but the issue does not occur with that app.

These are my settings:

$this->mqtt = new MqttClient(config('mqtt.server'), config('mqtt.port'));
$connectionSettings = (new ConnectionSettings())
    ->setUsername(config('mqtt.username'))
    ->setPassword(config('mqtt.password'))
    ->setConnectTimeout(5)
    ->setLastWillTopic("willTest")
    ->setLastWillMessage("willMsgTest")
    ->setUseTls(config('mqtt.tls'));
$this->mqtt->connect($connectionSettings);

What broker are you using?

@Namoshek Mosquitto MQTT broker, version 2.0.11

@jage0435 Ok, it seems to be an issue with the way you connect to the broker. The MqttClient::connect() method has a second parameter called $useCleanSession, which is false by default. If you are using a persistent session (= the opposite of a clean session), the broker will remember the client even if it disconnects. The broker will then keep messages for this client during the offline period and will resend these messages to the client when it connects to the broker again. There is a configurable limit for the queue size per client though, which means the broker will not keep an unlimited number of messages.

A persistent session is problematic if you combine it with a random client id (which is generated because you do not pass one to MqttClient::__construct()), because every time you restart a worker (and call MqttClient::__construct()), a new client id will be generated. So the broker sees each restarted worker as a new client in the shared subscription group, while the sessions of the old client ids stay in the group and keep receiving a share of the messages.

The solution should therefore be rather simple:
Pass true as the second parameter: MqttClient::connect($connectionSettings, useCleanSession: true)

Method reference:

/**
 * Connect to the MQTT broker using the given settings.
 * If no custom settings are passed, the client will use the default settings.
 * See {@see ConnectionSettings} for more details about the defaults.
 *
 * @param ConnectionSettings|null $settings
 * @param bool                    $useCleanSession
 * @return void
 * @throws ConfigurationInvalidException
 * @throws ConnectingToBrokerFailedException
 */
public function connect(
    ConnectionSettings $settings = null,
    bool $useCleanSession = false
): void;
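Applied to a worker, the fix could look roughly like this. It is only a sketch: connectWorker() is a hypothetical helper, and it assumes php-mqtt/client is installed and loaded through Composer's autoloader. The flag is passed positionally because the named-argument form shown above needs PHP 8:

```php
<?php
declare(strict_types=1);

use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\MqttClient;

// Hypothetical helper: connects one worker with a clean session.
function connectWorker(string $host, int $port, ConnectionSettings $settings): MqttClient
{
    // No client id is passed, so the library generates a random one.
    $mqtt = new MqttClient($host, $port);

    // Second argument true = clean session: the broker drops the session
    // (and its subscriptions) when the client disconnects, so a restarted
    // worker does not leave a stale member in the shared subscription group.
    $mqtt->connect($settings, true);

    return $mqtt;
}
```

A worker would call connectWorker() with the same settings object as before and then subscribe as usual.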

I really appreciate your prompt reply, all my problems have been solved. Thank you!