Subscribers get congested
jage0435 opened this issue · 8 comments
I have 200 more devices and my laravel service will recieve more than 15 messages per second. Recently I found that subscriptions accumulate messages, causing data delays to become more and more serious. My question is the data arrived event is synchronous?
(The average data processing time is about 1.2 seconds.)
If you are not implementing any async logic yourself, the whole client works synchronously - that's right. I therefore advise you to use an intermediary queue to schedule the processing, which can then be done by (multiple) other processes. In other words, your subscribe()
callbacks should push the received message (incl. topic and what other infos you need) to a queue, which is processed by a bunch of worker processes - to parallelize the work.
thanks for your answer, this repository is amazing! :)
This is what I want! but i found broker will keep old shared subscribe when i restart mqtt client, so my client message will less and less until i restart broker. When i restart my client, broker will receive willMessage, so i'm sure connection not kept. i'm also used MQTTX client desktop app to confirm if it was the broker's problem, but app doesn't happen what I mentioned.
this is my setting below:
$this->mqtt = new MqttClient(config('mqtt.server'), config('mqtt.port'));
$connectionSettings = (new ConnectionSettings())
->setUsername(config('mqtt.username'))
->setPassword(config('mqtt.password'))
->setConnectTimeout(5)
->setLastWillTopic("willTest")
->setLastWillMessage("willMsgTest")
->setUseTls(config('mqtt.tls'));
$this->mqtt->connect($connectionSettings);
What broker are you using?
@jage0435 Ok, it seems to be an issue with the way you connect to the broker. The MqttClient::connect()
method has a second parameter called $useCleanSession
, which is false
by default. If you are using a persistent session (= the opposite of a clean session), the broker will remember the client even if it disconnects. The broker will then keep messages for this client during the offline period and will resend this messages to the client when it connects to the broker again. There is a configurable limit for the queue size per client though, which means the broker will not keep infinite messages.
A persistent session is problematic if you combine it with a random client id (which is generated because you do not pass one to MqttClient::__construct()
), because every time you restart a worker (and call MqttClient::__construct()
), a new client id will be generated. So the broker sees every client of you as new client in the shared subscription group.
The solution should be rather simple therefore:
Pass true
as second parameter: MqttClient::connect($connectionSettings, useCleanSession: true)
Method reference:
client/src/Contracts/MqttClient.php
Lines 23 to 37 in 0321e8b
I really appreciate your prompt reply, all my problems have been solved. Thank you!