/tcpstream

simple tcp stream lib based golang for building tcp long chain service quickly.

Primary LanguageGo

一个简单的TCP长链的网络库,可以帮助你很简便的开发基于tcp socket连接的业务。同时封装了一个类似于rpc功能的同步调用接口,可以模拟双向RPC调用,服务端主动推送,广播等业务功能。

整个库的核心在于tcpstream,主要支持一下功能:

  • 支持Tcp字节流自动分包功能,使用者感知到的是msg层面的数据,不用关心底层的数据收发;
  • 支持client主动发起的Tcpstream,和Server端Accept派生出来的TcpStream,除了重连功能外,二者其他的调用接口完全相同;SyncClient基于同一条TcpStream实现双向RPC调用;
  • 支持多路复用,多个RPC请求使用一条socket连接即可,内部通信是全异步;在用户端看来SynClient是同步功能;
  • Client的tcpstream默认支持自动重连功能,连接断开后每隔5s向对端发起重连,如果不想重连调用stop接口销毁次tcpstream即可;
  • 不支持默认的心跳包功能,如业务方需要自己在上层实现;

Sync调用功能:

  • client与server建立一个连接后,可通过RequestID的映射功能,实现请求方类似RPC调用功能;
  • 基于一条连接可以实现双向RPC调用的效果;这样就解决了server向client发起rpc调用复杂的问题,client端可以不用再启用rpcServer.
  • 同步调用接口的业务包序列化和反序列化有自己来实现;

同步调用客户端的例子

  • 创建一个客户端
	c := NewSyncClient("127.0.0.1:7001")

基于已经存在tcpstream的创建,client端和server都可以

	c := NewSyncFromTcpStream(tcpstream)
  • 同步调用,输入是要发送的msg,接收到的是对端返回的msg;然后对msg反序列化即可实现自己的业务;
	if resp, err := c.Call(&m, time.Duration(2*time.Second)); err != nil {
		fmt.Println("call fail ", err.Error())
	} else {
		//反序列化resp,do something
	}

一个client和server的例子

server

type Handler struct {
	name string
}

func (h *handler) OnData(conn *TcpStream, msg *Message) error {
	_ = conn.Write(&Message{
		//因为是多路复用的,返回的seq必须要与client请求的seq对应
		Header: ProtoHeader{
			Seq: msg.Header.Seq,
		},
		Body: []byte("I am server"),
	})
	return nil
}
func (h *handler) OnConn(conn *TcpStream)    {}
func (h *handler) OnDisConn(conn *TcpStream) {}

func main() {
	tcpstream.NewTCPServer(address, &Handler{}).Serve()
	select {
	}
}

client:

var address = "127.0.0.1:7001"

type Handler struct{}

func (h *Handler) OnData(conn *tcpstream.TcpStream, msg *tcpstream.Message) error {
	fmt.Println("receive from server: ", string(msg.Body))
	resp := &tcpstream.Message{
		Header: msg.Header,
		Body:   []byte("I am client"),
	}
	time.Sleep(time.Second * 1)
	return conn.Write(resp)
}

func (h *Handler) OnConn(conn *tcpstream.TcpStream) {
	fmt.Println("Connection to ", conn.RemoteAddr)

	_ = conn.Write(&tcpstream.Message{
		Body: []byte("I am client"),
	})
}

func (h *Handler) OnDisConn(conn *tcpstream.TcpStream) {
	fmt.Println("DisConnection to ", conn.RemoteAddr)
}

func main() {
	go func() {
		fmt.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	for i := 0; i < 100; i++ {
		go func() {
			client := tcpstream.NewConnTcpStream(address, &Handler{})
			client.Start()
			time.Sleep(10 * time.Second)
			client.Stop()
		}()
	}
	select {}
}