gorilla/websocket

Websocket ping pong timeout

BigBoulard opened this issue · 3 comments

Hi guys,

I'm trying to build a chat app that looks like the below simplified version.

What the chat app is doing:

  • upgrades the connection to a WS connection
  • sets a read limit
  • sets a read deadline
  • sets a pong handler that will print PONG and then reset the read deadline
  • launches 2 goroutines for reading and writing
    • readMessages: reads any incoming message and close the program on any unexpected error
    • writeMessages:
      • reads the egress channel and writes the received payload back to the web socket
      • start a ticker that send a PING control frame to the WS
const (
	pingInterval = 2 * time.Second // interval at which we send a PING test
	pongWait     = 4 * time.Second // PONG timeout: we must receive PONG
)

type WSClient struct {
	wsconn *websocket.Conn
	egress chan []byte // writes comes to this egress chan cause WS conn are not thread-safe
}

type pongHandler func(string) error

func NewWSClient(c *gin.Context) error {
	// create a WSClient instance
	wsClient := &WSClient{
		egress: make(chan []byte),
	}

	// create Web Socket connection
	conn, err := createWS(c.Writer, c.Request, wsClient.pongHandler)
	if err != nil {
		return fmt.Errorf("app.NewWSClient: createWS error: %w", err)
	}
	wsClient.wsconn = conn

	// launch the read/write goroutines
	go wsClient.readMessages()
	go wsClient.writeMessages()

	return nil
}

// create the WS conn and configure
func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
	conn, err := Upgrade(w, r)
	if err != nil {
		return nil, fmt.Errorf("createWS: upgrader.Upgrade error: %w", err)
	}

	conn.SetReadLimit(conf.MessageMaxSize)

	if err := conn.SetReadDeadline(time.Time{}); err != nil {
		return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
	}

	if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
	}

	conn.SetPongHandler(pongHandler)

	return conn, nil
}

// readMessages is run as a goroutine
func (c *WSClient) readMessages() {
	for {
		messageType, payload, err := c.wsconn.ReadMessage()
		if err != nil {
			log.Fatalf("wsconn.ReadMessage() error: %v", err)
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { // , websocket.CloseAbnormalClosure) {
				log.Fatalf("websocket.IsUnexpectedCloseError: %v", err)
			}
			c.wsconn.Close()
			return
		}
		log.Printf("received %s:\n%s", c.wsMsgTypeToString(messageType), string(payload))
		// writes the message back to the WS
		c.egress <- payload
	}
}

// write messages to the WS
func (c *WSClient) writeMessages() {
	ticker := time.NewTicker(pingInterval)
	defer func() {
		ticker.Stop()
	}()

	for {
		select {
		case data, ok := <-c.egress:
			if !ok {
				// if channel is closed, send a CLOSE signal to the WS
				if err := c.wsconn.WriteMessage(websocket.CloseMessage, nil); err != nil {
					log.Fatalf("egress channel and WS are closed %v", err)
				}
				c.wsconn.Close()
				return
			}
			// write the message to the connection
			if err := c.wsconn.WriteMessage(websocket.TextMessage, data); err != nil {
				log.Fatalf("WS closed: %v", err)
				c.wsconn.Close()
				return
			}

		case <-ticker.C:
			// Send a PING msg to the WS.
			if err := c.wsconn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				log.Fatalf("Ping attempt but WS closed: %v", err)
				return
			}
		}
	}
}

// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
       log.Print("PONG") // NOT REACHED
	return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}

func (c *WSClient) wsMsgTypeToString(msgType int) string {
	switch msgType {
	case 1:
		return "TEXT"
	case 2:
		return "BINARY"
	case 8:
		return "CLOSE"
	case 9:
		return "PING"
	case 10:
		return "PONG"
	default:
		return "unknown"
	}
}

What the stress-test script is doing

  • creates X rooms of 2 users
  • a WS connection is created for each user
  • a conversation is a sequence like this: user 1 writes, user 1 reads, user 2 writes, user 2 reads ... in an infinite loop
  • a PING handler that is supposed to send a PONG frame to each connection is set
const (
	nbRooms         = 3300 // = 6600 users
	minIdleTimeInMs = 800
	maxIdleTimeInMs = 2000
	throttle        = time.Millisecond * 50
)

func main() {
	configure()
	createChatRooms()
	handleShutdown()
}

func createChatRooms() {
	for i := 0; i < nbRooms; i++ {
		roomID := uuid.New().String()
		user1 := GenUser()
		user2 := GenUser()
		go func(roomID string, u1 User, u2 User) {
			startChat(roomID, u1, u2)
		}(roomID, *user1, *user2)
		time.Sleep(throttle)
	}
}

func startChat(roomID string, u1 User, u2 User) {
	// create users in DB etc.

	// create ws conns
	conn1, err := getWSConn(&u1.User, room.RoomID)
	if err != nil {
		log.Error(err, "getWSConn error")
		return
	}
	conn2, err := getWSConn(&u2.User, room.RoomID)
	if err != nil {
		log.Error(err, "getWSConn error")
		return
	}

	// Start Goroutines for handling messages from user 1 and user 2
	go startConversation(room, u1, conn1, u2, conn2)

	// WARN: connections are never closed intentionally here
}

