/go-grpc-example

学习怎样在golang中使用gRPC

Primary LanguageGo

go-grpc-example

演示Golang中基础的gRPC使用,包含基础的gRPC使用场景,包括四种RPC方式、gRPC状态码、metadata、拦截器的使用等。

参考文档:PROTOCOL-HTTP2Core-Conceptgrpc-go文档Doc

导包

go get google.golang.org/grpc

四种RPC方式

客户端发送一次请求,然后服务端返回一次响应,这个就是Unary RPC。

例如下面的例子,客户端每发送一个 CreateCellphoneRequest就会得到服务端的 CreateCellphoneResponse响应。

rpc CreateCellphone(CreateCellphoneRequest) returns (CreateCellphoneResponse);

客户端发送一次请求,服务器不再以一次性的方式返回响应,而是以数据流的方式返回响应(a stream of messages),这个就是Server streaming RPC。

在proto文件内定义rpc服务的时候,在返回值类型前面加上 stream关键字,就完成了这种类型的定义。

在这种模式下,服务端涉及的方法为 Send,用来发送数据流;客户端涉及的方法为 Recv,用来接收数据流。

例如下面的例子,客户端接收流式响应,每次接收都得到一个 Cellphone类型的数据

rpc SearchCellphone(FilterCondition) returns (stream Cellphone);

客户端发送请求的时候以数据流的方式发送请求数据(a stream of messages),服务端接收数据流,然后返回一次性的响应数据。

这种模式下,客户端涉及的方法为 Send用来发送流数据,CloseAndRecv用来接收响应数据;服务端涉及的方法为 Recv接收请求数据流,SendAndClose用来发送响应数据。

例如下面的例子,客户端发送字节流数据给服务端,服务端处理完成后,返回一个响应。

rpc UploadCellphoneCover(stream UploadCellphoneCoverRequest) returns (UploadCellphoneCoverResponse);

注意事项:

  1. 在golang中的gRPC实现中,客户端调用 Send方法发送数据的时候,并不会阻塞等待服务端调用Recv?(从测试来看好像这样)。服务端有可能随时通过返回错误来关闭流,所以客户端每次 Send或者最后 CloseAndRecv之后都要判断错误。
  2. 如果客户端在 Send过程中发生了在客户端这一侧的错误,那么直接返回对应的 error;但是如果是服务端返回了错误,那么在客户端再使用 Send的时候,就会笼统地返回 io.EOF错误,更为具体的错误需要在客户端侧调用 RecvMsg(nil)来获得。

客户端和服务端之间双向的数据交互都是数据流,这种方式可以发送多个请求和多个响应。

例如下面的例子。

rpc BuyCellphone(stream BuyCellphoneRequest) returns (stream BuyCellphoneResponse);

注意事项:

  1. 当客户端发送完所有的请求数据之后,调用 CloseSend方法关闭写端。
  2. 服务端通过 Recv不断接收请求,使用 Send发送响应。

gRPC的响应状态使用

基础使用

gRPC中表示响应的状态和状态码要用到这两个package

google.golang.org/grpc/codes
google.golang.org/grpc/status

通过 status.Error(c codes.Code, msg string) error这个接口可以返回响应错误信息

gRPC定义了一系列内置的错误码,可以在codes中设置,常见的错误码比如 codes.OK, codes.Canceled, codes.InvalidArgument等。

通过error获得Status:code+message

使用status.Convert(error) *status.Status函数,可以解析用 status.Errorf函数生成的 error,提取出当中的 codes.Codemessage

io.EOF

当服务器这一侧使用grpc的函数操作遇到io.EOF的时候,返回的error应该nil。

context.Context在gRPC中的使用

超时控制

使用 context.WithTimeout或者 context.WithDeadline

调用取消

使用 context.WithCancel

gRPC中时间类型的使用

在proto文件中 import "google/protobuf/timestamp.proto",这个是protobuf的内置message类型,用来表示时间戳。

生成的go代码中,类型为 timestamppb.Timestamp

time.Time的相互转换:

