cloudwego/shmipc-go

Latency seems worse than UDS in low-QPS, non-batch-IO scenarios


It seems like shmipc latency is worse than plain UDS sockets; below is the difference I'm measuring between the two:

shmipc:

server:

package main

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)

func handleStream(s *shmipc.Stream) {
	req := &idl.Request{}
	resp := &idl.Response{}
	for {
		// 1. deserialize Request
		if err := req.ReadFromShm(s.BufferReader()); err != nil {
			fmt.Println("stream read request, err=" + err.Error())
			return
		}

		// 2. build the Response from the Request
		resp.ID = req.ID

		// measure latency using the send timestamp carried in the request ID
		latency := time.Since(time.Unix(0, int64(req.ID)))
		fmt.Printf("Received latency request: %v\n", latency)

		// 3. serialize the Response and flush it to the peer
		if err := resp.WriteToShm(s.BufferWriter()); err != nil {
			fmt.Println("stream write response failed, err=" + err.Error())
			return
		}
		if err := s.Flush(false); err != nil {
			fmt.Println("stream flush response failed, err=" + err.Error())
			return
		}
		req.Reset()
		resp.Reset()
	}
}

func init() {
	// runtime.GOMAXPROCS(1)
}

func main() {
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	udsPath := filepath.Join(dir, "../ipc_test.sock")

	// 1. listen unix domain socket
	_ = syscall.Unlink(udsPath)
	ln, err := net.ListenUnix("unix", &net.UnixAddr{Name: udsPath, Net: "unix"})
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// 2. accept a unix domain socket
	for {
		conn, err := ln.Accept()
		if err != nil {
			fmt.Printf("accept error:%s now exit", err.Error())
			return
		}
		go func() {
			defer conn.Close()

			// 3. create server session
			conf := shmipc.DefaultConfig()
			server, err := shmipc.Server(conn, conf)
			if err != nil {
				panic("new ipc server failed " + err.Error())
			}
			defer server.Close()

			// 4. accept stream and handle
			for {
				stream, err := server.AcceptStream()
				if err != nil {
					fmt.Println("shmipc server accept stream failed, err=" + err.Error())
					break
				}
				go handleStream(stream)
			}
		}()
	}
}

client:

package main

import (
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)

func init() {
	// runtime.GOMAXPROCS(1)
}

func main() {
	// packageSize := flag.Int("p", 1024, "-p 1024 mean that request and response's size are both near 1KB")
	flag.Parse()

	// 1. get current directory
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}

	// 2. init session manager
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Address = filepath.Join(dir, "../ipc_test.sock")
	conf.Network = "unix"
	conf.MemMapType = shmipc.MemMapTypeMemFd
	conf.SessionNum = 1
	conf.InitializeTimeout = 100 * time.Second
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}

	go func() {
		req := &idl.Request{}
		resp := &idl.Response{}

		for range time.Tick(time.Second) {
			// 3. get stream from SessionManager
			stream, err := smgr.GetStream()
			if err != nil {
				fmt.Println("get stream error:" + err.Error())
				continue
			}

			// 4. set request object
			req.Reset()
			req.ID = uint64(time.Now().UnixNano())

			// 5. write req to the buffer of the stream
			if err := req.WriteToShm(stream.BufferWriter()); err != nil {
				fmt.Println("write request to share memory failed, err=" + err.Error())
				smgr.PutBack(stream)
				return
			}

			// 6. flush the buffered data of the stream to the peer
			if err := stream.Flush(false); err != nil {
				fmt.Println("flush request to peer failed, err=" + err.Error())
				smgr.PutBack(stream)
				return
			}

			// 7. wait for and read the response
			resp.Reset()
			if err := resp.ReadFromShm(stream.BufferReader()); err != nil {
				fmt.Println("read response from share memory failed, err=" + err.Error())
			}

			// 8. return the stream to the session manager for reuse
			smgr.PutBack(stream)
		}
	}()
	time.Sleep(1200 * time.Second)
}

output:

Received latency request: 741.428µs
Received latency request: 252.1µs
Received latency request: 178.321µs
Received latency request: 287.057µs
Received latency request: 225.539µs
Received latency request: 104.035µs
Received latency request: 189.923µs
Received latency request: 266.65µs
Received latency request: 195.734µs
Received latency request: 198.661µs

UDS:

server:

package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"os"
	"time"
)

func handleConn(conn net.Conn) {
	defer conn.Close()

	buf := make([]byte, 8)
	for {
		// Read exactly one 8-byte timestamp from the client;
		// io.ReadFull guards against short reads on the stream socket
		if _, err := io.ReadFull(conn, buf); err != nil {
			fmt.Printf("Error reading data: %v\n", err)
			return
		}

		// Measure latency from the embedded send timestamp
		latency := time.Since(time.Unix(0, int64(binary.BigEndian.Uint64(buf))))
		fmt.Printf("Received latency request: %v\n", latency)
	}
}

func main() {
	// Remove the socket file if it already exists
	socketFile := "/tmp/socket"
	if err := os.Remove(socketFile); err != nil && !os.IsNotExist(err) {
		fmt.Printf("Error removing socket file: %v\n", err)
		return
	}

	// Create a UNIX domain socket
	listener, err := net.Listen("unix", socketFile)
	if err != nil {
		fmt.Printf("Error creating socket: %v\n", err)
		return
	}
	defer listener.Close()

	fmt.Println("Server listening on", socketFile)

	for {
		// Accept incoming connections
		conn, err := listener.Accept()
		if err != nil {
			fmt.Printf("Error accepting connection: %v\n", err)
			return
		}

		go handleConn(conn)
	}
}

client:

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"time"
)

func main() {
	// Connect to the server's UNIX domain socket
	conn, err := net.Dial("unix", "/tmp/socket")
	if err != nil {
		fmt.Printf("Error connecting to server: %v\n", err)
		return
	}
	defer conn.Close()

	for range time.Tick(time.Second) {
		// Send the current time to the server for measuring latency
		currentTime := time.Now().UnixNano()
		currentTimeBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(currentTimeBytes, uint64(currentTime))
		_, err = conn.Write(currentTimeBytes)
		if err != nil {
			fmt.Printf("Error sending data: %v\n", err)
			return
		}
	}
}

output:

Received latency request: 209.581µs
Received latency request: 111.524µs
Received latency request: 119.614µs
Received latency request: 80.55µs
Received latency request: 133.225µs
Received latency request: 92.258µs
Received latency request: 97.206µs
Received latency request: 89.792µs
Received latency request: 106.665µs
Received latency request: 83.404µs

Your test scenario sends only one request per second, so shmipc cannot take advantage of batch IO. shmipc delivers high performance in high-QPS or large-payload scenarios.
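
For illustration, below is a minimal sketch of what a high-QPS variant of the client above might look like. It is not code from this issue: the -c concurrency flag, the goroutine count, and the tight request loop are all hypothetical additions; the idl types and socket path are reused from the client above. Running many request loops concurrently gives shmipc's IO-event batching something to merge.

package main

import (
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)

func main() {
	// hypothetical flag: number of concurrent request loops
	concurrency := flag.Int("c", 100, "number of concurrent request loops")
	flag.Parse()

	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}

	conf := shmipc.DefaultSessionManagerConfig()
	conf.Address = filepath.Join(dir, "../ipc_test.sock")
	conf.Network = "unix"
	conf.MemMapType = shmipc.MemMapTypeMemFd
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}

	for i := 0; i < *concurrency; i++ {
		go func() {
			req := &idl.Request{}
			resp := &idl.Response{}
			for {
				// issue requests back to back instead of once per second
				stream, err := smgr.GetStream()
				if err != nil {
					fmt.Println("get stream error:" + err.Error())
					continue
				}
				req.Reset()
				req.ID = uint64(time.Now().UnixNano())
				if err := req.WriteToShm(stream.BufferWriter()); err != nil {
					smgr.PutBack(stream)
					return
				}
				if err := stream.Flush(false); err != nil {
					smgr.PutBack(stream)
					return
				}
				resp.Reset()
				_ = resp.ReadFromShm(stream.BufferReader())
				smgr.PutBack(stream)
			}
		}()
	}
	time.Sleep(1200 * time.Second)
}

With many streams flushing concurrently, event notifications on the underlying UDS can be shared across in-flight requests, which should be the batch-IO effect described above; a one-request-per-second loop never triggers it.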

Question answered. If you find a business scenario in which shmipc is a good fit, please let us know. Thanks.