func startConversation(room *roomdom.Room, u1 User, conn1 *websocket.Conn, u2 User, conn2 *websocket.Conn) {
	defer func() {
		conn1.Close()
		conn2.Close()
	}()
	log.Info(fmt.Sprintf("%s and %s are conversing on %s", u1.User.FirstName, u2.User.FirstName, room.RoomID))
	for {
		// write msg to conn1
		msg1, err := genMsg(u1.User.UID, room.RoomID)
		if err != nil {
			log.Fatal(err, "msg1 genMsg")
			return
		}
		if err := conn1.WriteMessage(websocket.TextMessage, msg1); err != nil {
			log.Fatal(err, "writeMsg1 error")
			return
		}
		time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))

		// read msg from conn1
		messageType, _, err := conn1.ReadMessage()
		if messageType == websocket.PingMessage || messageType == websocket.PongMessage {
			log.Info("startConversation", "received %s", wsMsgTypeToString(messageType)) // I NEVER receive any PONG message here
		}
		// messageType, payload, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err, "conn.ReadMessages error")
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Fatal(err, "UnexpectedCloseError")
			}
		}

		// write msg to conn2
		msg2, err := genMsg(u2.User.UID, room.RoomID)
		if err != nil {
			log.Error(err, "msg1 genMsg")
			return
		}
		if err := conn2.WriteMessage(websocket.TextMessage, msg2); err != nil {
			log.Error(err, "writeMsg2 error")
			return
		}

		// read msg from conn2
		_, _, err = conn2.ReadMessage()
		// messageType, payload, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err, "conn.ReadMessages error")
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Fatal(err, "UnexpectedCloseError")
			}
		}

		time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))
	}
}

func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
        conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})
	return conn, err
}

func wsMsgTypeToString(msgType int) string {
	switch msgType {
	case 1:
		return "TEXT"
	case 2:
		return "BINARY"
	case 8:
		return "CLOSE"
	case 9:
		return "PING"
	case 10:
		return "PONG"
	default:
		return "unknown"
	}
}

func handleShutdown() {
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	<-quit
	log.Info("Shutting down server gracefully...")
}

Problem

The stress-test script receives a PING frame and tries to send a PONG one but I see no incoming PONG frame in the chat app and the connection is then timed out.

wsconn.ReadMessage() error: read tcp 127.0.0.1:8090->127.0.0.1:55896: i/o timeout

Subsidiary questions

  • What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?
  • I'm using WriteMessage to send control frames. I saw that there's also WriteControl that takes a deadline and then calls SetWriteDeadline. I know that unlike WriteMessage, WriteControl is safe to be used concurrently. The thing is my chat app, I only set a read deadline cause the "idleness" of a connection is detected when the user leaves, so from a server-side perspective, when there's no more data to be read. I don't know how am I supposed to use WriteControl in this context.
  • if I remove the ping-pong part of the algorithm, the chat app can handle around 16350 web sockets, then after I get this error:
    error:websocket: close 1006 (abnormal closure): unexpected EOF.
    What can cause this issue cause I don't have much log?

Originally posted by @BigBoulard in gorilla/.github#26

Control messages like PING are processed when reading a message. Ensure that the client reads all messages.

What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?

NextReader and NextWriter are the core functionality ReadMessage is a helper method for getting a reader using NextReader and reading from that reader to a buffer. WriteMessage is a helper method for getting a writer using NextWriter, writing the message and closing the writer.

I don't know how am I supposed to use WriteControl in this context.

Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.

Applications should write with a deadline to protect against peers that do not read from the socket.

Control messages like PING are processed when reading a message. Ensure that the client reads all messages.

What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?

NextReader and NextWriter are the core functionality ReadMessage is a helper method for getting a reader using NextReader and reading from that reader to a buffer. WriteMessage is a helper method for getting a writer using NextWriter, writing the message and closing the writer.

I don't know how am I supposed to use WriteControl in this context.

Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.

Applications should write with a deadline to protect against peers that do not read from the socket.

Hi @pennystevens,

Thanks for getting back to me to provide clarity on this.

  • The main problem I have is if I remove the ping-pong algorithm, the chat app handle around 16 350 web sockets, then I get this error: error:websocket: close 1006 (abnormal closure): unexpected EOF.
    Is there a means so I can get more information on what's causing the issue?

I'm running the app in a docker desktop container as well as the stress-test script that is replicated cause I can see that each instance breaks after connecting around 7500 web sockets (same on localhost), so I launch several instances... I see no resource issue both CPU or Memory. I think I'm far from the number of available sockets (about 64k I think), so I'm searching on some limits maybe to be setup through the Go Compiler: max number of Goroutines etc..I don't know.

  • the second issue is that the server doesn't see any PONG message that should be sent by the stress test script despite the call to conn.WriteMessage(websocket.PongMessage, []byte{}) when the stress test script tries to create a connection ...
func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
        conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})
	return conn, err
}

... and the creation of the PONG handler in the app:

func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
//  ...
   conn.SetPongHandler(pongHandler)
   return conn, nil
}
// ...
// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
       log.Print("PONG") // NOT REACHED
	return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}

Is there something I'm missing?