php-mqtt/client

Error socket error when sending message larger that 200kb

alysdal opened this issue · 10 comments

Hi,

I'm trying to publish a 200 kb message to a topic, but i keep getting the socket error:

 PhpMqtt\Client\Exceptions\DataTransferException

  [65] Transferring data over socket failed: Sending data over the socket failed. Has it been closed?

  at vendor/php-mqtt/client/src/MqttClient.php:1147
    1143$result = @fwrite($this->socket, $data, $length);
    11441145if ($result === false || $result !== $length) {
    1146$this->logger->error('Sending data over the socket to the broker failed.');
  ➜ 1147throw new DataTransferException(
    1148DataTransferException::EXCEPTION_TX_DATA,
    1149'Sending data over the socket failed. Has it been closed?'
    1150▕             );
    1151▕         }

Splitting the message is not an option as this is used in an OTA update for a device that expects a single binary message.

Here is a code example to reproduce the issue:

$mqtt = new \PhpMqtt\Client\MqttClient('test.mosquitto.org', '1883');
// Create and configure the connection settings as required.
$connectionSettings = (new ConnectionSettings)
     ->setUsername(null)
     ->setPassword(null);

$mqtt->connect($connectionSettings, true);
$count = 1000*200; // 200 kilobytes
$mqtt->publish("topic", str_repeat('-', $count));

Any ideas how i might accomplish this or how we could solve this?

Have you tested publishing a similar sized message to your broker using another MQTT client like the mosquitto_pub CLI?
In my experience, this is not an issue with this library but the configuration of the MQTT broker in use. Because of similar reports in the past, there also is a test which publishes and receives a 2MB message. The tests are also run against multiple brokers (with the respective configuration in the .ci directory).

Thanks for getting back so soon.

Testing with mosquitto_pub was succesful and worked.
mosquitto_pub -h test.mosquitto.org -t "test" -d -f "python-firmwareupdater/blink.bin" (350kb file)

But i still get the socket error when using PhpMqtt.

I've given this a short test and it seems that test.mosquitto.org cannot handle more than ~100kB of data at once. The problematic code is:

client/src/MqttClient.php

Lines 1138 to 1160 in 22a207e

protected function writeToSocket(string $data, int $length = null): void
{
$calculatedLength = strlen($data);
$length = min($length ?? $calculatedLength, $calculatedLength);
$result = @fwrite($this->socket, $data, $length);
if ($result === false || $result !== $length) {
$this->logger->error('Sending data over the socket to the broker failed.');
throw new DataTransferException(
DataTransferException::EXCEPTION_TX_DATA,
'Sending data over the socket failed. Has it been closed?'
);
}
$this->bytesSent += $length;
$this->logger->debug('Sent data over the socket: {data}', ['data' => $data]);
// After writing successfully to the socket, the broker should have received a new message from us.
// Because we only need to send a ping if no other messages are delivered, we can safely reset the ping timer.
$this->lastPingAt = microtime(true);
}

The fwrite($this->socket, $data, $length) returns 104256 (consistently), which is not the expected length and therefore an error. This does not happen when using a local Mosquitto though (docker run -it -p 1883:1883 eclipse-mosquitto:1.6). So there must be some proxy/buffer limit causing this behavior.

Nevertheless, I'll look into it as soon as possible. Maybe calling fwrite() multiple times with less data is the solution.

I think you're right. I checked the php documentation for fwrite, and the first note suggests to do just that for network streams:

See fwrite_stream function on https://www.php.net/manual/en/function.fwrite.php.

It might even be a drop in replacement for the current line: $result = @fwrite($this->socket, $data, $length);

From: https://www.php.net/manual/en/function.fwrite.php.

function fwrite_stream($fp, $string) {
    for ($written = 0; $written < strlen($string); $written += $fwrite) {
        $fwrite = fwrite($fp, substr($string, $written));
        if ($fwrite === false) {
            return $written;
        }
    }
    return $written;
}

