nats-rpc/nrpc

Stream replies are missing a backpressure mechanism

Opened this issue · 9 comments

Currently, the client has no means to slow down the server's emission of messages.

There should be one, and it should not slow down emission merely by being present.
That rules out the client sending an ack message for each message received.

One possible approach would be to send an optional pause/resume command inside a heartbeat message. The client would send "pause" when its local buffer is full (or nearly full), and "resume" when it is empty (or nearly empty).
The nice thing here is that it's fully compatible with the current protocol, in both directions.

@mdevan I would like your opinion on this before I start coding.

Thanks

Protocol-wise, using a heartbeat message to indicate pause/resume looks good.

I assume you're not going to leave it up to the user's client to decide when to call a StreamCallSubscription.Pause() (or equivalent), because this would complicate the user-facing API somewhat?

Maybe instead make StreamCallSubscription.nextCh a buffered channel, track how full it is, and have StreamCallSubscription.loop() automatically issue pause/resume heartbeats when the high-water/low-water mark thresholds are crossed? These thresholds (and perhaps the capacity of nextCh) could then be optionally configured via MethodOptions.

What about the server side, though? Ideally, the user's server should be able to propagate the backpressure further upstream, so it should be able to do something like:

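func (s *MyServer) sendRecords(ctx context.Context, send func(Record) ?) {
    // The return type of send is left open ("?"); nrpc.StreamPaused is a proposed value, not an existing API.
    cursor := s.startQuery()
    defer cursor.Close()
    for !cursor.Done() {
        r := cursor.getNextRecord()
        // While the client keeps the stream paused, pause the upstream query as well.
        for send(r) == nrpc.StreamPaused {
            s.pauseQuery()
            time.Sleep(5 * time.Second)
            s.resumeQuery()
        }
    }
}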

On the client side, I agree: the user should not have to handle pause/resume.

On the server side, I think backpressure should be propagated by having the 'send' function block until sending has resumed.
If the user's server needs to propagate it upstream explicitly, it can use its own buffered channel to control the pressure.

Might the server need control over how long send() may take, at least in the form of a timeout?

The timeout is a good idea: very simple API, and a lot can be done without adding a complex mechanism.
Will do that.

I should be able to work on this next week (not sure though).

I was reading the go-nats sources and discovered that it already has a receive buffer, and that we can access its fill level at any time. It should be a good base for the backpressure mechanism.

This is something I could use. Any update on this?

I have not spent time on this yet. I do not plan to do so soon, but I will certainly need it someday.

Re-reading the thread, I think passing a context to 'send' instead of a timeout would give more control and be simpler in the basic case.