RFC: Refactoring the flush loop
Based on input from @kkoopa (https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/) and recent bugs, I want to suggest the following rewrite of the flush loop (roughly). Feedback incredibly welcome (@kkoopa, @reqshark).
JavaScript land (ES6 for clarity, but would implement with ES5 in mind):
class Message {
  constructor(parts, flags, cb) {
    this.parts = parts;
    this.flags = flags;
    this.cb = cb;
    this.sendError = undefined; // to be set from C++
    this.flushed = false; // to be set from C++
  }
}
class Socket extends EventEmitter {
  constructor() {
    super(); // required before touching `this` in a derived class
    // assumes this._zmq already refers to the C++ binding object (creation omitted)
    this._zmq.onFlushDone = function (inbox, outbox) {
      // remove all flushed messages from outbox and call their callbacks
      // emit all inbox messages
    };
  }
  send(parts, flags, cb) {
    // to keep this example simple, assume flags will not contain SNDMORE
    this._zmq.enqueue(new Message(parts, flags, cb));
  }
}
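To make the two comments inside onFlushDone concrete, here is a minimal sketch of what that handler body could look like. The helper name handleFlushDone, the 'message' event name, and the assumption that flushed messages always sit at the front of the outbox (FIFO sends) are mine, not part of the proposal; plain objects stand in for Message instances.

```javascript
const EventEmitter = require('events');

// Hypothetical helper: the work onFlushDone could do on the JS side.
function handleFlushDone(socket, inbox, outbox) {
  // Assumes sends complete in FIFO order, so flushed messages are at the front.
  while (outbox.length > 0 && outbox[0].flushed) {
    const msg = outbox.shift();
    if (typeof msg.cb === 'function') {
      msg.cb(msg.sendError); // undefined when the send succeeded
    }
  }
  // Drain the inbox in place so the same array can keep being reused.
  for (const parts of inbox.splice(0, inbox.length)) {
    socket.emit('message', parts);
  }
}

// Demo with plain objects standing in for Message instances.
const socket = new EventEmitter();
const received = [];
const completed = [];
socket.on('message', (parts) => received.push(parts));

const outbox = [
  { parts: ['a'], flushed: true, sendError: undefined, cb: (err) => completed.push(['a', err]) },
  { parts: ['b'], flushed: false, sendError: undefined, cb: (err) => completed.push(['b', err]) },
];
const inbox = [['hello'], ['world']];

handleFlushDone(socket, inbox, outbox);
console.log(completed.length, outbox.length, received.length, inbox.length); // prints "1 1 2 0"
```

Only the flushed message has its callback invoked; the unflushed one stays in the outbox for the next flush.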
C++ binding:
class Socket : public Nan::ObjectWrap {
  private:
    // pseudo-types; real code would hold Nan::Persistent handles
    V8::Array outbox;
    V8::Array inbox;
    V8::Boolean isPaused;
    void OnPoll();
    void Flush();
    bool FlushOne(bool actPaused);
    static NAN_METHOD(Enqueue);
    static NAN_METHOD(Pause);
    static NAN_METHOD(Resume);
    static NAN_METHOD(Read);
};
NAN_MODULE_INIT(Socket::Initialize) {
  Nan::HandleScope scope;
  Local<FunctionTemplate> t = Nan::New<FunctionTemplate>(New);
  Nan::SetPrototypeMethod(t, "enqueue", Enqueue);
  Nan::SetPrototypeMethod(t, "pause", Pause);
  Nan::SetPrototypeMethod(t, "resume", Resume);
  Nan::SetPrototypeMethod(t, "read", Read);
}
void Socket::OnPoll() {
  if (!isPaused) {
    Flush();
  }
}
void Socket::Enqueue(V8::Object msg) {
  outbox.push(msg);
  Flush();
}
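bool Socket::FlushOne(bool actPaused) {
  bool processed = false;
  int events = 0;
  size_t events_size = sizeof(events);
  // socket_ stands for the underlying zmq socket handle (member not shown above)
  zmq_getsockopt(socket_, ZMQ_EVENTS, &events, &events_size);
  if ((events & ZMQ_POLLOUT) != 0 && outbox.length > 0) {
    sendOne();
    processed = true;
  }
  if ((events & ZMQ_POLLIN) != 0 && !actPaused) {
    readOne();
    processed = true;
  }
  return processed;
}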
void Socket::Flush() {
  bool processing;
  do {
    processing = FlushOne(isPaused);
  } while (processing);
  onFlushDone(inbox, outbox); // notifies JS
}
void Socket::Read() {
  // read a single message while paused
  if (FlushOne(false)) {
    onFlushDone(inbox, outbox); // notifies JS
  }
}
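To sanity-check the control flow of Flush and FlushOne without a real socket, the loop can be modelled in plain JavaScript. This is only a sketch: MockSocket, its sendCredit counter, and the state object are hypothetical stand-ins, and sent messages are moved out of the outbox for simplicity rather than being marked as flushed. The point it tries to show is that the loop keeps running until the event mask offers nothing more to do, which is what an edge-triggered notification needs.

```javascript
const POLLIN = 1;  // same value as ZMQ_POLLIN
const POLLOUT = 2; // same value as ZMQ_POLLOUT

// Hypothetical mock: `pending` holds messages waiting to be read,
// `sendCredit` is how many sends succeed before POLLOUT clears.
class MockSocket {
  constructor(pending, sendCredit) {
    this.pending = pending;
    this.sendCredit = sendCredit;
    this.wire = [];
  }
  events() {
    return (this.pending.length > 0 ? POLLIN : 0) |
           (this.sendCredit > 0 ? POLLOUT : 0);
  }
  send(msg) { this.sendCredit -= 1; this.wire.push(msg); }
  recv() { return this.pending.shift(); }
}

// Mirrors FlushOne: at most one send and one read per call.
function flushOne(sock, state, actPaused) {
  let processed = false;
  const events = sock.events();
  if ((events & POLLOUT) !== 0 && state.outbox.length > 0) {
    sock.send(state.outbox.shift());
    processed = true;
  }
  if ((events & POLLIN) !== 0 && !actPaused) {
    state.inbox.push(sock.recv());
    processed = true;
  }
  return processed;
}

// Mirrors Flush: repeat until nothing was processed; returns the round count.
function flush(sock, state) {
  let rounds = 0;
  while (flushOne(sock, state, state.isPaused)) rounds += 1;
  return rounds;
}

const sock = new MockSocket([['x'], ['y'], ['z']], 2);
const state = { outbox: ['m1', 'm2', 'm3'], inbox: [], isPaused: false };
const rounds = flush(sock, state);
console.log(rounds, sock.wire.length, state.outbox.length, state.inbox.length); // prints "3 2 1 3"

// Paused socket: flush reads nothing, but the Read path still pulls one message.
const pausedSock = new MockSocket([['p']], 0);
const pausedState = { outbox: [], inbox: [], isPaused: true };
const pausedRounds = flush(pausedSock, pausedState);
const readOk = flushOne(pausedSock, pausedState, false);
```

In the first run, two messages go out before the mock stops accepting sends, the third stays queued, and all three incoming messages are read in the same flush.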
Might work. It is hard to tell without actually testing the finished code.
What do you think would be the risky bits?
As always, in order of importance: lost messages, strangeness and performance. The proposal looks good at a glance. Lost messages and strangeness should be somewhat avoidable with regression tests. Performance can always be tweaked after the code is working.
I wonder how the pausing works in practice. Are incoming messages queued up in zmq's buffers? Will they all arrive in a bunch once the socket gets unpaused? At which point do said buffers fill up? What happens then?
Indeed, the buffering would be ZMQ-level buffering, which I think is how we should already be doing it (but aren't). It's a change in behavior, but a preferable one imho, since ZMQ semantics will be in place instead of arbitrary node-zmq ones. How those buffers behave is, I think, controllable with socket options such as the high-water marks (ZMQ_SNDHWM and ZMQ_RCVHWM).
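As a rough illustration of the pause questions, here is a toy model of a buffer bounded by a high-water mark. It is not ZeroMQ code: BoundedBuffer and its methods are hypothetical, and a real socket would block or drop at the limit depending on the socket type, with transport buffers in play as well. It only shows the expected shape: messages pile up while paused, the excess is refused, and everything that was queued arrives in one burst on resume.

```javascript
// Toy model of a receive buffer bounded by a high-water mark (HWM).
class BoundedBuffer {
  constructor(hwm) {
    this.hwm = hwm;
    this.queue = [];
    this.refused = 0;
  }
  // Returns false once the buffer is full; real ZeroMQ would then block
  // the sender or drop the message, depending on the socket type.
  offer(msg) {
    if (this.queue.length >= this.hwm) {
      this.refused += 1;
      return false;
    }
    this.queue.push(msg);
    return true;
  }
  // Empties the buffer, as a flush after resume() would.
  drain() {
    return this.queue.splice(0, this.queue.length);
  }
}

const buffer = new BoundedBuffer(3);
for (let i = 0; i < 5; i += 1) {
  buffer.offer('msg' + i); // messages arriving while the socket is paused
}
const burst = buffer.drain(); // socket resumed: everything queued arrives at once
console.log(burst.length, buffer.refused); // prints "3 2"
```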