How to do non-blocking reads on client?
nastynaz opened this issue · 5 comments
How can I correctly perform non-blocking reads? To be clear, I want the CPU to spin at 100%.
I have n websocket client connections and I want to loop over all of them, reading messages where possible. This is the code I have, but it fails with HandshakeError::Interrupted(...):
for i in 1..=n {
    std::thread::sleep(Duration::from_millis(10));
    let listener = TcpStream::connect(SERVER_ADDRESS).expect("failed to connect to server");
    listener
        .set_nonblocking(true)
        .expect("couldnt set nonblocking");
    let (socket, _) =
        client(&server_url, listener).expect(&format!("client {i} couldn't connect"));
    sockets.push(socket);
}
'outer: loop {
    // spin
    for socket in &mut sockets {
        let result = socket.read();
        match result {
            Ok(msg) => {
                // process
            }
            Err(err) => match err {
                Error::Io(err) => match err.kind() {
                    std::io::ErrorKind::WouldBlock => continue,
                    _ => panic!("unexpected io error: {:?}", err),
                },
                Error::ConnectionClosed | Error::AlreadyClosed => {
                    tracing::info!("at least one client disconnected, stopping.");
                    break 'outer;
                }
                _ => panic!("unexpected websocket error: {:?}", err),
            },
        }
    }
}
The client() function is already non-blocking too, since you switched to non-blocking mode just before calling client(). You should deal with it in exactly the same way, continuing after Interrupted.
HandshakeError::Interrupted has the same meaning as WouldBlock, except that it contains the necessary data to continue. Take the MidHandshake out of it and call its handshake() method. Repeat as needed.
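A minimal sketch of that retry loop, assuming the non-blocking stream (called listener in the snippet above) and server_url from the question; error handling is reduced to panics for brevity:

use tungstenite::{client, HandshakeError};

let mut pending = client(&server_url, listener);
let socket = loop {
    match pending {
        // Handshake finished: we get the WebSocket and the HTTP response.
        Ok((socket, _response)) => break socket,
        // Interrupted carries a MidHandshake; handshake() resumes where it left off.
        Err(HandshakeError::Interrupted(mid)) => pending = mid.handshake(),
        Err(HandshakeError::Failure(err)) => panic!("handshake failed: {err:?}"),
    }
};

In a real program you would wait for the socket to become readable or writable between retries instead of spinning on handshake().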
@agalakhov I appreciate the swift response! With your suggestion I got it working on the server and client.
However, the server isn't pushing messages as fast as I'd like, and I'm not entirely sure what I'm doing is correct. How can I speed it up?
// server
let listener = TcpListener::bind(SERVER_ADDRESS).expect("failed to bind to address");
listener
    .set_nonblocking(true)
    .expect("couldnt set nonblocking on listener");
while let Ok(stream) = accept_listener(&listener) {
    stream.set_nodelay(true).expect("couldnt set nodelay on stream");
    stream
        .set_nonblocking(true)
        .expect("couldnt set nonblocking on stream");
    let peer = stream.peer_addr().expect("failed to get peer address");
    let ws_conn = create_ws_stream(peer, stream);
    // send for processing
}
fn create_ws_stream(peer: SocketAddr, stream: TcpStream) -> WebSocket<TcpStream> {
    let mut config = WebSocketConfig::default();
    config.write_buffer_size = 64;
    match accept_with_config(stream, Some(config)) {
        Ok(ws) => ws,
        Err(e) => match e {
            HandshakeError::Interrupted(s) => server_handshaker(s), // loops to complete handshake
            HandshakeError::Failure(err) => panic!("server-side handshake failed: {err:?}"),
        },
    }
}
fn accept_listener(listener: &TcpListener) -> io::Result<TcpStream> {
    match listener.accept() {
        Ok((stream, _)) => Ok(stream),
        Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
            std::thread::sleep(std::time::Duration::from_millis(1));
            accept_listener(listener)
        }
        Err(e) => Err(e),
    }
}
Also, once I have it all solved, would you be open to a PR with an additional example showing how to set up non-blocking streams on the client & server?
Edit: nevermind, I forgot to test it in release mode 🤦
I'm happy to create a PR for an additional example if you think it'd be helpful.
That would be nice. But please add some kind of select() (Mio-based waiting?) to it, since there are many users who don't understand the difference between blocking and 100% CPU busy-waiting, and there is a huge risk of getting complaints about "100% CPU load in the example".
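For reference, a rough sketch of what such Mio-based waiting could look like on the client side. This assumes a recent tungstenite (with read() and the Error variants used in this thread) and mio 0.8; the address, URL, and message handling are placeholders:

use std::io::ErrorKind;
use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};
use tungstenite::{client, Error, HandshakeError, Message, WebSocket};

fn main() -> std::io::Result<()> {
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(16);

    // Connect with std (blocking), then hand the socket to mio in non-blocking mode.
    let std_stream = std::net::TcpStream::connect("127.0.0.1:9001")?;
    std_stream.set_nonblocking(true)?;
    let mut stream = TcpStream::from_std(std_stream);
    poll.registry()
        .register(&mut stream, Token(0), Interest::READABLE)?;

    // Drive the handshake to completion, resuming on Interrupted.
    // (A real example would also poll between retries; kept simple here.)
    let mut socket: WebSocket<TcpStream> = {
        let mut pending = client("ws://127.0.0.1:9001", stream);
        loop {
            match pending {
                Ok((socket, _response)) => break socket,
                Err(HandshakeError::Interrupted(mid)) => pending = mid.handshake(),
                Err(HandshakeError::Failure(err)) => panic!("handshake failed: {err:?}"),
            }
        }
    };

    loop {
        // Sleep in the kernel until the socket becomes readable,
        // instead of spinning at 100% CPU.
        poll.poll(&mut events, None)?;
        // mio readiness is edge-triggered, so drain until WouldBlock.
        loop {
            match socket.read() {
                Ok(Message::Text(txt)) => println!("got: {txt}"),
                Ok(_) => {}
                Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => break,
                Err(Error::ConnectionClosed | Error::AlreadyClosed) => return Ok(()),
                Err(err) => panic!("unexpected error: {err:?}"),
            }
        }
    }
}

The key difference from the busy loop in the question is that poll.poll(&mut events, None) blocks until the OS reports readiness, so an idle connection costs no CPU.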
I feel like this one has been answered well, so it seems like we can close it!