Client Stream Methods support a result processing hook
namtzigla opened this issue · 6 comments
Right now the client stream generated method support only INSERT
sql statements and the return protobuf message must contain a field named count
I propose an enhancement and allow some basic server side processing where every request is prepared and executed, and the results will be passed to a function call that will process results.
For a method definition like this:
rpc ClientStreamingExample(stream ExampleTable1) returns (ReturnMessage) {
option (persist.ql) = {
query: "SELECT id AS \"table_id\", key, value, msg as inner_message, status as inner_enum FROM test_table WHERE id = $1"
arguments: ["id"]
};
}
pseudocode:
res := &ReturnMessage{}
while (message received) {
...
sql_result := execute_sql()
...
res.AddResult(request, sql_result)
}
stream.SendAndClose(res)
The generated type ReturnMessage
has to have implemented a function AddResult
that take two parameters (initial request data and sql result) that will process and assembly the result.
The advantage of this is that we can extend the server side processing and we can limit the network traffic with unnecessary data that will be discarded/mapped/reduced later anyway.
Should we limit this to client streaming methods? This would be easy to generate for all rpc calls.
it might be useful for other types of rpc to
I can get started on this on my branch. It seems fairly straightforward.
We have to combine this with before and after callbacks and this looks like an after callback especially if we want to implement it on every rpc call
So maybe it is not as straightforward as I thought it would be. Each way I think to implement it seems to come with pros and cons.
First I thought that I would write something like this:
type ResultHook interface {
AddResult(req, row interface{}) error
}
func (s *AmazingImpl) ClientStream(stream Amazing_ClientStreamServer) error {
(set up the statement and transaction like normal)
//we would know the return type of the method, so write type assertion
// here to know which code to generate
_, hasResultHook := ResultHook(ReturnProto{})
if hasResultHook {// receive on the stream, and process results
ret := &ReturnProto{}
for {
req, err := stream.Recv()
row := tx.Stmt(stmt).QueryRow(...)
ret.AddResult(req, row)
}
stream.SendAndClose(ret)
} else { // recieve on the stream and count the rows
for {
req, err := stream.Recv()
rowsAffected += tx.Stmt(stmt).Exec(...)
}
stream.SendAndClose(ReturnProto{ Count: rowsAffected})
}
}
This (or some variation of this) would work, but we have pretty much doubled the amount of code generated if we go this route. It would seem smarter to just detect if the proto message has a ResultHook at code generation time, and generate either the if, or the else instead of having both options in the generated code.
The problem with that approach is I don't know how we would get an instance of the return message go struct, without reflection, for testing if it satisfies the ResultHook interface. ex. rpc TestClientStream(stream ExReq) returns (ExRes){}
We technically know that the type returned by TestClientStream
is an instance of a go struct named: ExRes
, but at code generation time, there is no way to set up an instance of ExRes without reflection, because we only have the string of the name of the struct.
The other way we could do this would be to add another option to QLImpl in persist/options.proto. Just a boolean whether the result hook exists or not. We could look that option up at generation time, and if it exists, and is true, we generate the code using, otherwise we assign to the Count field.
The only downsides to this is that is requires us to change options.proto.
@namtzigla Any input on which way you would prefer this done?
Before Hook
a function that return an object and error and the meaning is that we skip sql execution if the object returned is not nil and the error is nil
Before on unary call
func BeforeHandler(req *pb.RequestMessage) (*pb.ResponseMessage, error)
data, err := BeforeHandler(req)
if err == nil && data != nil {
return data
}
if err != nil {
// throw error
}
Before on client streaming call
func BeforeHandler(req *pb.RequestMessage) (*pb.ResponseMessage, error)
for {
stream.Receive(req)
data, err := BeforeHandler(req)
if data != nil && err == nil {
continue
}
if err != nil {
stream.Close()
// throw error
return ...
}
....
}
Before on server streaming call
func BeforeHandler(req *pb.RequestMessage) ([]*pb.ResponseMessage, error)
data, err := BeforeHandler(req)
if data != nil && err == nil {
for _, elm := data {
stream.Send(elm)
}
stream.Close()
}
if err != nil {
// throw error
}
Before on bidirectional streaming call
func BeforeHandler(req *pb.RequestMessage) (*pb.ResponseMessage, error)
data, err := BeforeHandler(req)
if err == nil && data != nil {
return data
}
if err != nil {
// throw error
}