nats-rpc/nrpc

Stream reply pattern


I would like to propose the addition of streamed replies.

Use cases

Long-running computations

A method call may take longer than a reasonable timeout. In such a case, the server side could send a first reply as soon as possible that basically says "I'm working on it, hold on", followed by other empty replies until the actual result is ready and can be returned. Only then would the client unsubscribe.

Big replies

A method call may have to return a volume of data that is too big to be encoded in a single nats message, let alone a single protobuf message (which is recommended to remain small).
A streamed reply would allow sending several reply messages, and the client would do what is needed with them (saving them, assembling a bigger data structure, whatever).

.proto example

A new option "streamed_reply" could enable this pattern for a given method:

message FileList {
    repeated string filename = 1;
}
service Demo {
    rpc ListFiles(nrpc.Void) returns (FileList) {
        option (nrpc.streamed_reply) = true;
    }
}

Protocol details

The client subscribes to the reply subject without auto-unsubscribe, because it does not know how many messages will be received.
Each message of the reply can be one of:

  • empty message: basically means "hold on, work in progress"
  • an encoded output type: a chunk of data to be passed downstream
  • a normally encoded error with a special type "EOS", whose message contains the number of messages that were sent, so the client can be sure nothing is missing
  • a normally encoded error

The duration between two messages must not exceed a typical nats timeout value (a few seconds). If it does, the client is supposed to return a timeout error.
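
As a rough illustration of this flow, here is a sketch (not actual nrpc code) of how a client could consume the reply subject with the nats Go client. The decodeReply helper, the handle callback and the 5-second window are assumptions:

// streamedReply sketches the protocol described above.
// decodeReply is a hypothetical helper that tells an encoded output chunk
// apart from an encoded nrpc error (including the special "EOS" type).
func streamedReply(nc *nats.Conn, replySubject string, handle func(FileList)) error {
    sub, err := nc.SubscribeSync(replySubject)
    if err != nil {
        return err
    }
    defer sub.Unsubscribe()

    for {
        msg, err := sub.NextMsg(5 * time.Second) // a typical nats timeout
        if err != nil {
            return err // nothing received in time, not even a heartbeat
        }
        if len(msg.Data) == 0 {
            continue // empty message: "hold on, work in progress"
        }
        chunk, eos, err := decodeReply(msg.Data)
        if err != nil {
            return err // a normally encoded error terminates the stream
        }
        if eos {
            return nil // EOS: the stream is complete, nothing is missing
        }
        handle(chunk) // pass the chunk downstream
    }
}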

Client mapping

Example of a wrapper for the above .proto definition:

func (c *DemoClient) ListFiles(results chan<- FileList) error

The chan is expected to be consumed in a separate goroutine, which can make things a little more complicated than we would like.

An alternative could make things easier:

func (c *DemoClient) ListFiles(results chan<- FileList, errs chan<- error)

Here there is no need to launch a goroutine; a simple for { select {} } loop is enough to consume the stream.

In both cases the channel would be closed by the generated code when the reception is over.
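
For illustration, consuming the two-channel form could look like this sketch; the channel buffering and the process helper are assumptions:

func consumeListFiles(c *DemoClient) error {
    files := make(chan FileList)
    errs := make(chan error, 1)
    c.ListFiles(files, errs)

    for {
        select {
        case fl, ok := <-files:
            if !ok {
                return nil // closed by the generated code: reception is over
            }
            process(fl) // hypothetical per-chunk handling
        case err, ok := <-errs:
            if !ok {
                errs = nil // closed error channel: stop selecting on it
                continue
            }
            if err != nil {
                return err
            }
        }
    }
}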

Server mapping

Example of a Server interface function for the above .proto definition:

type DemoServer interface {
    ListFiles(ctx context.Context, results chan<- FileList) error
}

The function would return only when everything is over. As long as the function is running, the nrpc code takes care of sending empty messages to avoid a timeout.

The server implementation has the responsibility to ensure messages are small enough to fit in a nats message. If a message exceeds the size, the context is canceled and the implementation is expected to return.
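
As an illustration, a server implementation could chunk its results to stay under the nats message size. The listDir helper, the Filename field name and the chunk size of 100 names are assumptions:

type demoServer struct{}

// ListFiles is a sketch of a DemoServer implementation, not part of the proposal.
func (s *demoServer) ListFiles(ctx context.Context, results chan<- FileList) error {
    var chunk FileList
    for _, name := range listDir() { // hypothetical source of file names
        chunk.Filename = append(chunk.Filename, name)
        if len(chunk.Filename) < 100 { // keep each message well under the nats limit
            continue
        }
        select {
        case results <- chunk:
            chunk = FileList{}
        case <-ctx.Done():
            return ctx.Err() // canceled, e.g. a message exceeded the size limit
        }
    }
    if len(chunk.Filename) > 0 {
        results <- chunk // send the remaining names
    }
    return nil // returning tells nrpc the stream is over
}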

@mdevan I will work on this quickly, and would really appreciate your feedback on the idea and the proposed solution.

It would be nice to have a way for the server to cancel the request if the client stops listening.
I am not sure what would be the best pattern to do that, but somehow the client should send messages to keep the server request alive. It could be done on a sub-subject of the reply subject.
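
A rough sketch of that client-side keep-alive; the ".heartbeat" suffix and the 2-second interval are assumptions, not part of the proposal:

// keepAlive publishes empty messages on a sub-subject of the reply subject
// for as long as the client is still listening.
func keepAlive(ctx context.Context, nc *nats.Conn, replySubject string) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return // the client stopped listening: let the server request expire
        case <-ticker.C:
            nc.Publish(replySubject+".heartbeat", nil)
        }
    }
}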

A few thoughts:

  • Why should there be heartbeat/keep-alive type responses at all? The client sends a message to ListFiles with a dedicated inbox. The server sends 0 or more valid responses, and finally an EOS response. The client unsubscribes when it gets the EOS response. The user code will assume a reasonable timeout for the entire operation, and will call client.CancelListFiles() if that is exceeded.
  • The nats timeout (which is only for individual messages) is a hard error; the detecting side has to abort its side of the operation and hope that the other side will detect it and clean up.
  • Having no heartbeat should help in the case where the server user logic takes longer to generate a response than the heartbeat interval.
  • Consider this client-side usage (this is similar to how the Google Cloud pub/sub client, which is a streaming gRPC client wrapper, looks):
err := client.ListFiles(ctx, func(ctx context.Context, item *FileList) {
    // process item here
})
// exits only when server sends EOS, or there is a connection error
  • If we allow the client side to cancel an ongoing subscription, we can also allow for an error return:
err := client.ListFiles(ctx, func(ctx context.Context, item *FileList) error {
    // process item here, return non-nil error if subscription needs to be cancelled
})
// exits when the server sends EOS, the callback returns an error, or there is a connection error
  • When the client unsubscribes, will the server's send fail with an unknown inbox error? On this error the server can stop item generation by cancelling the generator's context.
  • The caller can impose an overall timeout inside the ctx passed to ListFiles.
  • The caller can also cancel the operation by cancelling the ctx passed to ListFiles.
  • The heartbeat from the client allows cancelling a potentially very long operation on the server when the
    client disappears without notice.
    It may not be a common use case, but it is the only way to detect it, as there is no such thing as an
    'unknown inbox' error in nats (that I am aware of).
    Also, the server side would not be much more complex than without the heartbeat, as it already needs to send keep-alive messages if the implementation waits too long between two messages.
    We can decide we do not want to support such a feature though.
  • Using a context to cancel the operation on the client side is a great idea, thanks.
  • I already came to the conclusion that channels were not a good idea for the client side, but had no clear idea of the best alternative. I like the API you propose instead and will give it a try.

Thanks a lot for your feedback!