timestamppb.Timestamp -> time.TimeAsTime方法可以转化为go的内置 time.Time类型,timestamppb.Timestamp.AsTime()

time.Time -> timestamppb.Timestamptimestamppb.New(time.Time)

metadata

gRPC中的metadata是可以在传输携带的一组键值对数据,键值对的类型一般都是字符串,也可以是二进制数据。

获取metadata

使用grpc中的metadata包来获取(google.golang.org/grpc/metadata),metadata依附在context.Context中。

创建带有metadata的Context

使用 metadata.NewOutgoingContext函数完成context的创建;或者使用 metadata.AppendToOutgoingContext函数。

response trailer/header

服务端响应的时候,除了主体信息外,还可以额外携带header信息和trailer信息,即gRPC除主体信息外额外传输的一组数据,也是键值对的形式。

Unary RPC和Streaming RPC获取响应的trailer和header不一样。

  • Unary RPC获取响应的trailer/header:使用 grpc.Trailer(&metadata.MD)grpc.Header(&metadata.MD)生成 grpc.CallOption,作为选项传入。
  • Streaming RPC获取trailer/header:在stream上调用 Trailer()或者 Header()方法。

概念

拦截器可以设置在服务端一侧(server-side),也可以设置在客户端一侧(client-side)。同时还分成Unary RPC拦截器和stream RPC拦截器。在代码层面,拦截器本质是一个具有特定函数签名的函数。

grpc-interceptor

用法:服务端安装拦截器

在使用 grpc.NewServer()创建服务器的时候,通过生成 grpc.ServerOption指定拦截器,比如:

安装一个拦截器

// unary interceptor
server := grpc.NewServer(grpc.UnaryInterceptor(UnaryServerInterceptor))
// stream interceptor
server := grpc.NewServer(grpc.StreamInterceptor(StreamServerInterceptor))

安装多个拦截器

通过以下两个函数可以指定多个拦截器。

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

Server-side unary interceptor

grpc-interceptor-server-unary

Server-side stream interceptor

stream interceptor适用于streaming RPC,但是stream interceptor仅会被调用一次。具体的行为和Unary interceptor有差别。

grpc-server-interceptor

如果想要数据流中每个数据都被拦截,则需要手动包装grpc.ServerStream,并且重新实现SendMsgRecvMsg方法。

原理:grpc在进行流式通信收发的时候,会调用grpc.ServerStream接口的SendMsg和RecvMsg方法。

type myWrappedStream struct {
	grpc.ServerStream
}

// 实现SendMsg方法
func (s *myWrappedStream) SendMsg(data interface{}) error {
	// TODO 就是在这里实现自定义流拦截器的发送逻辑
	// 这里简单打印日志
	// data是需要往流中发送的数据,我们可以在这里对这个准备发送的数据进行自定义操作
	log.Printf("myWrappedStream send a message(%T): %v\n", data, data)
	return s.ServerStream.SendMsg(data)
}

// 实现RecvMsg方法
func (s *myWrappedStream) RecvMsg(data interface{}) error {
	// TODO 就是在这里实现自定义流拦截器的接收逻辑
	// 这里简单打印日志
	err := s.ServerStream.RecvMsg(data) // 从流中接收数据
	if err != nil {
		return err
	}
	// 调用了RecvMsg从流中接收了数据之后,就可通过data访问到接收的数据内容
	// data的类型是在proto文件中定义的流数据的类型
	// 就可以按照自己的需求进一步处理
	log.Printf("myWrappedStream receive a message(%T): %v\n", data, data)

	return nil
}

func newMyServerStream(s grpc.ServerStream) grpc.ServerStream {
	return &myWrappedStream{s}
}

func installServerStreamInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{},
		ss grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler) error {
        
        // ss 的实际类型为 *grpc.serverStream
		log.Printf("actual grpc.ServerStream type: %T\n", ss)
		return handler(srv, newMyServerStream(ss))
	}
}

用法:客户端使用拦截器

在调用grpc.Dial函数的时候,使用grpc.WithUnaryInterceptorgrpc.WithStreamInterceptor指定拦截器。