php-mqtt/client

Symfony messenger Transport

kl3sk opened this issue · 15 comments

kl3sk commented

Hello,

I'm trying to create a transport for Symfony Messenger and RabbitMQ (with the MQTT plugin enabled), following the docs, but with no success.

It works with a simple controller following your example, but I need it to be asynchronous and/or handled by the worker.

I found a package, but it doesn't use php-mqtt and doesn't seem to work.

Any help?

Thanks.

When you use RabbitMQ explicitly, why don't you use https://github.com/php-amqplib/RabbitMqBundle?

My knowledge about Symfony is quite limited, so I'm not sure how to help.

kl3sk commented

Thank you for your answer.

RabbitMQ is (I think) not the problem. It works as expected when I use the AMQP protocol, but I need to use MQTT with Symfony.

I'm not sure that this bundle would help more.

Well, you most likely don't need a bundle at all. You can also simply use the client as-is to achieve whatever you want, although it might not "feel" very Symfony'isch without it being named an actual bundle.

However, if you can give a bit more info about what you are trying to do with the MQTT client, I might be able to help you out anyway.

kl3sk commented

Obviously this is not a symfony-bundle (which is not very complicated to do).

To be clearer.

I have some devices that talk to my broker through MQTT, and this data has to be handled by an application written with Symfony.

Because symfony-messenger can handle asynchronous messages (at least over AMQP), I thought I would be able to manage it with MQTT.
I read the repo you gave me (i.e. RabbitMqBundle). I think I understand that MQTT messages can be "transferred" to AMQP.

In fact I'm new to these technologies, less than yesterday ^^.

As I said in the first post, Symfony allows creating a custom transport (maybe this is not the right way), which makes it possible to add new protocols.

As I understand it, the message should be returned from the get method of the Transport wrapped in an Envelope, which is a simple wrapper. But php-mqtt handles incoming messages inside a callback, while the get method is expected to return an iterable.

I'd be very happy if you could help me.

If we find a solution, I'll be very happy to make a bundle for it.

Thanks for your work and time.

Because symfony-messenger can handle asynchronous messages (at least over AMQP), I thought I would be able to manage it with MQTT.
I read the repo you gave me (i.e. RabbitMqBundle). I think I understand that MQTT messages can be "transferred" to AMQP.

Yes and no. RabbitMQ is a message broker with AMQP being the primary protocol. There is an MQTT plugin for RabbitMQ, which allows some interoperability between AMQP and MQTT, although not all exchange types are supported as far as I know. Also be aware that AMQP is the more powerful protocol.
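To give an idea of what that interoperability looks like: as far as I know, the RabbitMQ MQTT plugin publishes MQTT messages to a topic exchange (amq.topic by default) and translates the MQTT topic into an AMQP routing key by swapping "/" for "." and the single-level wildcard "+" for "*", while "#" stays as it is. A minimal sketch of that translation follows. The function name is made up for illustration, and topics that already contain dots are not handled here.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: translate an MQTT topic (or topic filter) into the
 * AMQP routing key (or binding key) it would correspond to on a topic exchange.
 */
function mqttTopicToAmqpRoutingKey(string $mqttTopic): string
{
    // "/" separates levels in MQTT, "." separates words in AMQP.
    // "+" (one level) becomes "*" (one word); "#" means "the rest" in both.
    return strtr($mqttTopic, ['/' => '.', '+' => '*']);
}

echo mqttTopicToAmqpRoutingKey('sensors/+/temperature'), PHP_EOL;
```

An AMQP consumer could then bind a queue to the topic exchange with the translated key to receive what the devices publish via MQTT.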

As I understand it, the message should be returned from the get method of the Transport wrapped in an Envelope, which is a simple wrapper. But php-mqtt handles incoming messages inside a callback, while the get method is expected to return an iterable.

I think that I've understood the assignment now. 😅

Maybe something like this can work:

<?php

use PhpMqtt\Client\Contracts\MqttClient as MqttClientContract;
use PhpMqtt\Client\MqttClient;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class MqttTransport implements TransportInterface
{
    private readonly MqttClientContract $client;
    private readonly SerializerInterface $serializer;

    private static string $topicPatternToSubscribe = 'some/request/topic';
    private static string $topicToPublishTo = 'some/response/topic';

    public function __construct(
        string $host,
        int $port = 1883,
        ?string $clientId = null,
        ?SerializerInterface $serializer = null
    ) {
        $this->serializer = $serializer ?? new PhpSerializer();

        // TODO: reference config and use connection settings if needed
        $this->client = new MqttClient($host, $port, $clientId);

        // No custom connection settings; connect with a clean session.
        $this->client->connect(null, true);
    }

    public function get(): iterable
    {
        // Store the received messages in some kind of queue. An array should be sufficient for now.
        $queue = [];

        // Subscribe to a topic pattern.
        $this->client->subscribe(self::$topicPatternToSubscribe, function (string $topic, string $message, bool $retained) use (&$queue) {
            $queue[] = [
                'topic' => $topic,
                'message' => $message,
                'retained' => $retained,
            ];
        }, MqttClient::QOS_AT_MOST_ONCE);

        // Manually loop once at a time and then yield all the queued messages.
        $loopStartedAt = microtime(true);
        while (true) {
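            $this->client->loopOnce($loopStartedAt, true);

            while (! empty($queue)) {
                // Messenger expects get() to yield Envelope instances, so wrap the raw data in a simple object.
                yield new Envelope((object) array_shift($queue));
            }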
        }
    }

    public function send(Envelope $envelope): Envelope
    {
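        // encode() returns an array with a 'body' key (and optionally 'headers').
        // MQTT 3.1.1 has no headers, so only the body string is published.
        $encodedMessage = $this->serializer->encode($envelope);

        // Only QoS 0 can be used because for other kinds of publishing, looping is required,
        // which blocks the process (temporarily).
        $this->client->publish(self::$topicToPublishTo, $encodedMessage['body']);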

        return $envelope;
    }

    public function ack(Envelope $envelope): void
    {
        // Cannot be implemented in my opinion.
    }

    public function reject(Envelope $envelope): void
    {
        // Cannot be implemented in my opinion.
    }
}

Maybe a little explanation about the internals of the MqttClient:
$this->client->loopOnce() lets the client process the inbound buffer, which may contain new messages and acknowledgements for published messages. Whenever a new message is found, it is passed to the subscribing callbacks. For QoS > 0, the client first handles the whole acknowledgement stuff before passing the message to the callbacks.

kl3sk commented

Oh boy,

Man, I just have to say something!

You are my saviour.

I just have to adapt some stuff.

BUT IT WORKS!!! (as I expected; I hope the rest follows)

I have to dig more but the idea is here.

I will work on my project and try to create a bundle as promised !

Again thanks a lot !

That's nice to hear! Looking forward to your bundle. 👍

One small note about my snippet above: I've corrected a minor issue regarding the $loopStartedAt variable, which should be determined only once outside the while loop:

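$loopStartedAt = microtime(true);
while (true) {
    $this->client->loopOnce($loopStartedAt, true);

    while (! empty($queue)) {
        // Messenger expects get() to yield Envelope instances, so wrap the raw data in a simple object.
        yield new Envelope((object) array_shift($queue));
    }
}

kl3sk commented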

Got it. Thanks.

It's late here, I just did a quick test.
Don't be in a hurry for the bundle, but I'll try my best to create a first POC soon.

And once more, thank you.

Last question (maybe 😉):
I see a MemoryRepository that has a newMessageId method. What is it meant for? Can I access it from the get method?

Just to explain a little further:
-> As you can see here, an ID can be applied. Maybe not relevant in our case.

Note: I know that AMQP is powerful, but MQTT is required in this case.

Accessing the message id is not easy and only possible when overriding certain parts of the client, I would say. However, the message id is only relevant for QoS > 0 in MQTT and it is not unique over time (it loops between 1 and 65535), so I'm not sure it is even applicable here...
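To illustrate why the id is not a usable unique identifier, here is a minimal sketch of a counter that wraps between 1 and 65535. The class name is hypothetical and this is not the library's MemoryRepository implementation; a real client also has to skip ids that are still in flight, which this sketch ignores.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration of a 16-bit MQTT message id counter.
 * 0 is not a valid message id, so the sequence is 1, 2, ..., 65535, 1, 2, ...
 */
final class WrappingMessageIdGenerator
{
    private int $lastId = 0;

    public function next(): int
    {
        // After 65535 the modulo resets the counter, so the next id is 1 again.
        $this->lastId = ($this->lastId % 65535) + 1;

        return $this->lastId;
    }
}

$generator = new WrappingMessageIdGenerator();
echo $generator->next(), PHP_EOL; // first id handed out
```

After 65535 messages the same ids are handed out again, so the id can only tie a message to its acknowledgement while that message is in flight.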

kl3sk commented

Hello @Namoshek

This is my first attempt; it is far from perfect. Let's enhance it.

Some tests need to be done first.

Well, it seems to do what it is meant to do. Though I'm not sure MQTT is the right technology for a Symfony transport. I've not gone very deep into the documentation, but it sounds like transports are intended for queue systems and asynchronous processing, which MQTT is not. But if it works for you, I'm glad the library is helpful!

kl3sk commented

Hello,

That's one of the questions I had. MQTT is not a queue system.

But why couldn't it be used as one? Messenger handles synchronous tasks if needed.

My purpose is to get MQTT messages into a Symfony application.

I'm open to any advice 😁

MQTT is a publish-subscribe protocol based on topics. Most MQTT brokers support queuing and storing a limited number of messages for (persistent) clients, although that may require additional configuration of the broker. The storage capacity is mostly very limited though (like 1000 messages per client). That means the producer and consumer should both be online at the same time and the consumer should be able to keep up with the number of messages produced; otherwise the delivery queue of the client fills up and overflows, which leads to messages being dropped.
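The overflow behaviour can be pictured with a small bounded queue. This is only a sketch under assumptions: the class is hypothetical, the capacity is arbitrary, and it drops the newest message when full, whereas the actual limit and drop policy depend on the broker and its configuration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of a per-client delivery queue with a fixed capacity.
 */
final class BoundedDeliveryQueue
{
    /** @var string[] */
    private array $messages = [];
    private int $dropped = 0;

    public function __construct(private readonly int $capacity)
    {
    }

    /** Returns false when the queue is full and the message was dropped. */
    public function offer(string $message): bool
    {
        if (count($this->messages) >= $this->capacity) {
            $this->dropped++;

            return false;
        }
        $this->messages[] = $message;

        return true;
    }

    /** Returns the oldest queued message, or null when the queue is empty. */
    public function poll(): ?string
    {
        return array_shift($this->messages);
    }

    public function droppedCount(): int
    {
        return $this->dropped;
    }
}

// A producer sends five messages while the consumer is offline.
$queue = new BoundedDeliveryQueue(3);
foreach (['m1', 'm2', 'm3', 'm4', 'm5'] as $message) {
    $queue->offer($message);
}
echo $queue->droppedCount(), PHP_EOL; // number of messages the consumer will never see
```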

A real queue system on the other hand would be able to store millions of messages in a queue, because one of the key aspects of a queue system is to separate the producer from the consumer - also regarding their availability and online state.

There are other aspects to this as well:

  • MQTT only has the concept of message delivery acknowledgement, but not message processing acknowledgement. AMQP for example allows you to retrieve a message from a queue, process it and then acknowledge it which leads to its removal from the queue.
  • MQTT only started to support shared subscriptions for highly-available consumers in version 5. Because it's a broker-only feature, some MQTT brokers support it also for MQTT 3.1(.1), but it is actually an MQTT 5 feature.
  • MQTT only started to support metadata (like headers or key-value pairs) with version 5. This library does not support MQTT 5, so it does not support those either.
  • ... and many more

That doesn't mean you can't make it work for you, though. It really depends heavily on your use case. If all you need is processing messages from IoT devices (because that's the most common use case), you'd probably be better off writing a simple console command which runs an MQTT consumer. Depending on the actions the consumer performs (e.g. inserting data in a database, forwarding it via HTTP or whatever), you'd want to further improve the consumer by pushing the received MQTT messages into a local queue and process them in one or more other processes (like queue workers).
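A rough sketch of such a consumer, meant to be called from a console command, could look like the following. The function names and the $enqueue callable are made up for illustration; in a Symfony application the callable might dispatch a message to the Messenger bus or push to any other local queue. It assumes php-mqtt/client is installed via Composer.

```php
<?php
declare(strict_types=1);

use PhpMqtt\Client\MqttClient;

/**
 * Hypothetical helper: turn a received MQTT message into a plain array
 * that can be pushed into a local queue.
 */
function toQueueItem(string $topic, string $payload, bool $retained): array
{
    return ['topic' => $topic, 'payload' => $payload, 'retained' => $retained];
}

/**
 * Hypothetical consumer loop: subscribes to a topic filter and hands every
 * received message to $enqueue. Blocks until the client loop is interrupted.
 */
function consumeMqtt(MqttClient $client, string $topicFilter, callable $enqueue): void
{
    // No custom connection settings; connect with a clean session.
    $client->connect(null, true);

    $client->subscribe(
        $topicFilter,
        function (string $topic, string $message, bool $retained) use ($enqueue): void {
            // Keep the callback cheap: only enqueue, let queue workers do the real work.
            $enqueue(toQueueItem($topic, $message, $retained));
        },
        MqttClient::QOS_AT_MOST_ONCE
    );

    // Process incoming messages until the loop is interrupted.
    $client->loop(true);
    $client->disconnect();
}

// Example usage (host, port, client id and topic filter are placeholders):
// consumeMqtt(new MqttClient('localhost', 1883, 'symfony-consumer'), 'devices/#', function (array $item): void {
//     // push $item into a local queue here
// });
```

Keeping the subscription callback down to a single enqueue call means slow work such as database inserts or HTTP calls cannot hold up the MQTT loop.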

kl3sk commented

Hmm, very interesting answers, thank you. I learned a lot of things.

I recently read about what you said regarding v5.

So, if I understand your message correctly, I could "resend" the MQTT message to a queuing system (e.g. AMQP) to process it later?

On the other hand, I found this.

Again thank you for your time.

Yes, you should be able to map MQTT to AMQP. If it's not possible within RabbitMQ in the way you need it (although I think it is), you can still write a very, very simple app that pushes messages received via MQTT to RabbitMQ via AMQP. With AMQP, you can then reliably process the received messages - although that's a bit off-topic here. 👍