zhangxiang958/Blog

RPC in Golang(一)

Opened this issue · 0 comments

RPC,即远程过程调用,在 grpc 等协议里面,数据的传输是通过二进制的方式进行传输的。而我们在使用 rpc 框架的时候,为了屏蔽底层的数据传输细节,像通过调用本地代码一样完成远程调用,往往会发现有一个 stub 模块。

stub 模块

什么是 stub 模块?其实就是将客户端的请求参数打包成网络信息,然后通过网络发送给服务方。服务端也有 stub 模块,同样也是封装了将网络消息解包,调用本地方法的过程。

简单的 RPC 框架

服务端

客户端传到服务端的是一个函数名,服务端需要维护一个函数名到函数具体逻辑的映射关系。具体是怎么处理映射关系呢?请看下面代码:

package main

import (
	"fmt"
	"reflect"
)

func CallMeBaby(lastName, firstName string) (string, error) {
	res := fmt.Sprintf("%s%s is my baby!", lastName, firstName)
	return res, nil
}

func main() {
	funcs := make(map[string]reflect.Value)
	funcs["CallMeBaby"] = reflect.ValueOf(CallMeBaby)

	req := []reflect.Value{
		reflect.ValueOf("xiang"),
		reflect.ValueOf("zhang"),
	}

	val := funcs["CallMeBaby"].Call(req)
	var rsp []interface{}
	for _, v := range val {
		rsp = append(rsp, v.Interface())
	}

	fmt.Println("result", rsp)
}

像上面这样的代码,将函数与函数名维护在一个 Map 里面,类似这样,如果客户端传过来函数名字,就能通过 Call 方法来将参数传进函数并执行。

类似这样,我们可以提供一个注册的方式,用服务端编写者能够将方法进行注册,将方法放进映射里面。

import (
	"reflect"
)

type Server struct {
    handlers map[string]reflect.Value
}

func (s *Server) Register(name string, f interface{}) {
    if f, exist := s.handlers[name]; exist {
        return;
    }
    
    s.handlers[name] = reflect.ValueOf(f);
}

这里网络传输是基于 TCP 协议的,我们还需要建立一个 TCP 服务来接收网络信息的传输。

func (s *Server) Serve(addr string) {
    server, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal("serve start err", err)
	}
	fmt.Printf("serve listening on %s\n", addr)
	defer server.Close()

	for {
		conn, err := server.Accept()
		if err != nil {
			log.Fatal("serve accep err", err)
			continue
		}
		go func() {
            defer conn.Close()
            buf := make([]byte, 4096)
            for {
                n, err := conn.Read(buf)
                if err != nil || n == 0 {
                    log.Fatal("tcp handler", err)
                    break
                }
                receiveStr := strings.TrimSpace(string(buf[0:n]))
                fmt.Printf("receive: %v", receiveStr)
                conn.Write([]byte(receiveStr))
            }
            fmt.Printf("Connection from %v closed. \n", conn.RemoteAddr())
		}()
	}
}

这里需要使用 while 循环来不停地接收客户端传来的信息,golang 中没有 while 关键字,直接使用 for,省略掉循环条件就可以了。

这里需要使用协程来保证服务端的并发。假设这里只是传一个函数名过来,让服务端执行,并返回函数值,相关代码如下:

go func() {
	defer conn.Close()
	buf := make([]byte, 4096)
	for {
		n, err := conn.Read(buf)
		if err != nil || n == 0 {
			log.Fatal("tcp handler", err)
			break
		}
		receiveStr := strings.TrimSpace(string(buf[0:n]))
		fmt.Printf("receive: %v", receiveStr)
		f, RPCExist := s.handlers[receiveStr]
		if !RPCExist {
			err := fmt.Sprintf("rpc %s is not exist", receiveStr)
			conn.Write([]byte(err))
			continue
		}
		req := []reflect.Value{
			reflect.ValueOf("rpc"),
			reflect.ValueOf("name"),
		}
		result := f.Call(req)
		var rsp []interface{}
		for _, v := range result {
			rsp = append(rsp, v.Interface())
		}
		fmt.Println(rsp)
		sss := rsp[0].(string)
		conn.Write([]byte(sss))
	}
	fmt.Printf("Connection from %v closed. \n", conn.RemoteAddr())
}()

此时,服务端编写者已有简易版的 rpc 框架的写法了:

package main

import (
	"fmt"

	rpc "rpc-lite"
)

func test(n, m string) string {
	res := fmt.Sprintln("testS", n, m)
	return res
}

func main() {
	server := rpc.NewServer()

	server.Register("test", test)

	server.Serve()
}

只需要客户端使用 net 模块的 Dial 方法,建立 socket 连接,并传送数据 "test",就可以执行服务端的代码并拿到对应的返回数据了。