php-mqtt/client

Run forever

christianostrem opened this issue · 7 comments

Hi,

This is not an issue, but questions about whats possible with this libary.

This is what i want to do.

I have multiple connections and multiple topics. Over 1000 on each connection.
I want to subscribe on everyone and on change i want to push that to the database -> doctrine -> symfony in my case.
I want the connection to run forever, but i might need to add more topics when i get new ones.

Is that possible with this libary.

I see that you have a loop with a while. So i guess i can run that forever?

$mqtt->loop(true);

--

$mqtt->subscribe('php-mqtt/client/test', function ($topic, $message) {
    echo sprintf("Received message on topic [%s]: %s\n", $topic, $message);
}, 0);

If i run this multiple times will it subscribe to all topics?

Do you know how i can get it running forever without having an open terminal?

Would you suggest an other solution? I want to use php and symfony if possible.

You can totally do that with this library, yes. The only part of your question which raises a few questions is the multiple connections part. Does that mean you require one PHP process to subscribe to multiple different MQTT brokers or would it be acceptable to spawn multiple processes, each subscribing to a different broker? Both variants work, but one process per broker is significantly easier to do.

The next question before I recommend a solution is: how do you get new topics you need to subscribe to? Is a user entering some topic in a web form and you save that entry in the database?


Assuming that you build a Symfony console command with arguments for the broker, potentially the credentials and things like the topic, you can easily start multiple processes with different configuration which perform the same task. Starting and potentially restarting these processes can be done with something like supervisord. A very basic configuration for pretty much any command could look like this:

[program:mqtt-recorder]
process_name=%(program_name)s
command=php /app/bin/console mqtt-recorder --broker=example.com --topic=foo/bar/#
autostart=true
autorestart=true
user=someuser
redirect_stderr=true
stdout_logfile=/app/logs/mqtt-recorder.log
stopwaitsecs=30

If your database contains the topics which need to be subscribed to per connection, you can easily run a loop hook to fetch your topics and update the subscriptions as needed (although you should not do that only like every 60 seconds or so, not with every loop iteration - it would slow down the whole client). An example of a periodic loop handler can be found in the php-mqtt/client-examples repository.

When you have a list of topics (basically strings) you want to subscribe to, potentially loaded from the database, subscribing is straight forward:

$topics = ['foo/bar/#', 'some/more/+'];

$client = new MqttClient($host, $port, $clientId);
$client->connect();

foreach ($topics as $topic) {
    $client->subscribe($topic, function ($topic, $message) {
        // ... code to store the message in the database ...
    });
}

$client->loop(true);

The $client->loop() method will run infinitely and polls the connected socket for new data, which is then delivered to the subscribed callbacks (the part with the code to store the message in the database comment). The loop also sends acknowledgements and retransmissions when using QoS 1 or 2.

If you use a loop event handler as described above, you can of course subscribe to new topics within said handler (or unsubscribe from topics, if needed).

I just read about supervisord and thats seems easy enough.
When i talk about multiple connections iam using the same broker but there might be different user that is used for different clients. At this moment i am just using one user.

I can create a command in symfony that starts and stops supervisord and ad arguments to the symfony command that telles the prossess what connection it should use.

When it comes to topic they are auto discovered when my iot device is online and created i a database. This dosent happen to often. And a 5 minute delay is not a problem. Ill look at the logic here. i will need to now if the topic is subscribed or not. Add a true/false flag.

But the whole thing seems possible. The last question: Will it be stable :)

Thank you for your answers !

As long as the hosts/credentials used to connect to the brokers are not dynamic in a way which requires you to spawn new connections based on database entries or so, this should be fairly simple. But even if that was the case, you could still leverage Symfonys process library to supervise these processes.

Regarding knowging whether a topic is already subscribed or not: storing this bit of information in the database is tricky, because what happens when your MQTT client goes away for some reason? How do you reset it? You would at least need a second field to store some kind of TTL, like a date until the information is valid. And this date would need to be updated regularly before coming invalid, by the active subscribing client of course. But at this point you could also use Redis as cache, which brings significantly better performance for such a task.

The only thing I still want to give you on your journey is that PHP is single-threaded. Depending on the amount of subscribed topics and messages received, it might be a good idea to split the subscriptions and processing between multiple processes. And that is also the most complex part about all this: horizontal scaling. Because in case of MQTT, especially when using wildcard topic subscriptions, it is a really tough problem to solve.

If you have any more questions, feel free to ask.

When i comes to scaling i might add more serveres overtime. depending on how it preforms, but at this time is more like hundreds iot dives and not millions. If it comes to millions ill have the budgets to scale :)

Regarding knowging whether a topic is already subscribed or not: I can add a last updated to the prossess. And if the service "supervisord" connection dies the time will tell me. When i restart the service i can reset the subscribe "topic" to false. I theory. I will think about it :)

Anyways. Thanks again! Its bedtime in my region :)

As your questions seem to be answered, I'm closing this for now. Feel free to open it again if you have further questions.

I just wanted to tell you that i got i all working :) I am using supervisord to run the process and i have created a channel called reset. If i send true it will run $client->interrupt(); and $client->disconnect. supervisord will then run the process all over again and resubscribe all topics and add new ones.

I think your library is one of the best out there at this moment. Keep up the good work and support :)

That's nice to hear!

To subscribe to new topics, there are quite a few ways available. If resetting is not an issue for you (and it is ok to miss a few published messages, at least you need to expect that), your solution is fine. You could also use a cronjob to restart the supervisord process once an hour for example, if that suits your case more.

If you publish to the reset channel from your frontend (where new topics are entered), you could also just publish the new topic and subscribe within the subscription callback of the reset channel though. I'm talking about something like this:

$client->subscribe('new-topic', function (string $topic, string $message) use ($client) {
    // $message contains the name of the new topic
    $client->subscribe($message, fn ($t, $m) => $this->handleMessage($t, $m));
});

(The example assumes that handleMessage($topic, $message) is a generic callback for all topics.)


I think your library is one of the best out there at this moment.

That was the main goal, to be honest. I once was looking for a library and was quite surprised that nothing with some bearable quality existed. Single-file libraries with PHP 5 style are not quite what I'm looking for. So I decided to write my own, even though I ended up not needing MQTT (in PHP) anymore. I just finished it as an exercise and also a bit for fun. 😄