snapview/tungstenite-rs

How to do non-blocking reads on client?

nastynaz opened this issue · 5 comments

How can I correctly perform non-blocking reads? To be clear, I want the CPU to spin at 100%.
I have n websocket client connections and I want to loop over all of them reading messages where possible.

This is the code I have but it fails with HandshakeError::Interrupted(...):

for i in 1..=n {
    std::thread::sleep(Duration::from_millis(10));
    let listener = TcpStream::connect(SERVER_ADDRESS).expect("failed to connect to server");
    listener
        .set_nonblocking(true)
        .expect("couldnt set nonblocking");
    let (socket, _) =
        client(&server_url, listener).expect(&format!("client {i} couldn't connect"));
    sockets.push(socket);
}

'outer: loop {
    // spin
    for socket in &mut sockets {
        let result = socket.read();
        match result {
            Ok(msg) => {
                // process
            }
            Err(err) => match err {
                Error::Io(err) => match err.kind() {
                    std::io::ErrorKind::WouldBlock => continue,
                    _ => panic!("unexpected io error: {:?}", err),
                },
                Error::ConnectionClosed | Error::AlreadyClosed => {
                    tracing::info!("at least one client disconnected, stopping.");
                    break 'outer;
                }
                _ => panic!("unexpected websocket error: {:?}", err),
            },
        }
    }
}

The client() function is non-blocking too, since you switched the stream to non-blocking mode just before calling client(). You should deal with it in exactly the same way: continue after Interrupted.
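To illustrate why the handshake gets interrupted, here is a minimal std-only sketch (no tungstenite involved) of what a non-blocking socket does when no data has arrived yet: the read returns WouldBlock instead of waiting. The helper names try_read and loopback_pair are made up for this illustration.

```rust
use std::io::{self, ErrorKind, Read};
use std::net::{TcpListener, TcpStream};

/// Try one read on a stream that is in non-blocking mode.
/// Returns Ok(Some(n)) when n bytes were read and Ok(None) when
/// the call would have blocked because nothing has arrived yet.
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Open a loopback TCP connection and return (client side, server side).
/// Hypothetical helper, only here so the sketch is self-contained.
fn loopback_pair() -> io::Result<(TcpStream, TcpStream)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (server, _) = listener.accept()?;
    Ok((client, server))
}
```

A WebSocket handshake needs to read the peer's reply, so on a stream like this it will usually hit WouldBlock at least once, which tungstenite reports as HandshakeError::Interrupted.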

HandshakeError::Interrupted has the same meaning as WouldBlock, except that it contains the data necessary to continue. Take the MidHandshake out of it and call its handshake() method. Repeat as needed.
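The "take it out, call handshake(), repeat" loop described above can be sketched as follows. To keep the sketch compilable without the crate, it uses stand-in types: HandshakeStep mirrors the two variants of HandshakeError, and the Resume trait plays the role of MidHandshake. Both, along with drive and Countdown, are hypothetical names and not part of tungstenite.

```rust
/// Stand-in for tungstenite's `HandshakeError` (hypothetical type):
/// either the handshake cannot progress yet, or it failed for good.
enum HandshakeStep<M, E> {
    Interrupted(M),
    Failure(E),
}

/// Stand-in for `MidHandshake` (hypothetical trait): a paused handshake
/// that can be resumed by calling `handshake()` again.
trait Resume: Sized {
    type Output;
    type Error;
    fn handshake(self) -> Result<Self::Output, HandshakeStep<Self, Self::Error>>;
}

/// Keep resuming an interrupted handshake until it completes or fails.
/// This busy-waits, which matches the "spin at 100%" goal in the question.
fn drive<M: Resume>(
    first: Result<M::Output, HandshakeStep<M, M::Error>>,
) -> Result<M::Output, M::Error> {
    let mut state = first;
    loop {
        match state {
            Ok(done) => return Ok(done),
            Err(HandshakeStep::Interrupted(mid)) => state = mid.handshake(),
            Err(HandshakeStep::Failure(err)) => return Err(err),
        }
    }
}

/// Toy handshake that is interrupted `remaining` times before it completes.
struct Countdown {
    remaining: u32,
    attempts: u32,
}

impl Resume for Countdown {
    type Output = u32; // number of attempts the handshake took
    type Error = String;

    fn handshake(mut self) -> Result<u32, HandshakeStep<Self, String>> {
        self.attempts += 1;
        if self.remaining == 0 {
            Ok(self.attempts)
        } else {
            self.remaining -= 1;
            Err(HandshakeStep::Interrupted(self))
        }
    }
}
```

With the real crate, the same loop would presumably match on HandshakeError::Interrupted(mid) and HandshakeError::Failure(err) and feed the result of mid.handshake() back into the match.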

@agalakhov I appreciate the swift response! With your suggestion I got it working on the server and client.

However, the server isn't pushing messages as fast as I'd like, and I'm not entirely sure that what I'm doing is correct. How can I speed it up?

// server
let listener = TcpListener::bind(SERVER_ADDRESS).expect("failed to bind to address");
listener.set_nonblocking(true).expect("couldnt set nonblocking on listener");

while let Ok(stream) = accept_listener(&listener) {
    stream.set_nodelay(true).expect("couldnt set nodelay on stream");
    stream
        .set_nonblocking(true)
        .expect("couldnt set stream nonblocking on stream");

    let peer = stream.peer_addr().expect("failed to get peer address");
    let ws_conn = create_ws_stream(peer, stream);
    // send for processing
}

fn create_ws_stream(peer: SocketAddr, stream: TcpStream) -> WebSocket<TcpStream> {
    let mut config = WebSocketConfig::default();
    config.write_buffer_size = 64;

    match accept_with_config(stream, Some(config)) {
        Ok(ws) => ws,
        Err(e) => match e {
            HandshakeError::Interrupted(s) => server_handshaker(s), // loops to complete handshake
            HandshakeError::Failure(err) => panic!("server-side handshake failed: {err}"),
        },
    }
}

fn accept_listener(listener: &TcpListener) -> io::Result<TcpStream> {
    // Retry in a loop rather than recursing, so a long idle period cannot grow the stack.
    loop {
        match listener.accept() {
            Ok((stream, _)) => return Ok(stream),
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
            Err(e) => return Err(e),
        }
    }
}

Also, once I have it all solved would you be open to a PR with an additional example showing how to set up non-blocking streams on the client & server?

Edit: never mind, I forgot to test it in release mode 🤦

I'm happy to create a PR for an additional example if you think it'd be helpful.

That would be nice. But please add some kind of select() (Mio-based waiting?) to it, since many users don't understand the difference between blocking and 100% CPU busy-waiting, and there is a huge risk of getting complaints about "100% CPU load in the example".
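The standard library has no select() equivalent, so real readiness waiting would need a crate such as Mio. As a rough std-only substitute (not the Mio approach suggested above), the sketch below shows an idle backoff: spin for a few empty passes, then sleep with a doubling delay up to a cap. The Backoff type, its parameters, and the 50 microsecond starting delay are all assumptions made for this illustration.

```rust
use std::time::Duration;

/// Idle backoff for a polling loop (hypothetical helper).
/// Spins for `spin_limit` empty passes, then suggests sleeps that
/// double from 50 microseconds up to `max_sleep`.
struct Backoff {
    empty_passes: u32,
    spin_limit: u32,
    max_sleep: Duration,
}

impl Backoff {
    fn new(spin_limit: u32, max_sleep: Duration) -> Self {
        Backoff { empty_passes: 0, spin_limit, max_sleep }
    }

    /// Call after a pass over all sockets that produced at least one message.
    fn reset(&mut self) {
        self.empty_passes = 0;
    }

    /// Call after a pass in which every socket returned WouldBlock.
    /// Returns None while still spinning, or Some(delay) to sleep for.
    fn idle(&mut self) -> Option<Duration> {
        self.empty_passes = self.empty_passes.saturating_add(1);
        if self.empty_passes <= self.spin_limit {
            return None;
        }
        // Cap the exponent so the multiplication cannot overflow.
        let exp = (self.empty_passes - self.spin_limit - 1).min(16);
        let delay = Duration::from_micros(50) * 2u32.pow(exp);
        Some(delay.min(self.max_sleep))
    }
}
```

In a read loop this would be used as `if let Some(d) = backoff.idle() { std::thread::sleep(d); }` after an empty pass, and `backoff.reset()` after a pass that read something. It lowers idle CPU use at the cost of added latency, which is the opposite trade-off from the pure spin loop the question asks for.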

I feel like this one has been answered well, so it seems like we can close it!