Websocket ping pong timeout
BigBoulard opened this issue · 3 comments
Hi guys,
I'm trying to build a chat app that looks like the below simplified version.
What the chat app is doing:
- upgrades the connection to a WS connection
- sets a read limit
- sets a read deadline
- sets a pong handler that will print PONG and then reset the read deadline
- launches 2 goroutines for reading and writing
- readMessages: reads any incoming message and close the program on any unexpected error
- writeMessages:
- reads the egress channel and writes the received payload back to the web socket
- start a ticker that send a PING control frame to the WS
const (
pingInterval = 2 * time.Second // interval at which we send a PING test
pongWait = 4 * time.Second // PONG timeout: we must receive PONG
)
type WSClient struct {
wsconn *websocket.Conn
egress chan []byte // writes comes to this egress chan cause WS conn are not thread-safe
}
type pongHandler func(string) error
func NewWSClient(c *gin.Context) error {
// create a WSClient instance
wsClient := &WSClient{
egress: make(chan []byte),
}
// create Web Socket connection
conn, err := createWS(c.Writer, c.Request, wsClient.pongHandler)
if err != nil {
return fmt.Errorf("app.NewWSClient: createWS error: %w", err)
}
wsClient.wsconn = conn
// launch the read/write goroutines
go wsClient.readMessages()
go wsClient.writeMessages()
return nil
}
// create the WS conn and configure
func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
conn, err := Upgrade(w, r)
if err != nil {
return nil, fmt.Errorf("createWS: upgrader.Upgrade error: %w", err)
}
conn.SetReadLimit(conf.MessageMaxSize)
if err := conn.SetReadDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
}
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
}
conn.SetPongHandler(pongHandler)
return conn, nil
}
// readMessages is run as a goroutine
func (c *WSClient) readMessages() {
for {
messageType, payload, err := c.wsconn.ReadMessage()
if err != nil {
log.Fatalf("wsconn.ReadMessage() error: %v", err)
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { // , websocket.CloseAbnormalClosure) {
log.Fatalf("websocket.IsUnexpectedCloseError: %v", err)
}
c.wsconn.Close()
return
}
log.Printf("received %s:\n%s", c.wsMsgTypeToString(messageType), string(payload))
// writes the message back to the WS
c.egress <- payload
}
}
// write messages to the WS
func (c *WSClient) writeMessages() {
ticker := time.NewTicker(pingInterval)
defer func() {
ticker.Stop()
}()
for {
select {
case data, ok := <-c.egress:
if !ok {
// if channel is closed, send a CLOSE signal to the WS
if err := c.wsconn.WriteMessage(websocket.CloseMessage, nil); err != nil {
log.Fatalf("egress channel and WS are closed %v", err)
}
c.wsconn.Close()
return
}
// write the message to the connection
if err := c.wsconn.WriteMessage(websocket.TextMessage, data); err != nil {
log.Fatalf("WS closed: %v", err)
c.wsconn.Close()
return
}
case <-ticker.C:
// Send a PING msg to the WS.
if err := c.wsconn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Fatalf("Ping attempt but WS closed: %v", err)
return
}
}
}
}
// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
log.Print("PONG") // NOT REACHED
return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}
func (c *WSClient) wsMsgTypeToString(msgType int) string {
switch msgType {
case 1:
return "TEXT"
case 2:
return "BINARY"
case 8:
return "CLOSE"
case 9:
return "PING"
case 10:
return "PONG"
default:
return "unknown"
}
}
What the stress-test script is doing
- creates X rooms of 2 users
- a WS connection is created for each user
- a conversation is a sequence like this: user 1 writes, user 1 reads, user 2 writes, user 2 reads ... in an infinite loop
- a PING handler that is supposed to send a PONG frame to each connection is set
const (
nbRooms = 3300 // = 6600 users
minIdleTimeInMs = 800
maxIdleTimeInMs = 2000
throttle = time.Millisecond * 50
)
func main() {
configure()
createChatRooms()
handleShutdown()
}
func createChatRooms() {
for i := 0; i < nbRooms; i++ {
roomID := uuid.New().String()
user1 := GenUser()
user2 := GenUser()
go func(roomID string, u1 User, u2 User) {
startChat(roomID, u1, u2)
}(roomID, *user1, *user2)
time.Sleep(throttle)
}
}
func startChat(roomID string, u1 User, u2 User) {
// create users in DB etc.
// create ws conns
conn1, err := getWSConn(&u1.User, room.RoomID)
if err != nil {
log.Error(err, "getWSConn error")
return
}
conn2, err := getWSConn(&u2.User, room.RoomID)
if err != nil {
log.Error(err, "getWSConn error")
return
}
// Start Goroutines for handling messages from user 1 and user 2
go startConversation(room, u1, conn1, u2, conn2)
// WARN: connections are never closed intentionally here
}
func startConversation(room *roomdom.Room, u1 User, conn1 *websocket.Conn, u2 User, conn2 *websocket.Conn) {
defer func() {
conn1.Close()
conn2.Close()
}()
log.Info(fmt.Sprintf("%s and %s are conversing on %s", u1.User.FirstName, u2.User.FirstName, room.RoomID))
for {
// write msg to conn1
msg1, err := genMsg(u1.User.UID, room.RoomID)
if err != nil {
log.Fatal(err, "msg1 genMsg")
return
}
if err := conn1.WriteMessage(websocket.TextMessage, msg1); err != nil {
log.Fatal(err, "writeMsg1 error")
return
}
time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))
// read msg from conn1
messageType, _, err := conn1.ReadMessage()
if messageType == websocket.PingMessage || messageType == websocket.PongMessage {
log.Info("startConversation", "received %s", wsMsgTypeToString(messageType)) // I NEVER receive any PONG message here
}
// messageType, payload, err := conn.ReadMessage()
if err != nil {
log.Fatal(err, "conn.ReadMessages error")
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.Fatal(err, "UnexpectedCloseError")
}
}
// write msg to conn2
msg2, err := genMsg(u2.User.UID, room.RoomID)
if err != nil {
log.Error(err, "msg1 genMsg")
return
}
if err := conn2.WriteMessage(websocket.TextMessage, msg2); err != nil {
log.Error(err, "writeMsg2 error")
return
}
// read msg from conn2
_, _, err = conn2.ReadMessage()
// messageType, payload, err := conn.ReadMessage()
if err != nil {
log.Fatal(err, "conn.ReadMessages error")
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.Fatal(err, "UnexpectedCloseError")
}
}
time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))
}
}
func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
conn.SetPingHandler(func(data string) error {
println("RECEIVED PING") // OK
return conn.WriteMessage(websocket.PongMessage, []byte{})
})
return conn, err
}
func wsMsgTypeToString(msgType int) string {
switch msgType {
case 1:
return "TEXT"
case 2:
return "BINARY"
case 8:
return "CLOSE"
case 9:
return "PING"
case 10:
return "PONG"
default:
return "unknown"
}
}
func handleShutdown() {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Info("Shutting down server gracefully...")
}
Problem
The stress-test script receives a PING frame and tries to send a PONG one but I see no incoming PONG frame in the chat app and the connection is then timed out.
wsconn.ReadMessage() error: read tcp 127.0.0.1:8090->127.0.0.1:55896: i/o timeout
Subsidiary questions
- What's the difference/benefit of using
ReadMessage/WriteMessage
vsNextReader/NextWriter
? - I'm using
WriteMessage
to send control frames. I saw that there's alsoWriteControl
that takes a deadline and then callsSetWriteDeadline
. I know that unlikeWriteMessage
,WriteControl
is safe to be used concurrently. The thing is my chat app, I only set a read deadline cause the "idleness" of a connection is detected when the user leaves, so from a server-side perspective, when there's no more data to be read. I don't know how am I supposed to useWriteControl
in this context. - if I remove the ping-pong part of the algorithm, the chat app can handle around 16350 web sockets, then after I get this error:
error:websocket: close 1006 (abnormal closure): unexpected EOF
.
What can cause this issue cause I don't have much log?
Originally posted by @BigBoulard in gorilla/.github#26
Control messages like PING are processed when reading a message. Ensure that the client reads all messages.
What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?
NextReader and NextWriter are the core functionality ReadMessage is a helper method for getting a reader using NextReader and reading from that reader to a buffer. WriteMessage is a helper method for getting a writer using NextWriter, writing the message and closing the writer.
I don't know how am I supposed to use WriteControl in this context.
Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.
Applications should write with a deadline to protect against peers that do not read from the socket.
Control messages like PING are processed when reading a message. Ensure that the client reads all messages.
What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?
NextReader and NextWriter are the core functionality ReadMessage is a helper method for getting a reader using NextReader and reading from that reader to a buffer. WriteMessage is a helper method for getting a writer using NextWriter, writing the message and closing the writer.
I don't know how am I supposed to use WriteControl in this context.
Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.
Applications should write with a deadline to protect against peers that do not read from the socket.
Hi @pennystevens,
Thanks for getting back to me to provide clarity on this.
- The main problem I have is if I remove the ping-pong algorithm, the chat app handle around 16 350 web sockets, then I get this error:
error:websocket: close 1006 (abnormal closure): unexpected EOF.
Is there a means so I can get more information on what's causing the issue?
I'm running the app in a docker desktop
container as well as the stress-test script that is replicated cause I can see that each instance breaks after connecting around 7500 web sockets (same on localhost), so I launch several instances... I see no resource issue both CPU or Memory. I think I'm far from the number of available sockets (about 64k I think), so I'm searching on some limits maybe to be setup through the Go Compiler: max number of Goroutines etc..I don't know.
- the second issue is that the server doesn't see any PONG message that should be sent by the stress test script despite the call to
conn.WriteMessage(websocket.PongMessage, []byte{})
when the stress test script tries to create a connection ...
func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
conn.SetPingHandler(func(data string) error {
println("RECEIVED PING") // OK
return conn.WriteMessage(websocket.PongMessage, []byte{})
})
return conn, err
}
... and the creation of the PONG handler in the app:
func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
// ...
conn.SetPongHandler(pongHandler)
return conn, nil
}
// ...
// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
log.Print("PONG") // NOT REACHED
return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}
Is there something I'm missing?