Discuss: sending multipart messages
skrap opened this issue · 12 comments
ZMQ supports multipart messages, which have the useful property that if some part of the multipart message is received, then it's guaranteed that all of the parts are received. This lets zmq-based protocols use multipart messages for easy segmentation of the messages.
For example, it's common for pub/sub messages to be sent as multipart messages, with the topic sent as the first part (and subject to publisher-side filtering) and a payload sent as the second part of the multipart message.
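As a rough illustration of that layout, the sketch below models a two-part pub/sub message as a list of byte frames and applies prefix matching to the first frame, which is how ZMQ subscriptions behave (an empty subscription matches everything). The names `Multipart`, `topic_matches` and `filter_message` are made up for this example and are not part of zmq or tmq; the real filtering happens inside libzmq.

```rust
/// A multipart message modelled as a list of byte frames: [topic, payload].
type Multipart = Vec<Vec<u8>>;

/// Subscriptions are byte prefixes; an empty subscription matches any topic.
fn topic_matches(topic: &[u8], subscription: &[u8]) -> bool {
    topic.starts_with(subscription)
}

/// Returns the payload frame if the first frame matches any subscription.
/// Hypothetical helper for illustration only.
fn filter_message<'a>(msg: &'a Multipart, subscriptions: &[&[u8]]) -> Option<&'a [u8]> {
    let topic = msg.first()?;
    if subscriptions.iter().any(|s| topic_matches(topic, s)) {
        // The second frame carries the payload in this two-part layout.
        msg.get(1).map(|p| p.as_slice())
    } else {
        None
    }
}
```

Only the first frame is inspected; the payload frame travels with it untouched, which is what makes the topic/payload split convenient.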
While it's possible to support incoming multipart messages with this crate (via accumulation of incoming messages into a collection, and testing for multipart completion via get_rcvmore), I don't think it's possible to actually send multipart messages currently. We should support this!
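For reference, the receive-side accumulation described above might look roughly like this. `MockSocket` is a hypothetical in-memory stand-in so the example is self-contained; a real socket's get_rcvmore returns a Result, which is omitted here.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a socket: yields one frame at a time and
/// remembers whether more frames of the same message follow.
struct MockSocket {
    frames: VecDeque<(Vec<u8>, bool)>, // (frame bytes, more-frames-follow flag)
    rcvmore: bool,
}

impl MockSocket {
    fn new(frames: Vec<(Vec<u8>, bool)>) -> Self {
        MockSocket { frames: frames.into(), rcvmore: false }
    }

    /// Pops the next frame, or None when nothing is queued.
    fn recv(&mut self) -> Option<Vec<u8>> {
        let (frame, more) = self.frames.pop_front()?;
        self.rcvmore = more;
        Some(frame)
    }

    /// Mirrors the idea of get_rcvmore: true if the last frame had a successor.
    fn get_rcvmore(&self) -> bool {
        self.rcvmore
    }
}

/// Collects frames until the socket reports that no more parts follow.
/// Returns None if the socket has nothing left to read.
fn recv_multipart(socket: &mut MockSocket) -> Option<Vec<Vec<u8>>> {
    let mut parts = Vec::new();
    loop {
        parts.push(socket.recv()?);
        if !socket.get_rcvmore() {
            return Some(parts);
        }
    }
}
```

A single-frame message falls out of the same loop as a one-element Vec.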
The underlying ZMQ crate supports these messages via the send_multipart function, or via manual injection of the zmq::SNDMORE flag into the flags argument of the zmq::send function for all but the last of the multipart messages.
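To make the flag pattern concrete, here is a minimal sketch of "SNDMORE on every part except the last". The `FrameSender` trait and `Recorder` type are hypothetical stand-ins for a real socket, and the local `SNDMORE` constant only mirrors the value libzmq uses for ZMQ_SNDMORE; real code would use zmq::SNDMORE.

```rust
/// Stand-in for zmq::SNDMORE (libzmq defines ZMQ_SNDMORE as 2).
const SNDMORE: i32 = 2;

/// Hypothetical frame-level sender; a real socket would go here.
trait FrameSender {
    fn send(&mut self, frame: Vec<u8>, flags: i32);
}

/// Records every (frame, flags) pair so the flag pattern can be inspected.
#[derive(Default)]
struct Recorder {
    sent: Vec<(Vec<u8>, i32)>,
}

impl FrameSender for Recorder {
    fn send(&mut self, frame: Vec<u8>, flags: i32) {
        self.sent.push((frame, flags));
    }
}

/// Sends every part with SNDMORE set, except the final part.
fn send_multipart<S, I, M>(socket: &mut S, parts: I)
where
    S: FrameSender,
    I: IntoIterator<Item = M>,
    M: Into<Vec<u8>>,
{
    // Peeking one item ahead tells us whether the current part is the last.
    let mut iter = parts.into_iter().peekable();
    while let Some(part) = iter.next() {
        let flags = if iter.peek().is_some() { SNDMORE } else { 0 };
        socket.send(part.into(), flags);
    }
}
```

Because the function takes any IntoIterator, the caller does not have to collect the parts into a Vec first; only a one-item look-ahead is buffered.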
This issue is intended to be a discussion of how to best support the sending of multipart messages.
Here are some options I see:
- Teach Poller to send_multipart
  - This could accept a stream of messages
  - ...or an iterable collection of messages.
  - ...or either.
There's an efficiency tradeoff in requiring the preallocation of all multipart message parts before sending, but there's an ergonomic benefit to being able to compose a multipart message without chaining futures together.
- Teach Poller to send_message_with_flags
  - This would leave the logistics of arranging the multipart message up to the API's consumer. It would probably be less ergonomic to use, but with the benefit of transparency of what TMQ is up to behind the scenes.
There's probably other options out there! Just wanted to start the discussion. If this isn't the best place for this type of work, please let me know and I'll move this elsewhere.
More options:
- Change from zmq::Message as the SinkItem to some collection-of-messages type. tokio-zmq uses a VecDeque<S: zmq::Sendable>, which seems usable, though I'm not sure what the advantages of that over a Vec<> are.
- Wrap zmq::Message in an outer type (tmq::Message could be used for this) which includes a has_more: bool, and possibly other flags as well. Switch to using that as the SinkItem type.
I think that is a good idea!
From the look of the library, send_multipart doesn't use the Sendable trait for the type bounds; I'm wondering if that's because HKT or similar prevents it. I'd say at a minimum we would use the same types, i.e., IntoIterator<Item = Into<Message>> for sending. For receiving, I'm not too sure, but I would say maybe a Vec<Message> would be a good start. Not sure why a VecDeque is used, since I'm assuming the number of parts in a multipart message is normally pretty small? Would need to benchmark!
For subscribe, I'd imagine something like this could work:
let request = subscribe(&Context::new())
    .connect("tcp://127.0.0.1:7899")
    .expect("Couldn't connect")
    .subscribe_multipart("")
    .for_each(|val| {
        info!("Got {} messages", val.len());
        Ok(())
    })
    .map_err(|e| {
        error!("Error Subscribing: {}", e);
    });
subscribe_multipart("") would need to return a new struct type, SubMultipart, which would implement the Stream trait but with an Item equal to Vec<zmq::Message>. This may mean duplicate structs for each socket, but multipart style.
The Poller trait will need two new methods:
fn send_multipart_message<I: IntoIterator<Item = M>, M: Into<zmq::Message>>(&self, msg: I) -> Poll<(), Error>;
fn recv_multipart_message(&self, msg: &mut Vec<zmq::Message>) -> Poll<(), Error>;
Not sure if sockets mix and match multipart messages with single ones? I would imagine if you're in "multipart mode" then a single message would be a length 1 vec.
I'm playing around with some of the options to see how the ergonomics work out. Having a multipart SinkItem (probably Vec or somesuch) feels the best so far. However, we would need to be certain that edge cases involving zmq's buffering strategy work out properly. Consider the case where the first half of a two part message is enqueued successfully, but the final part is not, for example.
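One way to reason about that edge case is a sender that remembers which parts are still unsent and resumes on the next poll. The sketch below is only a model: `BoundedSocket`, `PendingMultipart` and `SendState` are hypothetical, and the mock assumes the worst case, namely that the queue can accept one part and then refuse the next. Whether libzmq can actually behave this way is exactly the open question here.

```rust
use std::collections::VecDeque;

/// Outcome of a non-blocking send attempt on the mock socket.
#[derive(Debug, PartialEq)]
enum SendState {
    Done,
    WouldBlock,
}

/// Mock socket with a fixed number of free slots, standing in for a queue
/// that can fill up part-way through a multipart message.
struct BoundedSocket {
    free_slots: usize,
    sent: Vec<(Vec<u8>, bool)>, // (frame, more flag)
}

impl BoundedSocket {
    /// Hands the frame back to the caller if the queue is full.
    fn try_send(&mut self, frame: Vec<u8>, more: bool) -> Result<(), Vec<u8>> {
        if self.free_slots == 0 {
            return Err(frame);
        }
        self.free_slots -= 1;
        self.sent.push((frame, more));
        Ok(())
    }
}

/// Holds the not-yet-sent parts of one multipart message.
struct PendingMultipart {
    parts: VecDeque<Vec<u8>>,
}

impl PendingMultipart {
    fn new(parts: Vec<Vec<u8>>) -> Self {
        PendingMultipart { parts: parts.into() }
    }

    /// Sends as many remaining parts as the socket accepts. A rejected part
    /// is pushed back to the front so a later call resumes where this one stopped.
    fn poll_send(&mut self, socket: &mut BoundedSocket) -> SendState {
        while let Some(frame) = self.parts.pop_front() {
            let more = !self.parts.is_empty();
            if let Err(frame) = socket.try_send(frame, more) {
                self.parts.push_front(frame);
                return SendState::WouldBlock;
            }
        }
        SendState::Done
    }
}
```

For this to be safe, nothing else may be written to the same socket between the two polls, otherwise frames from different messages would interleave. The cheap pop_front/push_front pair is also one plausible reason to prefer a VecDeque over a Vec for the pending parts.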
Definitely needs to be tested. I wonder whether the high water mark comes into play here: http://api.zeromq.org/2-1:zmq-setsockopt#toc3
I was experimenting with this API, trying to make a single Sink which could accept either a multipart message or a single zmq::Message (aka "frame", or "message part"), but it seems we may need to wait until futures-rs 3 for this: rust-lang/futures-rs#1481
For right now I'm focusing more on changing the unit of sending and receiving to a Vec<zmq::Message>. This would cause one extra allocation per sent multipart message, but it unblocks progress on multipart sending. Comments definitely welcome!
I would assume there'd need to be different Sink structs as there would be different Stream structs. Duplication is unavoidable, but I have a feeling that macros may help here.
For the SinkItem, have you tried with IntoIterator<Item = Into<Message>>? Otherwise I think Vec<Message> is fine as a first shot!
One challenge I can see with using the same Sink for both multipart and normal is what function to call from the zmq library, since it's a different function depending on whether it's multipart or not, so making it generic here may mean a bit of wrestling with types.
If the IntoIterator doesn't work, I think sticking with Vec<Message> for now to keep things simple would be appropriate.
I've got an initial version on the multipart_support branch, but I think I'm going to refactor it quite substantially before cutting a new version: https://github.com/cetra3/tmq/tree/multipart_support
I was thinking about this a bit a few days ago, actually! One of zmq's guarantees is that if a single frame of a message is received, then all parts will be received. This means that all parts are already buffered in memory at the time that the first part is emitted by zmq. Based on my understanding of common usage of zmq, it seems like there's no practical efficiency gain to receiving single frames of a multipart message. To me, this points to an API that deals only with MultipartMessage objects, instead of single frames. I've not taken a look at your changes yet, but if you're poking at this stuff I wanted to give my thoughts.
I have started working on a TmqMessage type, which at the moment is just an enum for a Single message or a Multipart message. The aim was to keep the ergonomics as clean as the current implementation, but so far it has made things messier.
It works, but I don't like the way it is used from a consumer perspective, so I'm possibly going to rework it further.
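For anyone following along, an enum of that shape could look something like the sketch below. Only the TmqMessage name and the Single/Multipart variants come from the description above; the Vec<u8> frames (standing in for zmq::Message), the conversions and the helper methods are assumptions made to keep the example self-contained, and the actual branch may differ.

```rust
/// A message that is either one frame or several.
/// Vec<u8> stands in for zmq::Message in this sketch.
#[derive(Debug, PartialEq)]
enum TmqMessage {
    Single(Vec<u8>),
    Multipart(Vec<Vec<u8>>),
}

impl TmqMessage {
    /// Number of frames this message would put on the wire.
    fn frame_count(&self) -> usize {
        match self {
            TmqMessage::Single(_) => 1,
            TmqMessage::Multipart(parts) => parts.len(),
        }
    }

    /// Flattens either variant into a list of frames, so that sending code
    /// only has to handle one shape.
    fn into_frames(self) -> Vec<Vec<u8>> {
        match self {
            TmqMessage::Single(frame) => vec![frame],
            TmqMessage::Multipart(parts) => parts,
        }
    }
}

/// A single string becomes a one-frame message.
impl<'a> From<&'a str> for TmqMessage {
    fn from(s: &'a str) -> Self {
        TmqMessage::Single(s.as_bytes().to_vec())
    }
}

/// A list of strings becomes a multipart message, one frame per string.
impl<'a> From<Vec<&'a str>> for TmqMessage {
    fn from(parts: Vec<&'a str>) -> Self {
        TmqMessage::Multipart(parts.into_iter().map(|p| p.as_bytes().to_vec()).collect())
    }
}
```

The From impls let a caller write `"hello".into()` or `vec!["topic", "payload"].into()`, but every consumer still has to match on the variant when receiving, which may be part of the messiness mentioned above.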