I'm a bit skeptical about this solution due to this comment. I'm also not sure calling fwrite() multiple times on the same network stream will yield a different result because the buffer may still not be flushed for example. The only reason fwrite() returns early in the first place is that we are running in non-blocking (async) mode for better performance (well, maybe at least). Removing

stream_set_blocking($socket, false);

helps, but is not a viable option in my opinion. I'll try and see what other implications this has and maybe offer a configuration option for it.

Good catch. A config option would be nice for solving this problem, however it would be great to be able to handle this dynamically. It is out of my expertise unfortunately.

Thanks for giving a hint to where the issue arises (async). I might be able to create a temporary fix by extending the MqttClient class for my use case.

I've been having a second look at this issue and I'm actually a bit smarter now, but unfortunately I still do not have a solution ready yet.

We actually use stream_set_blocking($socket, false) because of the loop() / loopOnce() methods. These methods perform multiple actions in an event loop which requires a non-blocking stream. The loop does read data from the socket (1), processes the data it if it contains a full message (2), resends unacknowledged messages after a timeout (3) and sends pings to the broker if necessary (4). Because of this, we cannot use blocking mode as the loopOnce() would be stuck at step (1) then:

$this->buffer .= $this->readAllAvailableDataFromSocket(true);

And this would potentially mean that the client does not ping the broker in time, especially when no data is sent from the broker to the client regularly. The result would be a disconnect.

If, however, you are not using the loop() or loopOnce() methods, then from what I can tell at the moment, it should be safe to use a blocking socket. I'll need a bit more time to investigate and test this though. If the assumption holds up, I'll add an option and proper documentation for it.


A second possible solution would be what you suggested, calling fwrite() on the socket in a loop. The problem with this is twofold from what I can tell:

  1. fwrite() returns false on error (which we can easily check and exit on) or the bytes written otherwise. If, however, the socket is broken in some way, according to people on the internet, fwrite() sometimes returns 0, which would cause an infinite loop. So we either have to add some kind of limit/timeout for the calls with result 0 or exit immediately if 0 written bytes are returned. The latter does not seem to work according to my tests though, as the underlying send buffer is flushed asynchronously and immediately calling fwrite() a second time will fail to write any new data in most cases as the buffer is still full.

  2. The whole reason behind non-blocking sockets is that one can perform other actions in the same PHP process while the socket is busy or does not provide new data. In case of a webserver, where one has multiple sockets (one for each client), the advantage of this is even more obvious because the server can serve whatever client has sent a request without waiting for one (and only one) client to send a request. So to summarize: we use asynchronous stream processing to avoid blocking the PHP process.

    If we now call fwrite() in a loop, we actually make our PHP process dependend on the progress of the socket. If the receiver is not able to receive or process the data we send, we cannot send more data because the buffer is full. This should not lead to a disconnect due to timeout (because we actively sending data), but it might cause issues with loop event handlers and stuff.

    My current position is that it should not be an issue to do this though as we are only using one socket, so blocking the process is - more or less - ok. I also gave the idea of a send buffer in the MqttClient class a quick thought (similar to what we use to receive data), but as this would most likely cause a backlog overflow in case the data cannot be sent fast enough, it is not a solution either.

So, to sum it all up: I'll need a bit more time to come up with a proper solution to this issue. 👍

I've added a new connection setting for the blocking mode in #135 and released it in v1.7.0. It seems like the simplest solution to me and I think covering all edge cases is next to impossible anyway.

Hi @Namoshek,
Thanks for looking into the issue and fixing it.

I found a typo, in the ConnectionSettings class in release 1.7.0, that unfortunately made the new setting unusable.

I have made a tiny PR to fix this. Feel free to close it and make the changes yourself if you prefer.

#139

@alysdal Thanks and sorry for the inconvenience, I'll release the fix immediately.