Error socket error when sending message larger that 200kb
alysdal opened this issue · 10 comments
Hi,
I'm trying to publish a 200 kb message to a topic, but i keep getting the socket error:
PhpMqtt\Client\Exceptions\DataTransferException
[65] Transferring data over socket failed: Sending data over the socket failed. Has it been closed?
at vendor/php-mqtt/client/src/MqttClient.php:1147
1143▕ $result = @fwrite($this->socket, $data, $length);
1144▕
1145▕ if ($result === false || $result !== $length) {
1146▕ $this->logger->error('Sending data over the socket to the broker failed.');
➜ 1147▕ throw new DataTransferException(
1148▕ DataTransferException::EXCEPTION_TX_DATA,
1149▕ 'Sending data over the socket failed. Has it been closed?'
1150▕ );
1151▕ }
Splitting the message is not an option as this is used in an OTA update for a device that expects a single binary message.
Here is a code example to reproduce the issue:
$mqtt = new \PhpMqtt\Client\MqttClient('test.mosquitto.org', '1883');
// Create and configure the connection settings as required.
$connectionSettings = (new ConnectionSettings)
->setUsername(null)
->setPassword(null);
$mqtt->connect($connectionSettings, true);
$count = 1000*200; // 200 kilobytes
$mqtt->publish("topic", str_repeat('-', $count));
Any ideas how i might accomplish this or how we could solve this?
Have you tested publishing a similar sized message to your broker using another MQTT client like the mosquitto_pub
CLI?
In my experience, this is not an issue with this library but the configuration of the MQTT broker in use. Because of similar reports in the past, there also is a test which publishes and receives a 2MB message. The tests are also run against multiple brokers (with the respective configuration in the .ci
directory).
Thanks for getting back so soon.
Testing with mosquitto_pub was succesful and worked.
mosquitto_pub -h test.mosquitto.org -t "test" -d -f "python-firmwareupdater/blink.bin"
(350kb file)
But i still get the socket error when using PhpMqtt.
I've given this a short test and it seems that test.mosquitto.org
cannot handle more than ~100kB of data at once. The problematic code is:
Lines 1138 to 1160 in 22a207e
The fwrite($this->socket, $data, $length)
returns 104256
(consistently), which is not the expected length and therefore an error. This does not happen when using a local Mosquitto though (docker run -it -p 1883:1883 eclipse-mosquitto:1.6
). So there must be some proxy/buffer limit causing this behavior.
Nevertheless, I'll look into it as soon as possible. Maybe calling fwrite()
multiple times with less data is the solution.
I think you're right. I checked the php documentation for fwrite
, and the first note suggests to do just that for network streams:
See fwrite_stream
function on https://www.php.net/manual/en/function.fwrite.php.
It might even be a drop in replacement for the current line: $result = @fwrite($this->socket, $data, $length);
From: https://www.php.net/manual/en/function.fwrite.php.
function fwrite_stream($fp, $string) {
for ($written = 0; $written < strlen($string); $written += $fwrite) {
$fwrite = fwrite($fp, substr($string, $written));
if ($fwrite === false) {
return $written;
}
}
return $written;
}
I'm a bit skeptical about this solution due to this comment. I'm also not sure calling fwrite()
multiple times on the same network stream will yield a different result because the buffer may still not be flushed for example. The only reason fwrite()
returns early in the first place is that we are running in non-blocking (async) mode for better performance (well, maybe at least). Removing
Line 293 in 22a207e
helps, but is not a viable option in my opinion. I'll try and see what other implications this has and maybe offer a configuration option for it.
Good catch. A config option would be nice for solving this problem, however it would be great to be able to handle this dynamically. It is out of my expertise unfortunately.
Thanks for giving a hint to where the issue arises (async). I might be able to create a temporary fix by extending the MqttClient class for my use case.
I've been having a second look at this issue and I'm actually a bit smarter now, but unfortunately I still do not have a solution ready yet.
We actually use stream_set_blocking($socket, false)
because of the loop()
/ loopOnce()
methods. These methods perform multiple actions in an event loop which requires a non-blocking stream. The loop does read data from the socket (1), processes the data it if it contains a full message (2), resends unacknowledged messages after a timeout (3) and sends pings to the broker if necessary (4). Because of this, we cannot use blocking mode as the loopOnce()
would be stuck at step (1) then:
Line 676 in 22a207e
And this would potentially mean that the client does not ping the broker in time, especially when no data is sent from the broker to the client regularly. The result would be a disconnect.
If, however, you are not using the loop()
or loopOnce()
methods, then from what I can tell at the moment, it should be safe to use a blocking socket. I'll need a bit more time to investigate and test this though. If the assumption holds up, I'll add an option and proper documentation for it.
A second possible solution would be what you suggested, calling fwrite()
on the socket in a loop. The problem with this is twofold from what I can tell:
-
fwrite()
returnsfalse
on error (which we can easily check and exit on) or the bytes written otherwise. If, however, the socket is broken in some way, according to people on the internet,fwrite()
sometimes returns0
, which would cause an infinite loop. So we either have to add some kind of limit/timeout for the calls with result0
or exit immediately if0
written bytes are returned. The latter does not seem to work according to my tests though, as the underlying send buffer is flushed asynchronously and immediately callingfwrite()
a second time will fail to write any new data in most cases as the buffer is still full. -
The whole reason behind non-blocking sockets is that one can perform other actions in the same PHP process while the socket is busy or does not provide new data. In case of a webserver, where one has multiple sockets (one for each client), the advantage of this is even more obvious because the server can serve whatever client has sent a request without waiting for one (and only one) client to send a request. So to summarize: we use asynchronous stream processing to avoid blocking the PHP process.
If we now call
fwrite()
in a loop, we actually make our PHP process dependend on the progress of the socket. If the receiver is not able to receive or process the data we send, we cannot send more data because the buffer is full. This should not lead to a disconnect due to timeout (because we actively sending data), but it might cause issues with loop event handlers and stuff.My current position is that it should not be an issue to do this though as we are only using one socket, so blocking the process is - more or less - ok. I also gave the idea of a send buffer in the
MqttClient
class a quick thought (similar to what we use to receive data), but as this would most likely cause a backlog overflow in case the data cannot be sent fast enough, it is not a solution either.
So, to sum it all up: I'll need a bit more time to come up with a proper solution to this issue. 👍