Multiple outstanding sent messages with QoS?
AndrewHeim opened this issue · 6 comments
Does this library support multiple outstanding sent messages with QoS?
A bit more background: MQTT has a limit on the maximum message size. I get the feeling it may vary a bit from platform to platform, but I've concluded that an MQTT message is within an order of magnitude of a UDP datagram (65k bytes); I forget at the moment whether it's closer to half or double.
When you have a high-latency link, as with many IoT devices, the number of outstanding messages (with QoS) can strongly influence the total effective bandwidth. If each message must be acknowledged before the next one is sent, with 100+ ms between each, that puts a damper on the data you can send.
Looking through the code gave me the impression that it only keeps one packet outstanding at a time: it seems to send a message and, if QoS is enabled, wait there until receiving an acknowledgement before returning (and sending the next message). This is opposed to a scheme where multiple messages can be sent and remain outstanding, each pending an acknowledgement. But I could be wrong in my conclusion, depending on how those VIs are called - the architecture is well abstracted and appears very extensible.
Did I understand that right? I'm looking for a library that supports multiple outstanding messages, so I can get somewhat higher throughput. Not massive throughput, of course, since MQTT isn't the MOST efficient thing out there, but more than 3-4 messages per second at that size. (It's important to use such a messaging layer in scenarios where a TCP session might break, because TCP buffers don't tell you exactly what data made it and what didn't - MQTT, or a layer like it, does.)
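Since LabVIEW diagrams can't be rendered as text, here is a back-of-envelope sketch in Python of why the in-flight window matters, using the ~100 ms round trip and ~64 KB message size discussed above. The function name and the 16-message window are illustrative assumptions, not anything from this library:

```python
# Back-of-envelope: effective throughput of stop-and-wait QoS 1 publishing
# versus a pipelined window, assuming a fixed round-trip time per PUBACK
# and ignoring protocol overhead and bandwidth limits.

def qos1_throughput_bps(msg_bytes: int, rtt_s: float, window: int) -> float:
    """Messages acknowledged per second times payload size, for a given
    number of in-flight (unacknowledged) messages."""
    msgs_per_s = window / rtt_s          # each RTT completes `window` messages
    return msgs_per_s * msg_bytes * 8    # bits per second

# Stop-and-wait (window = 1) on a 100 ms link with 64 KB messages:
stop_and_wait = qos1_throughput_bps(64_000, 0.100, 1)   # 10 msg/s
pipelined     = qos1_throughput_bps(64_000, 0.100, 16)  # 160 msg/s
print(f"window=1:  {stop_and_wait / 1e6:.1f} Mbit/s")
print(f"window=16: {pipelined / 1e6:.1f} Mbit/s")
```

The model is idealized (real publishing, as noted above, can be far slower than one message per RTT), but it shows why widening the in-flight window multiplies effective throughput on a high-latency link.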
Hi @AndrewHeim ,
that is an excellent question and I guess it is a matter of interpretation of the protocol rules.
I had not interpreted the spec with this in mind as I was looking, first and foremost, to ensure that the packets and acknowledgements are handled in the proper order (see section 4.6 of spec).
Is it a breach of protocol to send a packet before the previous message has been acknowledged? I'll have to dive into some parts of the specification that I have yet to implement (resend with the DUP flag).
That being said, I think I see your point: you would like to be able to enqueue the next message to the server while the server processes the packet ack back to the publisher. The message ordering could still be maintained, but you would essentially want to use the full time slots asynchronously. At this time, you are correct that the public Client Publish method does not allow this publication pattern, but the underlying architecture certainly is compatible.
Have a look at the QoS = 1 diagram here; it helps me visualize the many processes running in the MQTT broker.
Follow the red and blue lines in the diagram... you'll see that only the Client API (green box) is blocking you.
I guess it would be possible to abstract the ack into the session, and simply get an error event (after a certain time) if the ack has not been received.
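As a rough sketch of that idea (in Python, since G code can't be shown as text): a session-level table tracks pending acknowledgements, publishes return immediately, and a periodic sweep surfaces timed-out packet IDs as error events. The class and method names here are invented for illustration and are not part of this library's API:

```python
# Sketch of "abstracting the ack into the session": publishes return
# immediately, a pending table tracks unacknowledged packet IDs, and a
# periodic sweep reports timeouts (candidates for a DUP resend or an
# error event to the caller).

class PendingAcks:
    def __init__(self, timeout_s: float = 2.0):
        self.timeout_s = timeout_s
        self.pending = {}                       # packet_id -> send timestamp

    def sent(self, packet_id: int, now: float) -> None:
        self.pending[packet_id] = now           # record the in-flight PUBLISH

    def puback(self, packet_id: int) -> bool:
        """Clear the pending entry; False if the ID was unknown/expired."""
        return self.pending.pop(packet_id, None) is not None

    def expired(self, now: float) -> list[int]:
        """Packet IDs whose PUBACK is overdue, removed from the table."""
        late = [pid for pid, t in self.pending.items()
                if now - t > self.timeout_s]
        for pid in late:
            del self.pending[pid]
        return late

session = PendingAcks(timeout_s=1.0)
session.sent(1, now=0.0)
session.sent(2, now=0.1)
session.puback(1)                               # broker acknowledged packet 1
print(session.expired(now=5.0))                 # packet 2 timed out -> [2]
```

In the real library the sweep would presumably fire a user event rather than return a list, and the table would live in the session storage so it survives reconnects.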
Thank you for the very thorough answer.
As I think about it more, my request is a little bit of a silly one if you take into account the API design. In short, I'm not sure that much better could be done other than exactly what you describe.
I remember when I went looking through the source of a C-based MQTT library, I was satisfied when I saw something like
outbound_queue=1000 //Defaults to 1000 outbound messages
I think I was expecting a queue-like interface where a packet was retried until it got pushed off the back of the queue by the 1001st new entry. But that's not a great answer either.
How do you design an API that both lets you easily send multiple messages AND lets you choose to retry on failure (while knowing which data went with which message)? At first brush, it's easy to have the caller subscribe to a callback in case of failure, but then how do you decide when to expire your data? You'd have to have an event for success too.
By the time you have all those events flying around in LabVIEW (and the potential bottlenecking they might introduce), along with the storage space for sent-and-possibly-to-be-retried data, it gets complicated. It's far simpler to call Client Publish in parallel, once for each write, for as wide as you want your pipe to be. Certainly less elegant (and less adaptive to network conditions), but possibly far easier to implement.
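The "N parallel blocking publishes" workaround described above could be sketched like this (Python threads stand in for parallel LabVIEW loops; `blocking_publish` is a hypothetical stand-in for a synchronous Client Publish call, here simulated with a sleep):

```python
# Sketch: widen the pipe by running several blocking QoS publishes in
# parallel. Each worker blocks on its own synchronous publish, so the
# effective in-flight window equals the number of workers.
import threading
import time

def blocking_publish(topic: str, payload: bytes, rtt_s: float = 0.1) -> None:
    time.sleep(rtt_s)                 # stand-in for waiting on the PUBACK

def publish_parallel(messages, width: int = 4) -> float:
    """Publish `messages` across `width` blocking workers; return elapsed seconds."""
    lock, queue = threading.Lock(), list(messages)

    def worker():
        while True:
            with lock:                # pull the next message, if any remain
                if not queue:
                    return
                topic, payload = queue.pop(0)
            blocking_publish(topic, payload)

    start = time.monotonic()
    threads = [threading.Thread(target=worker) for _ in range(width)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.monotonic() - start

msgs = [("sensor/%d" % i, b"data") for i in range(8)]
# 8 messages, 4 workers, ~0.1 s each -> about 0.2 s instead of ~0.8 s serially
print(f"{publish_parallel(msgs, width=4):.1f} s")
```

Note that with multiple parallel publishers, cross-message ordering is no longer guaranteed, which is exactly the trade-off discussed above.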
In summary: After navel-gazing about my request, I think being able to call Send in parallel is as good in a per-development-time way as any other API. Unless you have a better idea?
Now I'm wondering whether ANY library can give me what I want without modification. I need to do some digging to make sure MQTT as a protocol will support the data rates I need.
I'll keep this open, as it is a distinct possibility that one could provide two different implementations of the API: one synchronous (waiting for the ACK) and one that queues messages up. This extension would have no effect whatsoever on the underlying architecture, so I will consider implementing it in the future.
thanks for the suggestion!
Note to self:
Consider adding an asynchronous public API call (for QoS > 0) with a callback mechanism, through a user event, for failure to publish such an elevated-QoS message. Must be compatible with DUP packets, reconnection attempts, session storage, etc.
Hello Francois
As you may have recognized from my advertising on the LabVIEW Champions forum, I am about to abandon the LabVIEW Datalogging and Supervisory Control Module and freeze at LabVIEW 2021 SP1. Instead I have adopted MQTT and InfluxDB/Telegraf.
You may also recognize our CS++ add-on libraries for the NI Actor Framework. A good starting point is https://git.gsi.de/EE-LV/CSPP/cspp-template, branch develop, which uses MQTT by default. I adapted your libraries to implement derived classes that support MQTT in CS++. You can find them in the submodule https://git.gsi.de/EE-LV/CSPP/CSPP_MQTT.
When using QoS=0, in principle everything works fine. Setting QoS=1 leads to dramatic performance issues, resulting in the error "Not enough memory to complete this operation." in applications with many topics. This seems to be related to the synchronous message handling for QoS>0; you noted the problem yourself above. This is true for the G Open Source Project MQTT Broker as well as Mosquitto, so the problem seems to exist in the client library too, not only in the broker.
Furthermore, the Mosquitto broker reports "protocol error" in its log file. My software tries to reconnect, and that works a few times; then Mosquitto refuses the connection due to the many reconnects, assuming a bad client.
So I would really appreciate it if you could implement the asynchronous message handling.
If you are looking for a real-world project with ~13500 topics, refer to https://git.gsi.de/EE-LV/CSPP/GSI_VAC/VacuumHeating, branch feature/MQTT.
This project is used to monitor the bake-out process of a heavy-ion accelerator beamline to reach the desired vacuum. One or more PCs run the operator GUI. The bake-out control is designed to execute on three LabVIEW RT PXI controllers. Due to problems building RT executables in LabVIEW RT 2021 SP1 (a bug report has been accepted and NI is working on a solution), I prepared a version with simulated devices, all executing on one Windows PC, resulting in ~90% CPU load. The errors happen quite soon in this scenario.
If you are willing to use that project as an example for testing, I am of course willing to assist in setting it up on your computer.
It would be very nice if we could continue with our existing projects in distributed heterogeneous environments, e.g. Python- or EPICS-based control systems, while slowly migrating.
Best Holger
Hi Holger @HB-GSI ,
I've been totally swamped with work and have not had much time to devote to this library apart from very low-hanging-fruit bug fixes. To be completely honest, I don't anticipate being able to work on and test such a change any time soon. On the broker side, there might be easier optimizations in the subscription engine that could increase throughput to the point where this limit is pushed a bit further. It would not be a truly scalable long-term solution, but it might give some breathing room until an asynchronous solution can be implemented.
I think the QoS latency is purely a Client API issue because of the synchronous wait; the broker does not wait at all for QoS=1 and emits the PUBACK response as soon as it can. The "not enough memory" error is likely either 1) a protocol violation in the broker core's incoming-packet handling (some infinite loop due to a bug?), or 2) something related to the subscription engine, which makes loads of copies.
One thing you could explore, to find out which one is the biggest hit in your projects, is to override the MQTT Tracer object in the MQTT Broker library and monitor the incoming and outgoing messages. This way, you can measure the latency between matching PUB and PUBACK IDs. I have wanted to produce such a debugging tool for a while, but I have nothing to show for it yet. The hooks are in place in the broker library.
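The measurement such a tracer would make might look like this sketch (Python for illustration only; the class and method names are invented and do not correspond to the library's actual Tracer interface):

```python
# Sketch: record timestamps for outgoing PUBLISH and incoming PUBACK
# packets by packet ID, then report the ack latency per packet. This
# mirrors what an overridden tracer hook could log.

class AckLatencyTracer:
    def __init__(self):
        self.sent = {}        # packet_id -> PUBLISH timestamp
        self.latency = {}     # packet_id -> PUBACK latency in seconds

    def on_publish(self, packet_id: int, t: float) -> None:
        self.sent[packet_id] = t

    def on_puback(self, packet_id: int, t: float) -> None:
        if packet_id in self.sent:
            self.latency[packet_id] = t - self.sent.pop(packet_id)

tracer = AckLatencyTracer()
tracer.on_publish(7, t=0.000)
tracer.on_publish(8, t=0.010)
tracer.on_puback(7, t=0.120)
print(tracer.latency)         # packet 7 took ~0.12 s; packet 8 is outstanding
```

A histogram of these latencies, split by whether the broker or the client API dominates, would point at which of the two suspected causes is the real bottleneck.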
Noteworthy to know:
- Subscriptions are asynchronous processes, and each currently gets its own copy of the incoming message. They process every incoming topic to match their filters, so they use a lot of memory and could be optimized to maintain a dictionary of previous matches. If your topics are fixed (i.e., not creating a reply URI from GUIDs for every transaction), that would be a lesser hit on the CPU, leaving more for the PUBACK transaction to speed up.
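The "dictionary of previous matches" idea could be sketched as follows (Python for illustration; `topic_matches` implements only the basic `+`/`#` wildcard rules from the MQTT spec, and the class name is invented):

```python
# Sketch: MQTT topic-filter matching ('+' = one level, '#' = remaining
# levels) with a memo, so repeated topics skip the per-level comparison.

def topic_matches(filt: str, topic: str) -> bool:
    """Minimal MQTT topic-filter match (no shared-subscription syntax)."""
    f, t = filt.split("/"), topic.split("/")
    for i, part in enumerate(f):
        if part == "#":
            return True                      # matches all remaining levels
        if i >= len(t) or (part != "+" and part != t[i]):
            return False
    return len(f) == len(t)

class CachedMatcher:
    def __init__(self, filt: str):
        self.filt, self.memo = filt, {}      # topic -> bool

    def matches(self, topic: str) -> bool:
        if topic not in self.memo:           # a fixed topic hits this once
            self.memo[topic] = topic_matches(self.filt, topic)
        return self.memo[topic]

m = CachedMatcher("sensor/+/temp")
print(m.matches("sensor/oven1/temp"))        # True
print(m.matches("sensor/oven1/pressure"))    # False
print(m.matches("sensor/oven1/temp"))        # True, served from the memo
```

With ~13500 mostly fixed topics, the memo turns the per-message filter walk into a single dictionary lookup after the first occurrence of each topic; dynamic reply-URI topics would keep growing the memo, which is the CPU/memory trade-off mentioned above.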
If you want to tackle the asynchronous client ACK handling, I welcome pull requests!
Hi Francois
Thanks for your response.
For the moment, and for the project in focus, QoS=0 is fine.
I will discuss after vacation how we want to continue.
Best Holger