JustinTulloss/zeromq.node

RFC: Refactoring the flush loop


Based on input from @kkoopa (https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/) and recent bugs, I'd like to propose the following rough rewrite of the flush loop. Feedback is very welcome (@kkoopa, @reqshark).
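The central point of the linked article is that ZMQ_FD is edge-triggered. It signals a change in the socket's state, not "messages are still waiting", so a handler that reads only one message per wake-up can strand the rest. The sketch below models this with a hypothetical `EdgeTriggeredSocket` (not part of zeromq.node or libzmq). As a simplification, its callback fires only when the inbound queue goes from empty to non-empty.

```javascript
// Hypothetical model of an edge-triggered notification source (not a real API).
// onReadable fires only on the empty -> non-empty transition of the queue,
// mimicking how ZMQ_FD reports state changes rather than pending data.
class EdgeTriggeredSocket {
  constructor() {
    this.queue = [];
    this.onReadable = () => {};
  }

  // Simulates several messages landing before the event loop gets to run.
  deliver(...msgs) {
    const wasEmpty = this.queue.length === 0;
    this.queue.push(...msgs);
    if (wasEmpty && msgs.length > 0) this.onReadable(); // the "edge"
  }

  // Stand-in for checking ZMQ_EVENTS & ZMQ_POLLIN.
  hasInput() {
    return this.queue.length > 0;
  }

  recv() {
    return this.queue.shift();
  }
}

function simulate(drain) {
  const sock = new EdgeTriggeredSocket();
  const received = [];
  sock.onReadable = () => {
    if (drain) {
      // Keep reading until the socket reports no more input.
      while (sock.hasInput()) received.push(sock.recv());
    } else {
      // Naive: one message per notification.
      received.push(sock.recv());
    }
  };
  sock.deliver("a", "b", "c");
  sock.deliver("d"); // naive case: queue is still non-empty, so no new edge
  return { received, stranded: sock.queue.length };
}

console.log(simulate(false)); // naive: only "a" received, 3 messages stranded
console.log(simulate(true));  // draining: all four received, none stranded
```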

JavaScript land (ES6 for clarity, but would implement with ES5 in mind):

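```js
const EventEmitter = require('events');

class Message {
    constructor(parts, flags, cb) {
        this.parts = parts;
        this.flags = flags;
        this.cb = cb;
        this.sendError = undefined;  // to be set from C++
        this.flushed = false;  // to be set from C++
    }
}

class Socket extends EventEmitter {
    // nativeSocket: an instance of the C++ Socket below (how it is created is left out here)
    constructor(nativeSocket) {
        super();  // a derived class must call super() before using `this`
        this._zmq = nativeSocket;
        // arrow function so that `this` is the JS Socket (needed for this.emit)
        this._zmq.onFlushDone = (inbox, outbox) => {
            // remove all flushed messages from outbox and call their callbacks
            // emit all inbox messages
        };
    }

    send(parts, flags, cb) {
        // to keep this example simple, assume flags will not contain SNDMORE
        this._zmq.enqueue(new Message(parts, flags, cb));
    }
}
```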
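The two comments in `onFlushDone` could be filled in roughly as follows. This is a sketch under several assumptions. The C++ side passes the same `outbox` array it holds, so flushed entries must be removed in place rather than by reassigning the array. Each inbox entry is an array of frames. Received messages are emitted as a `'message'` event with one argument per frame. `handleFlushDone` is a hypothetical helper name.

```javascript
const EventEmitter = require("events");

// Minimal copy of the proposal's Message class so this block runs on its own.
class Message {
  constructor(parts, flags, cb) {
    this.parts = parts;
    this.flags = flags;
    this.cb = cb;
    this.sendError = undefined; // set by the native side
    this.flushed = false;       // set by the native side
  }
}

// Hypothetical body for onFlushDone; `emitter` is the JS Socket.
function handleFlushDone(emitter, inbox, outbox) {
  // Compact the outbox in place, keeping unsent messages in their original order.
  const done = [];
  let keep = 0;
  for (const msg of outbox) {
    if (msg.flushed) done.push(msg);
    else outbox[keep++] = msg;
  }
  outbox.length = keep;

  // Take everything out of the inbox before running any user code.
  const received = inbox.splice(0, inbox.length);

  // Callbacks run in send order; sendError stays undefined on success.
  for (const msg of done) {
    if (typeof msg.cb === "function") msg.cb(msg.sendError);
  }
  // One 'message' event per received message, frames passed as separate arguments.
  for (const parts of received) {
    emitter.emit("message", ...parts);
  }
}

// Usage: two queued sends (the first already flushed) and one incoming message.
const sock = new EventEmitter();
const log = [];
sock.on("message", (...frames) => log.push(["message", frames.join("|")]));
const outbox = [
  new Message(["hello"], 0, (err) => log.push(["sent", err])),
  new Message(["world"], 0, () => log.push(["sent-2"])),
];
outbox[0].flushed = true;
const inbox = [["a", "b"]];
handleFlushDone(sock, inbox, outbox);
console.log(log, outbox.length, inbox.length);
```

Compacting the outbox and emptying the inbox before running any user code matters here: a callback may call `send()`, which in this proposal re-enters the native `Flush()` synchronously and triggers another `onFlushDone` while this one is still on the stack.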

C++ binding:

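```cpp
#include <nan.h>
#include <zmq.h>

class Socket : public Nan::ObjectWrap {
    public:
        static NAN_MODULE_INIT(Initialize);

    private:
        // Arrays shared with JS; persistent handles so they outlive any HandleScope
        Nan::Persistent<v8::Array> outbox;
        Nan::Persistent<v8::Array> inbox;
        uint32_t sentCount = 0;     // outbox entries already sent but not yet removed by JS
        bool isPaused = false;
        void* zmqSocket = nullptr;  // underlying libzmq socket

        void OnPoll();              // called by the uv_poll watcher on ZMQ_FD
        void Flush();
        bool FlushOne(bool actPaused);
        void SendOne();             // sends outbox[sentCount], sets flushed/sendError, ++sentCount
        void ReadOne();             // receives one (multipart) message into inbox
        void NotifyFlushDone();     // calls this.onFlushDone(inbox, outbox) in JS

        static NAN_METHOD(New);
        static NAN_METHOD(Enqueue);
        static NAN_METHOD(Pause);
        static NAN_METHOD(Resume);
        static NAN_METHOD(Read);
};

NAN_MODULE_INIT(Socket::Initialize) {
    Nan::HandleScope scope;
    v8::Local<v8::FunctionTemplate> t = Nan::New<v8::FunctionTemplate>(New);
    t->InstanceTemplate()->SetInternalFieldCount(1);

    Nan::SetPrototypeMethod(t, "enqueue", Enqueue);
    Nan::SetPrototypeMethod(t, "pause", Pause);
    Nan::SetPrototypeMethod(t, "resume", Resume);
    Nan::SetPrototypeMethod(t, "read", Read);
    // (class name and export to `target` omitted)
}

void Socket::OnPoll() {
    Nan::HandleScope scope;
    if (!isPaused) {
        Flush();
    }
}

NAN_METHOD(Socket::Enqueue) {
    Socket* self = Nan::ObjectWrap::Unwrap<Socket>(info.Holder());
    v8::Local<v8::Array> outbox = Nan::New(self->outbox);
    Nan::Set(outbox, outbox->Length(), info[0]);  // append the Message object
    self->Flush();
}

bool Socket::FlushOne(bool actPaused) {
    bool processed = false;
    int events = 0;
    size_t len = sizeof(events);
    // ZMQ_FD is edge-triggered, so ZMQ_EVENTS must be re-checked on every pass
    if (zmq_getsockopt(zmqSocket, ZMQ_EVENTS, &events, &len) != 0) {
        return false;  // error handling omitted
    }

    // Compare against sentCount, not the raw length: flushed messages stay in the
    // outbox until JS removes them, so checking the length alone would loop forever
    if ((events & ZMQ_POLLOUT) != 0 && sentCount < Nan::New(outbox)->Length()) {
        SendOne();
        processed = true;
    }

    if ((events & ZMQ_POLLIN) != 0 && !actPaused) {
        ReadOne();
        processed = true;
    }

    return processed;
}

void Socket::Flush() {
    while (FlushOne(isPaused)) {
        // keep going until neither sending nor reading makes progress
    }
    NotifyFlushDone();
}

void Socket::NotifyFlushDone() {
    Nan::HandleScope scope;
    v8::Local<v8::Value> argv[] = { Nan::New(inbox), Nan::New(outbox) };
    sentCount = 0;  // assumes onFlushDone removes every flushed message
    Nan::MakeCallback(handle(), "onFlushDone", 2, argv);
}

NAN_METHOD(Socket::Pause) {
    Nan::ObjectWrap::Unwrap<Socket>(info.Holder())->isPaused = true;
}

NAN_METHOD(Socket::Resume) {
    Socket* self = Nan::ObjectWrap::Unwrap<Socket>(info.Holder());
    self->isPaused = false;
    // Flush right away: messages that arrived while paused may not produce a new ZMQ_FD edge
    self->Flush();
}

NAN_METHOD(Socket::Read) {
    // read a single message while paused
    Socket* self = Nan::ObjectWrap::Unwrap<Socket>(info.Holder());
    if (self->FlushOne(false)) {
        self->NotifyFlushDone();
    }
}
```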
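The control flow of `FlushOne`, `Flush` and `Read` can be exercised without libzmq. The sketch below ports it to JavaScript against a hypothetical `MockZmq` object. Its `events()` method stands in for `zmq_getsockopt(..., ZMQ_EVENTS, ...)`, and the constants have the same values as libzmq's ZMQ_POLLIN (1) and ZMQ_POLLOUT (2). For brevity, the model removes a message from `outbox` as soon as it is sent instead of marking it `flushed`.

```javascript
// Local stand-ins with the same values as libzmq's ZMQ_POLLIN / ZMQ_POLLOUT.
const POLLIN = 1;
const POLLOUT = 2;

// Hypothetical mock of a libzmq socket. `incoming` is what the peer has already
// sent; `capacity` is how many more messages fit before the send HWM is reached.
class MockZmq {
  constructor(incoming, capacity) {
    this.incoming = incoming.slice();
    this.capacity = capacity;
    this.sent = [];
  }

  // Plays the role of zmq_getsockopt(socket, ZMQ_EVENTS, ...).
  events() {
    return (this.incoming.length > 0 ? POLLIN : 0) |
           (this.capacity > 0 ? POLLOUT : 0);
  }

  send(msg) {
    this.sent.push(msg);
    this.capacity--;
  }

  recv() {
    return this.incoming.shift();
  }
}

// JavaScript port of the proposed FlushOne / Flush / Read logic.
class FlushLoop {
  constructor(zmq) {
    this.zmq = zmq;
    this.outbox = [];
    this.inbox = [];
    this.isPaused = false;
    this.notifications = 0; // how many times onFlushDone would reach JS
  }

  // At most one send and one receive per pass; ZMQ_EVENTS is re-read every time.
  flushOne(actPaused) {
    let processed = false;
    const events = this.zmq.events();
    if ((events & POLLOUT) !== 0 && this.outbox.length > 0) {
      this.zmq.send(this.outbox.shift()); // simplification: removed, not marked flushed
      processed = true;
    }
    if ((events & POLLIN) !== 0 && !actPaused) {
      this.inbox.push(this.zmq.recv());
      processed = true;
    }
    return processed;
  }

  // Keep going until a pass makes no progress, then notify JS once.
  flush() {
    while (this.flushOne(this.isPaused)) {
      // drain
    }
    this.notifications++;
  }

  // Explicit single read, allowed while paused.
  read() {
    if (this.flushOne(false)) this.notifications++;
  }

  enqueue(msg) {
    this.outbox.push(msg);
    this.flush();
  }
}

// Usage: three messages waiting from the peer, room to send only two.
const loop = new FlushLoop(new MockZmq(["in1", "in2", "in3"], 2));
loop.isPaused = true;
loop.enqueue("out1");
loop.enqueue("out2");
loop.enqueue("out3");                  // no POLLOUT any more: stays in the outbox
const pausedInbox = loop.inbox.length; // nothing is read while paused
loop.read();                           // pulls exactly one message despite the pause
loop.isPaused = false;
loop.flush();                          // resuming flushes explicitly; no new FD edge is guaranteed
console.log(loop.zmq.sent, loop.outbox, loop.inbox, loop.notifications);
```

Because each pass does at most one send and one receive, a socket that is both writable and readable alternates between the two instead of starving either direction.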

Might work. It is hard to tell without actually testing the finished code.

What do you think would be the risky bits?

As always, in order of importance: lost messages, strangeness and performance. The proposal looks good at a glance. Lost messages and strangeness should be somewhat avoidable with regression tests. Performance can always be tweaked after the code is working.

I wonder how the pausing works in practice. Are incoming messages queued up in ZMQ's buffers? Will they all arrive in a burst once the socket is unpaused? At what point do those buffers fill up, and what happens then?

Indeed, the buffering would be ZMQ-level buffering, which I think is how we should already be doing it (but aren't). It's a change in behavior, but a preferable one imho, since ZMQ semantics will apply instead of arbitrary node-zmq ones. How those buffers behave is, I think, controllable with socket options, e.g. the high-water marks (ZMQ_SNDHWM / ZMQ_RCVHWM).
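To make the buffering questions concrete: when a paused socket stops reading, messages accumulate in libzmq's queues until the high-water mark is reached. What happens next depends on the socket type. For example, PUB and ROUTER sockets drop messages at the HWM, while PUSH and DEALER sockets block (or return EAGAIN for a non-blocking send). The sketch below is a deliberately simplified, hypothetical model. `PausedPipe` is not a real API, and it folds the sender's SNDHWM, the receiver's RCVHWM and any OS buffering into a single `hwm` capacity.

```javascript
// Hypothetical, simplified model of one ZMQ pipe feeding a paused reader.
// `hwm` lumps together sender SNDHWM, receiver RCVHWM and OS buffers.
class PausedPipe {
  constructor(hwm, onFull) {
    this.hwm = hwm;
    this.onFull = onFull; // "drop" (e.g. PUB, ROUTER) or "block" (e.g. PUSH, DEALER)
    this.queued = [];
    this.dropped = 0;
    this.blockedSends = 0;
  }

  // Peer attempts a non-blocking send; returns false where it would get EAGAIN.
  peerSend(msg) {
    if (this.queued.length < this.hwm) {
      this.queued.push(msg);
      return true;
    }
    if (this.onFull === "drop") {
      this.dropped++;
      return true; // the sender is not told that the message was discarded
    }
    this.blockedSends++;
    return false; // message stays with the sender
  }

  // Resuming the reader hands over everything that was buffered, in one burst.
  resume() {
    return this.queued.splice(0, this.queued.length);
  }
}

function run(onFull) {
  const pipe = new PausedPipe(3, onFull);
  for (let i = 1; i <= 5; i++) pipe.peerSend(`m${i}`);
  return { burst: pipe.resume(), dropped: pipe.dropped, blockedSends: pipe.blockedSends };
}

console.log(run("drop"));
console.log(run("block"));
```

With `"block"` nothing is lost: the excess messages stay with the sender until the reader resumes. With `"drop"`, a long enough pause silently loses messages. That is the lost-messages risk mentioned earlier, but now governed by ZMQ's rules rather than node-zmq's.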