php-mqtt/client

DataTransferException: Transferring data over socket failed

braberli opened this issue · 5 comments

Hi,

I did my first steps with php-mqtt two weeks ago and it works perfectly for my demo use case. I now tried to scale up and use the same way for publishing data for not only one device but several, using the same client and end up with an exception.

For the first demo (one device), there was a published message every second. Message size was about 40,000 Bytes. In the loop for creating messages is "sleep(1)".

After adding more devices to the loop, creating messages with different sizes (20kB to 160kB), I get a PHP Notice "DataTransferException: [65] Transferring data over socket failed: Sending data over the socket failed. Has it been closed?" in MqttClient.php:1158. The StackTrace shows

#0 MqttClient->writeToSocket()
#1 MqttClient->writeToSockerWithAutoReconnect()
#2 MqttClient->publishMessage()
#3 MqttClient->publish()

I can then connect again and publish more messages. It is not reproducable, how many messages are sent out before the exception occurs. Sometimes, it runs for several minutes and then breaks.

I tried to reduce the amount of data per message by compressing the json string before publishing without any visible change.

Is there any known limitation for the number of published messages or the data volume per second/minute? As it runs without any problem for hours just sending messages for one topic, I don't think that there is a problem in the network connection between client and broker.

I use the last version from GitHub and PHP 7.4.33 (cli) on Debian Bullseye. The MQTT Broker is Mosquitto (running in Docker). Mosquitto does not show any useful information:

1705146917: Sending CONNACK to mytestclient (0, 0)
1705146922: Received PUBLISH from mytestclient (d0, q0, r0, m0, 'testtopic_2', ... (22329 bytes))
1705146922: Received PUBLISH from mytestclient (d0, q0, r0, m0, 'testtopic_3', ... (18416 bytes))
1705146922: Received PUBLISH from mytestclient (d0, q0, r0, m0, 'testtopic_7', ... (23791 bytes))
1705146922: Received PUBLISH from mytestclient (d0, q0, r0, m0, 'testtopic_34', ... (33232 bytes))
1705146922: Received PUBLISH from mytestclient (d0, q0, r0, m0, 'testtopic_39', ... (22883 bytes))
1705146923: Received PUBLISH from mytestclient (d0, q0, r0, m0, 'testtopic_50', ... (51408 bytes))
1705146923: Client mytestclient disconnected: No error information.

Best regards
Christian

In the loop for creating messages is "sleep(1)".

Is this still the case for your final script? I don't think it should cause issues, but it might trigger weird scenarios like timeouts.

Is there any known limitation for the number of published messages or the data volume per second/minute?

No, the client goes as fast as it can. The actual performance depends a lot on your CPU, memory and network throughput. Also the quality of service level used for publishing and subscribing matters, as QoS 0 (no confirmation, but also no reliabiblity) has far better throughput than QoS 1 and 2. In my benchmarks, I've easily reached 400 Mbit/s and more (broker on localhost).

Without more details regarding your code, I can only guess what's going on. It could be that you are publishing too fast with high QoS for the broker to keep up (or according to its configuration), so it disconnects your client. That should be visible in the logs though (at least with some log level). There has also been a similar report in the past regarding messages with sizes larger 200 KB (see #133).

Maybe try increasing the log level of Mosquitto and isolate the issue (high throughput vs. large payload).

The sleep(1) is still in the code. It reduces the number of events. I publish messages in a loop (do ... while) including a request to the database and want to limit the requests.
The connection timeout for MQTT should be higher than 1 second.

I will have a closer look to the broker (running in a docker swarm, but currently on a Raspberry Pi 1). I already had the log_all without getting any further details.

I do not set explicitely the QoS level. So, it should be QoS 0.

Best regards
Christian

The problem seemed to be on the broker side as it only was a Raspi 1. Running the broker on a bigger environment didn't raise up these exceptions. Anyway: I catch the exceptions and reconnect and it works.

The "big" amount of data results from 70 different topics that can be subcribed to, but I am only interested in publishing messages on a topic as long as a client subscribes to that topic.
I could extremely reduce the amount of data sent out if I find a solution to subscribe to a topic and recieve messages in the publication loop. But I didn't find out (yet) how to do that. The only way that I can imagine is to use an event handler (see example https://github.com/php-mqtt/client-examples/blob/master/04_hooks/03_use_a_message_received_event_handler_to_log_received_messages.php) and only publish a message after recieving a message. That would reduce the amount of data, but seems to me not to be the ideal way. Are there any other suggestions?

Best regards
Christian

You can call the MqttClient::loopOnce() method in your publication loop. It is called internally by MqttClient::loop() as well, so the logic will be identical. Just make sure to call it regularly (like every second or so), because otherwise the latency to the broker will increase and you risk running into timeouts.


Some example code:

$mqtt = new MqttClient($host, $port);

$mqtt->subscribe('some/topic', function (string $topic, string $message) {
    // Do whatever ...
});

while (true) {
    $iterationStartedAt = microtime(true);

    // First publish all messages that need publishing.
    // My assumption is that you have some kind of provider method for those messages.
    foreach ($this->getMessagesToPublish() as $messageToPublish) {
        $mqtt->publish($messageToPublish->topic, $messageToPublish->message, MqttClient::QOS_AT_MOST_ONCE);
    }

    // Then read and deliver all incoming messages.
    $mqtt->loopOnce();

    // Delay the next iteration if processing took no time at all. The delay is a maximum of one second.
    // This will run the iterations clock-aligned in 1 second intervals,
    // but executions may be skipped if the previous one took longer than 1 second.
    $iterationDuration = microtime(true) - $iterationStartedAt;
    if ($iterationDuration < 1) {
        usleep((1 - $iterationDuration) * 1_000_000);
    }
}

Great hint. Thank you! That works perfectly.