php-mqtt/client

MQTT subscribe/publish using PHP only doesn't return Message Broker feedback

kavitama opened this issue · 3 comments

We are experimenting with IoT devices over the MQTT protocol and are having trouble getting a few things to work.

The main assumption is that all communication happens between the web app and the message broker, following the publish/subscribe model, which does not allow direct communication with the IoT device.

We are working with a smart plug that has five outlets, and what we want to achieve is:

  • know the status of the five outlets (on or off)
  • be able to toggle the status of each of them

All this has been done using a PHP library named "php-mqtt/client" which mainly simplifies the interaction with the message broker and uses an embedded client to talk with the MQTT message broker.

What was achieved:

  • being able to change status

What was NOT achieved:

  • getting the feedback from the message broker
  • subscribe to the message broker to see the plug's status change

Below is the code used to publish a message to the specified plug.

In every example I have seen, the idea is to first subscribe to a topic that will show the result of any publish sent to the device, and only then send the publish message.

In my mind this should be an async process, but it is not clear to me how this could be done within the same code (without involving other languages or additional components).

If I test manually using the mosquitto_pub/mosquitto_sub commands, I can see the direct feedback from the message broker, but when I run "$client->publish()" I don't get any "feedback" that I can evaluate to know the command was received correctly.

The corresponding mosquitto commands are:
mosquitto_pub -h $HOST -p $PORT --cafile $CERTIFICATE -u $USERNAME -P $PASSWORD -t cmnd/$USERNAME/$COMPONENT -m on
mosquitto_pub -h $HOST -p $PORT --cafile $CERTIFICATE -u $USERNAME -P $PASSWORD -t cmnd/$USERNAME/$COMPONENT -m off

    public function actionRunFromLibrary($id, $aid, $cid = null)
    {
        // Defaults
        $HOST = 'xxxxxxx.outagesio.com';
        $PORT = 'xxxxx';
        $USERNAME = null;
        $PASSWORD = null;
        $COMPONENT = null;
        $runTopic = null;
        $runMessage = null;
        // Find device
        $device = OneIotd::model()->findByPk($id);
        // If any
        if ($device) {
            $USERNAME = $device->username;
            $PASSWORD = $device->password;
            // If there is a component
            if ($cid) {
                // Find component
                $component = OneIotdComponent::model()->findByPk($cid);
                // If any
                if ($component) {
                    // Set component
                    $COMPONENT = $component->iotdModelComponent->name;
                }
            }
            // Map topic and actions
            switch ($aid) {
                // Switch On
                case 1:
                    $runTopic = "cmnd/$USERNAME/$COMPONENT";
                    $runMessage = 'on';
                    break;
                // Switch Off
                case 2:
                    $runTopic = "cmnd/$USERNAME/$COMPONENT";
                    $runMessage = 'off';
                    break;
                // Get Status 0
                case 3:
                    # code...
                    $runTopic = "cmnd/$USERNAME/Status";
                    $runMessage = '0';
                    break;
                // Get Status 7
                case 4:
                    # code...
                    $runTopic = "cmnd/$USERNAME/Status";
                    $runMessage = '7';
                    break;
            }
            /******************************************************************/
            // Create an instance of a PSR-3 compliant logger. For this example, we will also use the logger to log exceptions.
            $logger = new LogLevel(LogLevel::INFO); 
            // Start try/catch 
            try {
                // Init MQTT client
                $client = new MqttClient(
                    $HOST,
                    $PORT,
                    'web-app',
                    MqttClient::MQTT_3_1,
                    null,
                    null
                );
                // Connection Settings
                $connectionSettings = (new ConnectionSettings)
                    // The username used for authentication when connecting to the broker.
                    ->setUsername($USERNAME)
                    // The password used for authentication when connecting to the broker.
                    ->setPassword($PASSWORD)
                    // The connect timeout defines the maximum amount of seconds the client will try to establish
                    // a socket connection with the broker. The value cannot be less than 1 second.
                    ->setConnectTimeout(60)
                    // The socket timeout is the maximum amount of idle time in seconds for the socket connection.
                    // If no data is read or sent for the given amount of seconds, the socket will be closed.
                    // The value cannot be less than 1 second.
                    ->setSocketTimeout(5)
                    // The resend timeout is the number of seconds the client will wait before sending a duplicate
                    // of pending messages without acknowledgement. The value cannot be less than 1 second.
                    ->setResendTimeout(10)
                    // This flag determines whether the client will try to reconnect automatically
                    // if it notices a disconnect while sending data.
                    // The setting cannot be used together with the clean session flag.
                    ->setReconnectAutomatically(false)
                    // Defines the maximum number of reconnect attempts until the client gives up.
                    // This setting is only relevant if setReconnectAutomatically() is set to true.
                    ->setMaxReconnectAttempts(3)
                    // The keep alive interval is the number of seconds the client will wait without sending a message
                    // until it sends a keep alive signal (ping) to the broker. The value cannot be less than 1 second
                    // and may not be higher than 65535 seconds. A reasonable value is 10 seconds (the default).
                    ->setKeepAliveInterval(10)
                    // If the broker should publish a last will message in the name of the client when the client
                    // disconnects abruptly, this setting defines the topic on which the message will be published.
                    // A last will message will only be published if both this setting as well as the last will
                    // message are configured.
                    ->setLastWillTopic(null)
                    // If the broker should publish a last will message in the name of the client when the client
                    // disconnects abruptly, this setting defines the message which will be published.
                    // A last will message will only be published if both this setting as well as the last will
                    // topic are configured.
                    ->setLastWillMessage(null)
                    // The quality of service level the last will message of the client will be published with,
                    // if it gets triggered.
                    ->setLastWillQualityOfService(0)
                    // This flag determines if the last will message of the client will be retained, if it gets
                    // triggered. Using this setting can be handy to signal that a client is offline by publishing
                    // a retained offline state in the last will and an online state as first message on connect.
                    ->setRetainLastWill(false)
                    // This flag determines if TLS should be used for the connection. The port which is used to
                    // connect to the broker must support TLS connections.
                    ->setUseTls(true)
                    // This flag determines if the peer certificate is verified, if TLS is used.
                    ->setTlsVerifyPeer(false)
                    // This flag determines if the peer name is verified, if TLS is used.
                    ->setTlsVerifyPeerName(false)
                    // This flag determines if self signed certificates of the peer should be accepted.
                    // Setting this to TRUE implies a security risk and should be avoided for production
                    // scenarios and public services.
                    ->setTlsSelfSignedAllowed(false)
                    // The path to a Certificate Authority certificate which is used to verify the peer
                    // certificate, if TLS is used.
                    ->setTlsCertificateAuthorityFile(null)
                    // The path to a directory containing Certificate Authority certificates which are
                    // used to verify the peer certificate, if TLS is used.
                    ->setTlsCertificateAuthorityPath(null)
                    // The path to a client certificate file used for authentication, if TLS is used.
                    // The client certificate must be PEM encoded. It may optionally contain the
                    // certificate chain of issuers.
                    ->setTlsClientCertificateFile(null)
                    // The path to a client certificate key file used for authentication, if TLS is used.
                    // This option requires ConnectionSettings::setTlsClientCertificateFile() to be used as well.
                    ->setTlsClientCertificateKeyFile(null)
                    // The passphrase used to decrypt the private key of the client certificate,
                    // which in return is used for authentication, if TLS is used.
                    // This option requires ConnectionSettings::setTlsClientCertificateFile() and
                    // ConnectionSettings::setTlsClientCertificateKeyFile() to be used as well.
                    ->setTlsClientCertificateKeyPassphrase(null);
                // Connect to the broker using connection settings and a clean session
                $client->connect($connectionSettings, true);
                // Subscribe to the topic 'stat/{topic}/result' using QoS 2.
                $client->subscribe(
                    "stat/$USERNAME/result",
                    function (
                        string $topic, 
                        string $message, 
                        bool $retained
                    ) use ($logger, $client) {
                        $logger(
                            'We received a {typeOfMessage} on topic [{topic}]: {message}', 
                            [
                                'topic' => $topic,
                                'message' => $message,
                                'typeOfMessage' => $retained ? 'retained message' : 'message',
                            ]
                        );
                        // After receiving the first message on the subscribed topic, we want the client to stop listening for messages.
                        $client->interrupt();
                    }, 
                    MqttClient::QOS_EXACTLY_ONCE
                );
                // Here we register a timeout using a loop event handler. The event handler is passed the elapsed time
                // the loop runs already (in seconds).
                // We do this because in case the receiver of our request is offline, we would be stuck in a loop forever.
                $client->registerLoopEventHandler(
                    function (
                        MqttClient $client, 
                        float $elapsedTime
                    ) {
                        // After 2 seconds, we quit the loop.
                        if ($elapsedTime > 2) {
                            $client->interrupt();
                        }
                    }
                );
                // Publish a fixed message
                $client->publish(
                    $runTopic,
                    $runMessage,
                    MqttClient::QOS_EXACTLY_ONCE
                );
                // Since QoS 2 requires the publisher to await confirmation and resend the message if no confirmation is received,
                // we need to start the client loop which takes care of that. By passing `true` as second parameter,
                // we allow the loop to exit as soon as all confirmations have been received.
                $client->loop(true, true);
                // Gracefully terminate the connection to the broker.
                $client->disconnect();
            } catch (MqttClientException $e) {
                // MqttClientException is the base exception of all exceptions in the library. Catching it will catch all MQTT related exceptions.
                Yii::log(
                    'Publishing a message using QoS 2 failed. An exception occurred.'.print_r($e, true),
                    'error',
                    'controller.iotd',
                );
            }
            /******************************************************************/
        }
        // Redirecting
        $this->redirect(
            [
                'iotd/dashboard',
                'id' => $id,
            ]
        );
    }

MQTT is asynchronous by design, although it requires the consumer to have subscribed before the producer publishes its message(s). Compare it to television, where the consumer selects the channel before they can see what the producer is broadcasting on it. You can't go back and watch video that has already been broadcast (at least not with traditional analog television without internet).

It is possible, though, to use the $retain flag to have the broker store the last published message for a topic (until it is replaced by a new message on the same topic), which allows late subscribers to receive a message that was published before they subscribed. Whether message retention makes sense depends on the use case.
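As a minimal sketch of the retain flag described above, assuming an already connected MqttClient: the helper name publishRetainedState is hypothetical, while publish() with the retain flag as its fourth argument is the library call.

```php
<?php

use PhpMqtt\Client\MqttClient;

/**
 * Publishes a state message with the retain flag set, so the broker keeps it
 * as the last known value for the topic and hands it to late subscribers.
 * Hypothetical helper for illustration; it is not part of php-mqtt/client.
 */
function publishRetainedState(MqttClient $client, string $topic, string $state): void
{
    // Arguments: topic, message, quality of service, retain flag.
    $client->publish($topic, $state, MqttClient::QOS_AT_MOST_ONCE, true);
}
```

A call could look like publishRetainedState($client, 'stat/demo/POWER1', 'ON'), where the topic name is only an example. A client that subscribes later would then receive that message immediately, with $retained set to true in its callback.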

In your case, I guess it has something to do with QoS 2 and the relatively low timeout of 2 seconds used in the $client->registerLoopEventHandler() callback. QoS 2 messages are "only really sent" when using $client->loop(), because the MQTT client needs to acknowledge the reception of the message or the sender will attempt to send it again after a given timeout. The same goes for the subscriber.

By the way, in your mosquitto_pub/mosquitto_sub example, you do not set an explicit QoS, so it defaults to QoS 0. Maybe try using MqttClient::QOS_AT_MOST_ONCE in the PHP script for a direct comparison.
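A minimal sketch of the subscribe-first, publish-second flow with QoS 0 and a longer timeout, assuming an already connected MqttClient. The function name publishAndAwaitReply and its parameters are hypothetical; subscribe(), registerLoopEventHandler(), publish(), loop() and interrupt() are the same library calls used in the code above.

```php
<?php

use PhpMqtt\Client\MqttClient;

/**
 * Subscribes to $replyTopic, publishes $payload on $commandTopic and returns
 * the first reply, or null if nothing arrives within $timeoutSeconds.
 * Hypothetical helper, intended for a single call per connection.
 */
function publishAndAwaitReply(
    MqttClient $client,
    string $commandTopic,
    string $payload,
    string $replyTopic,
    float $timeoutSeconds = 10.0
): ?string {
    $reply = null;

    // Subscribe before publishing, so the reply cannot arrive unnoticed.
    $client->subscribe(
        $replyTopic,
        function (string $topic, string $message) use (&$reply, $client) {
            $reply = $message;
            // The first reply is enough, so leave the loop.
            $client->interrupt();
        },
        MqttClient::QOS_AT_MOST_ONCE
    );

    // Safety net: leave the loop if the device never answers.
    $client->registerLoopEventHandler(
        function (MqttClient $client, float $elapsedTime) use ($timeoutSeconds) {
            if ($elapsedTime >= $timeoutSeconds) {
                $client->interrupt();
            }
        }
    );

    $client->publish($commandTopic, $payload, MqttClient::QOS_AT_MOST_ONCE);

    // Runs until one of the callbacks above calls interrupt().
    $client->loop(true);

    return $reply;
}
```

A null return value means the timeout was hit, which the controller could report to the user instead of redirecting silently. MQTT topic names are case-sensitive, so $replyTopic has to match exactly what the device publishes; it is worth checking the topic reported by mosquitto_sub against the one used in the subscription.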

Thank you for the answer, but I think my problem is that I don't know how to use PSR logging within the Yii app I have.
First I was making a mistake in the creation of the logger, but now I have no idea where the PSR logger is writing.
Any hint on where to learn how PSR logging works and how it can be configured?

I'm not familiar with the Yii framework and how its logger works. For some basic testing, you can use the SimpleLogger as it is used in the examples. It uses echo to print log messages to the console.
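A PSR-3 logger is simply an object implementing Psr\Log\LoggerInterface, and it writes wherever its implementation decides to. As a rough sketch, assuming Yii 1.x (where Yii::log() takes a message, a level and a category, as in the catch block above) and the psr/log package, an adapter could forward every message to the Yii log. The function name createYiiPsrLogger and the level mapping are assumptions made for illustration.

```php
<?php

use Psr\Log\AbstractLogger;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;

/**
 * Builds a PSR-3 logger that forwards every message to Yii 1.x via Yii::log().
 * Hypothetical helper for illustration; it is not part of php-mqtt/client or Yii.
 */
function createYiiPsrLogger(string $category = 'application'): LoggerInterface
{
    return new class($category) extends AbstractLogger {
        /** @var string */
        private $category;

        public function __construct(string $category)
        {
            $this->category = $category;
        }

        public function log($level, $message, array $context = []): void
        {
            // Replace PSR-3 placeholders such as {topic} with the matching context values.
            $replacements = [];
            foreach ($context as $key => $value) {
                if (is_scalar($value) || (is_object($value) && method_exists($value, '__toString'))) {
                    $replacements['{' . $key . '}'] = (string) $value;
                }
            }

            \Yii::log(
                strtr((string) $message, $replacements),
                $this->toYiiLevel((string) $level),
                $this->category
            );
        }

        // Maps the eight PSR-3 levels onto Yii 1.x level names (assumed mapping).
        private function toYiiLevel(string $psrLevel): string
        {
            switch ($psrLevel) {
                case LogLevel::EMERGENCY:
                case LogLevel::ALERT:
                case LogLevel::CRITICAL:
                case LogLevel::ERROR:
                    return 'error';
                case LogLevel::WARNING:
                    return 'warning';
                case LogLevel::DEBUG:
                    return 'trace';
                default:
                    return 'info'; // notice and info
            }
        }
    };
}
```

The resulting object offers the usual PSR-3 methods, for example $logger->info('Received {message} on [{topic}]', [...]), and it can also be passed as the last constructor argument of MqttClient in place of the final null. Messages would then end up wherever the log routes of the Yii application are configured to write.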