/greatws

100w连接仅需500-700MB内存,针对海量连接特别优化的websocket库(kqueue, epoll),高性能,callback写法,在服务器cpu上有不俗表现 https://github.com/antlabs/greatws-example

Primary LanguageGoApache License 2.0Apache-2.0

greatws

支持海量连接的websocket库,callback写法

Go codecov Go Report Card

处理流程

greatws.png

特性

  • 支持 epoll/kqueue
  • 低内存占用
  • 高tps
  • 对websocket的兼容性较高,完整实现rfc6455, rfc7692

暂不支持

  • ssl
  • windows
  • io-uring

警告⚠️

早期阶段,暂时不建议生产使用

内容

例子-服务端

net http升级到websocket服务端

package main

import (
 "fmt"

 "github.com/antlabs/greatws"
)

type echoHandler struct{}

func (e *echoHandler) OnOpen(c *greatws.Conn) {
 // fmt.Printf("OnOpen: %p\n", c)
}

func (e *echoHandler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
 if err := c.WriteTimeout(op, msg, 3*time.Second); err != nil {
  fmt.Println("write fail:", err)
 }
 // if err := c.WriteMessage(op, msg); err != nil {
 //  slog.Error("write fail:", err)
 // }
}

func (e *echoHandler) OnClose(c *greatws.Conn, err error) {
 errMsg := ""
 if err != nil {
  errMsg = err.Error()
 }
 slog.Error("OnClose:", errMsg)
}

type handler struct {
 m *greatws.MultiEventLoop
}

func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
 c, err := greatws.Upgrade(w, r,
  greatws.WithServerReplyPing(),
  // greatws.WithServerDecompression(),
  greatws.WithServerIgnorePong(),
  greatws.WithServerCallback(&echoHandler{}),
  // greatws.WithServerEnableUTF8Check(),
  greatws.WithServerReadTimeout(5*time.Second),
  greatws.WithServerMultiEventLoop(h.m),
 )
 if err != nil {
  slog.Error("Upgrade fail:", "err", err.Error())
 }
 _ = c
}

func main() {

 var h handler

 h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
 h.m.Start()
 fmt.Printf("apiname:%s\n", h.m.GetApiName())

 mux := &http.ServeMux{}
 mux.HandleFunc("/autobahn", h.echo)

 rawTCP, err := net.Listen("tcp", ":9001")
 if err != nil {
  fmt.Println("Listen fail:", err)
  return
 }
 log.Println("non-tls server exit:", http.Serve(rawTCP, mux))
}

返回

gin升级到websocket服务端

package main

import (
 "fmt"

 "github.com/antlabs/greatws"
 "github.com/gin-gonic/gin"
)

type handler struct{
    m *greatws.MultiEventLoop
}

func (h *handler) OnOpen(c *greatws.Conn) {
 fmt.Printf("服务端收到一个新的连接")
}

func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
 // 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
 // newMsg := make([]byte, len(msg))
 // copy(newMsg, msg)

 fmt.Printf("收到客户端消息:%s\n", msg)
 c.WriteMessage(op, msg)
 // os.Stdout.Write(msg)
}

func (h *handler) OnClose(c *greatws.Conn, err error) {
 fmt.Printf("服务端连接关闭:%v\n", err)
}

func main() {
 r := gin.Default()
 var h handler
 h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
 h.m.Start()

 r.GET("/", func(c *gin.Context) {
  con, err := greatws.Upgrade(c.Writer, c.Request, greatws.WithServerCallback(h.m), greatws.WithServerMultiEventLoop(h.m))
  if err != nil {
   return
  }
  con.StartReadLoop()
 })
 r.Run()
}

返回

客户端

package main

import (
 "fmt"
 "time"

 "github.com/antlabs/greatws"
)

var m *greatws.MultiEventLoop
type handler struct{}

func (h *handler) OnOpen(c *greatws.Conn) {
 fmt.Printf("客户端连接成功\n")
}

func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
 // 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
 // newMsg := make([]byte, len(msg))
 // copy(newMsg, msg)

 fmt.Printf("收到服务端消息:%s\n", msg)
 c.WriteMessage(op, msg)
 time.Sleep(time.Second)
}

func (h *handler) OnClose(c *greatws.Conn, err error) {
 fmt.Printf("客户端端连接关闭:%v\n", err)
}

