latency seems worse than uds in low qps and batch IO scenario
Closed this issue · 2 comments
sirzelaen commented
It seems that shmipc latency is worse than with plain UDS sockets. Below is the difference I'm measuring between the two:
shm ipc:
server
package main
import (
"fmt"
"github.com/cloudwego/shmipc-go"
"github.com/cloudwego/shmipc-go/example/best_practice/idl"
"net"
"os"
"path/filepath"
"syscall"
"time"
)
// handleStream serves a single shmipc stream. For every request it decodes,
// it prints how much time has passed since the instant encoded in req.ID
// (interpreted as nanoseconds since the Unix epoch) and echoes the ID back in
// a response. It returns on the first read, write, or flush error.
func handleStream(s *shmipc.Stream) {
	var (
		request  = &idl.Request{}
		response = &idl.Response{}
	)
	for {
		// Decode the next request from the stream's buffer reader.
		err := request.ReadFromShm(s.BufferReader())
		if err != nil {
			fmt.Println("stream read request, err=" + err.Error())
			return
		}

		// The response only carries the request's ID back to the caller.
		response.ID = request.ID

		// Report the elapsed time since the timestamp stored in the ID.
		sentAt := time.Unix(0, int64(request.ID))
		fmt.Printf("Received latency request: %v\n", time.Since(sentAt))

		// Serialize the response into the stream's buffer, then flush it.
		err = response.WriteToShm(s.BufferWriter())
		if err != nil {
			fmt.Println("stream write response failed, err=" + err.Error())
			return
		}
		err = s.Flush(false)
		if err != nil {
			fmt.Println("stream write response failed, err=" + err.Error())
			return
		}

		// Both objects are reused by the next iteration.
		request.Reset()
		response.Reset()
	}
}
// init is currently a no-op. The commented-out call below would limit the Go
// runtime to one OS thread executing Go code at a time; enabling it also
// requires importing "runtime", which this program's import list does not
// include.
func init() {
// runtime.GOMAXPROCS(1)
}
// main listens on the Unix domain socket "../ipc_test.sock" (resolved against
// the current working directory). Each accepted connection is upgraded to a
// shmipc server session in its own goroutine, and every stream accepted on
// that session is handled by handleStream in a further goroutine.
func main() {
	cwd, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	sockPath := filepath.Join(cwd, "../ipc_test.sock")

	// Remove whatever currently exists at the socket path; the result is
	// ignored on purpose so that a missing file is not an error.
	_ = syscall.Unlink(sockPath)

	listener, err := net.ListenUnix("unix", &net.UnixAddr{Name: sockPath, Net: "unix"})
	if err != nil {
		panic(err)
	}
	defer listener.Close()

	for {
		accepted, acceptErr := listener.Accept()
		if acceptErr != nil {
			fmt.Printf("accept error:%s now exit", acceptErr.Error())
			return
		}

		go func(c net.Conn) {
			defer c.Close()

			// Build the shmipc server session on top of the accepted connection.
			cfg := shmipc.DefaultConfig()
			session, sessErr := shmipc.Server(c, cfg)
			if sessErr != nil {
				panic("new ipc server failed " + sessErr.Error())
			}
			defer session.Close()

			// Hand every incoming stream to its own handler goroutine until
			// accepting fails, then let the deferred closes run.
			for {
				st, stErr := session.AcceptStream()
				if stErr != nil {
					fmt.Println("shmipc server accept stream failed, err=" + stErr.Error())
					return
				}
				go handleStream(st)
			}
		}(accepted)
	}
}
client:
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"time"
"github.com/cloudwego/shmipc-go"
"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)
// init is currently a no-op. The commented-out call below would limit the Go
// runtime to one OS thread executing Go code at a time; enabling it also
// requires importing "runtime", which this program's import list does not
// include.
func init() {
// runtime.GOMAXPROCS(1)
}
// main creates a shmipc session manager connected to "../ipc_test.sock"
// (resolved against the current working directory) and, once per second,
// sends a request whose ID is the current Unix time in nanoseconds, then
// waits for the response. The process exits after sleeping for 1200 seconds.
//
// After a successful round trip the stream is handed back to the session
// manager with PutBack. Without that, every tick takes a further stream from
// the manager and none is ever returned, so the server has to accept and
// start handling a new stream for each request.
func main() {
	// packageSize := flag.Int("p", 1024, "-p 1024 mean that request and response's size are both near 1KB")
	flag.Parse()

	// 1. get current directory
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}

	// 2. init session manager: one session over a unix socket, memfd-backed
	// shared memory, and a 100s initialization timeout.
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Address = filepath.Join(dir, "../ipc_test.sock")
	conf.Network = "unix"
	conf.MemMapType = shmipc.MemMapTypeMemFd
	conf.SessionNum = 1
	conf.InitializeTimeout = 100 * time.Second
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}

	go func() {
		req := &idl.Request{}
		resp := &idl.Response{}
		for range time.Tick(time.Second) {
			// 3. get stream from SessionManager
			stream, err := smgr.GetStream()
			if err != nil {
				fmt.Println("get stream error:" + err.Error())
				continue
			}

			// 4. set request object; the ID doubles as the send timestamp
			req.Reset()
			req.ID = uint64(time.Now().UnixNano())

			// 5. write req to buffer of stream
			if err := req.WriteToShm(stream.BufferWriter()); err != nil {
				fmt.Println("write request to share memory failed, err=" + err.Error())
				return
			}

			// 6. flush the buffered data of stream to peer
			if err := stream.Flush(false); err != nil {
				fmt.Println(" flush request to peer failed, err=" + err.Error())
				return
			}

			// 7. wait and read response
			resp.Reset()
			if err := resp.ReadFromShm(stream.BufferReader()); err != nil {
				fmt.Println("read response from share memory failed, err=" + err.Error())
				continue
			}

			// 8. return the stream to the session manager so the next tick
			// can reuse it instead of opening a new one
			smgr.PutBack(stream)
		}
	}()

	time.Sleep(1200 * time.Second)
}
output
Received latency request: 741.428µs
Received latency request: 252.1µs
Received latency request: 178.321µs
Received latency request: 287.057µs
Received latency request: 225.539µs
Received latency request: 104.035µs
Received latency request: 189.923µs
Received latency request: 266.65µs
Received latency request: 195.734µs
Received latency request: 198.661µs
uds:
server:
package main
import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"os"
	"time"
)
// handleConn serves one client connection. Each message on the wire is an
// 8-byte big-endian Unix timestamp in nanoseconds; for every message the
// elapsed time since that instant is printed. No response is written back.
// The connection is closed when reading fails, including on EOF.
func handleConn(conn net.Conn) {
	defer conn.Close()

	// One fixed-size buffer is reused for every message.
	var msg [8]byte
	for {
		// A single Read on a stream connection may return fewer bytes than
		// one full timestamp, which would decode as a bogus value. ReadFull
		// keeps reading until all 8 bytes have arrived or an error occurs.
		if _, err := io.ReadFull(conn, msg[:]); err != nil {
			fmt.Printf("Error reading data: %v\n", err)
			return
		}

		// Measure the latency relative to the client's send timestamp.
		sentAt := time.Unix(0, int64(binary.BigEndian.Uint64(msg[:])))
		fmt.Printf("Received latency request: %v\n", time.Since(sentAt))
	}
}
// main removes any existing file at /tmp/socket, listens there on a Unix
// domain socket, and starts one handleConn goroutine per accepted connection.
// It returns (after printing the error) if the file cannot be removed, the
// listener cannot be created, or Accept fails.
func main() {
	const socketFile = "/tmp/socket"

	// A missing file is fine; any other removal error aborts startup.
	removeErr := os.Remove(socketFile)
	if !(removeErr == nil || os.IsNotExist(removeErr)) {
		fmt.Printf("Error removing socket file: %v\n", removeErr)
		return
	}

	listener, listenErr := net.Listen("unix", socketFile)
	if listenErr != nil {
		fmt.Printf("Error creating socket: %v\n", listenErr)
		return
	}
	defer listener.Close()

	fmt.Println("Server listening on", socketFile)

	// Serve connections until Accept reports an error.
	for {
		c, acceptErr := listener.Accept()
		if acceptErr != nil {
			fmt.Printf("Error accepting connection: %v\n", acceptErr)
			return
		}
		go handleConn(c)
	}
}
client:
package main
import (
"encoding/binary"
"fmt"
"net"
"time"
)
// main dials the Unix domain socket at /tmp/socket and, once per second,
// writes the current Unix time in nanoseconds as an 8-byte big-endian value.
// It returns (after printing the error) if dialing or writing fails.
func main() {
	conn, dialErr := net.Dial("unix", "/tmp/socket")
	if dialErr != nil {
		fmt.Printf("Error connecting to server: %v\n", dialErr)
		return
	}
	defer conn.Close()

	ticks := time.Tick(time.Second)
	for {
		<-ticks

		// Encode the send time so the receiver can compute the latency.
		var payload [8]byte
		binary.BigEndian.PutUint64(payload[:], uint64(time.Now().UnixNano()))

		if _, writeErr := conn.Write(payload[:]); writeErr != nil {
			fmt.Printf("Error sending data: %v\n", writeErr)
			return
		}
	}
}
latency:
Received latency request: 209.581µs
Received latency request: 111.524µs
Received latency request: 119.614µs
Received latency request: 80.55µs
Received latency request: 133.225µs
Received latency request: 92.258µs
Received latency request: 97.206µs
Received latency request: 89.792µs
Received latency request: 106.665µs
Received latency request: 83.404µs
zhquzzuli commented
Your test scenario sends only one request per second, so shmipc cannot take advantage of batch IO. Shmipc delivers high performance in high-QPS or large-package scenarios.
GuangmingLuo commented
Question answered. If you find a business scenario to use shmipc, please let us know. Thanks.