func main() {
 m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
 m.Start()
 c, err := greatws.Dial("ws://127.0.0.1:8080/", greatws.WithClientCallback(&handler{}), greatws.WithServerMultiEventLoop(h.m))
 if err != nil {
  fmt.Printf("连接失败:%v\n", err)
  return
 }

 c.WriteMessage(opcode.Text, []byte("hello"))
 time.Sleep(time.Hour) //demo里面等待下OnMessage 看下执行效果,因为greatws.Dial和WriteMessage都是非阻塞的函数调用,不会卡住主go程
}

返回

配置函数

客户端配置参数

配置header

func main() {
 greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientHTTPHeader(http.Header{
  "h1": "v1",
  "h2":"v2", 
 }))
}

返回

配置握手时的超时时间

func main() {
 greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientDialTimeout(2 * time.Second))
}

返回

配置自动回复ping消息

func main() {
 greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReplyPing())
}

返回

配置客户端最大读message

 // 限制客户端最大服务返回返回的最大包是1024,如果超过这个大小报错
 greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReadMaxMessage(1024))

返回

配置客户端压缩和解压消息

func main() {
 greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientDecompressAndCompress())
}

返回

配置客户端上下文接管

func main() {
 greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientContextTakeover())
}

返回

服务端配置参数

配置服务自动回复ping消息

func main() {
 c, err := greatws.Upgrade(w, r, greatws.WithServerReplyPing())
        if err != nil {
                fmt.Println("Upgrade fail:", err)
                return
        }   
}

返回

配置服务端最大读message

func main() {
 // 配置服务端读取客户端最大的包是1024大小, 超过该值报错
 c, err := greatws.Upgrade(w, r, greatws.WithServerReadMaxMessage(1024))
        if err != nil {
                fmt.Println("Upgrade fail:", err)
                return
        }   
}

返回

配置服务端解压消息

func main() {
 // 配置服务端读取客户端最大的包是1024大小, 超过该值报错
 c, err := greatws.Upgrade(w, r, greatws.WithServerDecompression())
        if err != nil {
                fmt.Println("Upgrade fail:", err)
                return
        }   
}

返回

配置服务端压缩和解压消息

func main() {
 c, err := greatws.Upgrade(w, r, greatws.WithServerDecompressAndCompress())
        if err != nil {
                fmt.Println("Upgrade fail:", err)
                return
        }   
}

返回

配置服务端上下文接管

func main() {
 // 配置服务端读取客户端最大的包是1024大小, 超过该值报错
 c, err := greatws.Upgrade(w, r, greatws.WithServerContextTakeover)
        if err != nil {
                fmt.Println("Upgrade fail:", err)
                return
        }   
}

返回

100w websocket长链接测试

e5 洋垃圾机器

  • cpu=e5 2686(单路)
  • memory=32GB
BenchType : BenchEcho
Framework : greatws
TPS : 106014
EER : 218.54
Min : 49.26us
Avg : 94.08ms
Max : 954.33ms
TP50 : 45.76ms
TP75 : 52.27ms
TP90 : 336.85ms
TP95 : 427.07ms
TP99 : 498.66ms
Used : 18.87s
Total : 2000000
Success : 2000000
Failed : 0
Conns : 1000000
Concurrency: 10000
Payload : 1024
CPU Min : 184.90%
CPU Avg : 485.10%
CPU Max : 588.31%
MEM Min : 563.40M
MEM Avg : 572.40M
MEM Max : 594.48M

5800h cpu

  • cpu=5800h
  • memory=64GB
BenchType  : BenchEcho
Framework  : greatws
TPS        : 103544
EER        : 397.07
Min        : 26.51us
Avg        : 95.79ms
Max        : 1.34s
TP50       : 58.26ms
TP75       : 60.94ms
TP90       : 62.50ms
TP95       : 63.04ms
TP99       : 63.47ms
Used       : 40.76s
Total      : 5000000
Success    : 4220634
Failed     : 779366
Conns      : 1000000
Concurrency: 10000
Payload    : 1024
CPU Min    : 30.54%
CPU Avg    : 260.77%
CPU Max    : 335.88%
MEM Min    : 432.25M
MEM Avg    : 439.71M
MEM Max    : 449.62M