snapview/tungstenite-rs

Reading in non-blocking way

Boscop opened this issue · 12 comments

How can I read from a websocket like try_recv() from a std Receiver?
So that in my loop I don't have to wait until the client sends a message before I can send a message to the client.

You can pass an arbitrary Read + Write object to the tungstenite::client::client() (https://docs.rs/tungstenite/0.2.1/tungstenite/client/fn.client.html) function. If the underlying stream is non-blocking, tungstenite-rs will work in a non-blocking mode. In other words, it solely depends on the stream you pass to the function. You can pass a TcpStream object from mio and then you can use it in a non-blocking manner.

The key point is: if read() on your stream (TcpStream) returns WouldBlock, so will Tungstenite. The same is true for write(). This error does not damage the internal state of the websocket, so you basically just poll your stream and retry when it's ready. You have to react to Error::Io(WouldBlock).
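To make the WouldBlock behaviour concrete, here is a minimal std-only sketch of the underlying mechanism. It uses a plain std::net::TcpStream rather than tungstenite itself, and the helpers try_read and loopback_pair are hypothetical names made up for this illustration. The idea is the same one described above: a read on a non-blocking stream either yields data or reports WouldBlock, and the caller simply retries later.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: one non-blocking read attempt.
/// Ok(None) means "nothing available yet, retry later";
/// Ok(Some(0)) means the peer closed the connection.
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Hypothetical helper: a connected loopback pair.
/// Returns (non-blocking server side, blocking client side).
fn loopback_pair() -> io::Result<(TcpStream, TcpStream)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (server, _) = listener.accept()?;
    server.set_nonblocking(true)?;
    Ok((server, client))
}

fn main() -> io::Result<()> {
    let (mut server, mut client) = loopback_pair()?;
    let mut buf = [0u8; 16];

    // Nothing has been sent yet, so the read reports "would block".
    println!("first attempt: {:?}", try_read(&mut server, &mut buf)?);

    client.write_all(b"ping")?;

    // Poll until the data arrives; other work could be done between attempts.
    loop {
        if let Some(n) = try_read(&mut server, &mut buf)? {
            println!("received {} bytes", n);
            break;
        }
        thread::sleep(Duration::from_millis(5));
    }
    Ok(())
}
```

With tungstenite the same condition would surface as Error::Io wrapping an io::Error of kind WouldBlock, as described above, and the retry logic has the same shape.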

If I use tokio-tungstenite, would that solve my problem?

Probably. Both tokio-tungstenite and tungstenite are capable of doing non-blocking i/o. If you just want to do async, then Tokio is the proper solution. But if you use some custom algorithm based on Mio, Rotor or even direct epoll() calls, then use tungstenite directly.

My situation is: There will only be 1 client at a time, because I'm writing a web-UI for my Rust application that runs alongside it on the same computer (no remote clients), and I'm using websockets to push the GUI state updates.
So I don't need async in the sense that "it doesn't spawn a new thread for every client", but I need a way to send messages to the client even if the client hasn't sent a message first. Currently my code looks like this:

	let thread_ws = thread::Builder::new().name("ws".to_string()).spawn(move || {
		let port = ::std::env::var("WS_PORT").expect("WS_PORT must be set");
		let server = TcpListener::bind(&format!("127.0.0.1:{}", port)).unwrap();
		for stream in server.incoming() {
			if terminate.load(Ordering::Relaxed) { break }
			let terminate = terminate.clone();
			let tx_ui_to_main = tx_ui_to_main.clone();
			let ui_state_start = ui.lock().unwrap().serialize(); // current ui state for the new client
			// thread::spawn(move || {
				match stream {
					Ok(stream) => match handle_client(ui_state_start, terminate, stream, &rx_main_to_ui, tx_ui_to_main) {
						Ok(_) => (),
						Err(e) => warn!("Error in client: {}", e),
					},
					Err(e) => warn!("Error accepting stream: {}", e),
				}
			// });
		}
		info!("terminating ws");
	}).unwrap();

...

fn handle_client(ui_state_start: Vec<MsgMainToUi>,
                 terminate: Arc<AtomicBool>,
                 stream: TcpStream,
                 rx_main_to_ui: &Receiver<MsgMainToUi>,
                 tx_ui_to_main: Sender<MsgUiToMain>
) -> Result<()> {
	let mut socket = accept(stream).map_err(must_not_block)?;
	for msg in ui_state_start { // send current ui state to client
		send_json(&mut socket, msg)?;
	}
	while !terminate.load(Ordering::Relaxed) {
		while let Ok(msg) = rx_main_to_ui.try_recv() {
			send_json(&mut socket, msg)?;
		}
		match socket.read_message()? {
			Message::Text(msg) => match serde_json::from_str(&msg) {
				Ok(msg) => tx_ui_to_main.send(msg).unwrap(),
				Err(e) => error!("{}", e)
			},
			Message::Binary(_) => ()
		}
	}
	Ok(())
}

As you can see, I'm not even spawning a new thread for every client, as there will be only one browser tab open. When I refresh the tab, it works too, because then handle_client() returns and the reloaded tab is the next stream in the server.incoming() iterator. This allows me to re-use the Receiver rx_main_to_ui between connections, because only one will be active at a time.
So my only problem is that the loop blocks on read_message(), and new messages from the server to the client don't get sent unless the client sends a message first.
Would tokio-tungstenite solve this problem? Or is it only async in the sense of "not spawning a thread for every connection"?
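For reference, the loop structure being asked about can be sketched without any async runtime, assuming the stream is switched to non-blocking mode. The sketch below is std-only and moves raw bytes over a TcpStream as a stand-in for websocket messages; pump and demo are hypothetical names, and this is not the tungstenite API. It only illustrates how draining the outbound channel and attempting a read can be interleaved in one loop.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical loop: forwards queued outbound bytes and collects inbound
/// bytes until the peer closes the connection or `terminate` is set.
fn pump(
    stream: &mut TcpStream,
    outbound: &Receiver<Vec<u8>>,
    terminate: &AtomicBool,
) -> io::Result<Vec<u8>> {
    stream.set_nonblocking(true)?;
    let mut inbound = Vec::new();
    let mut buf = [0u8; 1024];
    while !terminate.load(Ordering::Relaxed) {
        // 1. Send everything that is queued, without waiting for the client.
        while let Ok(msg) = outbound.try_recv() {
            // Assumption: messages are small. On a non-blocking socket,
            // write_all can fail with WouldBlock if the send buffer is full.
            stream.write_all(&msg)?;
        }
        // 2. Try to read, but do not wait if nothing has arrived.
        match stream.read(&mut buf) {
            Ok(0) => break, // peer closed the connection
            Ok(n) => inbound.extend_from_slice(&buf[..n]),
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(5)) // avoid spinning at 100% CPU
            }
            Err(e) => return Err(e),
        }
    }
    Ok(inbound)
}

/// Runs one exchange over loopback; returns (bytes from client, bytes pushed to client).
fn demo() -> io::Result<(Vec<u8>, Vec<u8>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _) = listener.accept()?;
    let (tx, rx) = channel();
    let terminate = AtomicBool::new(false);

    tx.send(b"state update".to_vec()).unwrap(); // queued by the "main" side
    client.write_all(b"click")?;
    client.shutdown(Shutdown::Write)?; // client is done sending

    let inbound = pump(&mut server, &rx, &terminate)?;
    let mut pushed = vec![0u8; 12];
    client.read_exact(&mut pushed)?;
    Ok((inbound, pushed))
}

fn main() -> io::Result<()> {
    let (inbound, pushed) = demo()?;
    println!("from client: {}", String::from_utf8_lossy(&inbound));
    println!("to client:   {}", String::from_utf8_lossy(&pushed));
    Ok(())
}
```

The short sleep on WouldBlock is a deliberate simplification; an event loop based on Mio or Tokio would wait for readiness instead of polling on a timer.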

Use Tokio. It's what you need here.
https://tokio.rs/
tokio-tungstenite is a binding of tungstenite for Tokio.

@Boscop as @agalakhov mentioned, tokio may help you. If you need a quick solution, you can use tokio and tokio-tungstenite. Take a look at the server example in the tokio-tungstenite repository; you will need to modify it a bit to fit your needs. If you look at the code of the example, you'll see that it creates a futures mpsc channel (futures::sync::mpsc), which is used as a means of communication between futures. You can create as many shallow copies of tx as you want using clone(). So in your case it's sufficient to make another copy of tx and pass it to your "GUI module", from which you want to send messages. After that, you can simply call tx.send() and pass the appropriate message. The message will be queued and tokio does the rest for you: the messages are sent when the underlying socket is available for writing, so you don't have to worry about that.

P.S.: Sending the message via tx is non-blocking as well.
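The "clone tx and hand it to another module" pattern can be tried out in isolation. The sketch below is an analogy only: it uses std::sync::mpsc and a plain thread instead of the futures channel and a Tokio task, and spawn_writer is a hypothetical name. The writer just collects the messages where a real one would write them to the socket.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical writer: owns the receiving end of the channel.
/// A real writer would send each message over the websocket;
/// this one only collects them so the result can be inspected.
fn spawn_writer() -> (Sender<String>, thread::JoinHandle<Vec<String>>) {
    let (tx, rx) = channel::<String>();
    let handle = thread::spawn(move || {
        let mut sent = Vec::new();
        // The iteration ends once every Sender clone has been dropped.
        for msg in rx {
            sent.push(msg);
        }
        sent
    });
    (tx, handle)
}

fn main() {
    let (tx, writer) = spawn_writer();

    // A shallow copy of the sender, e.g. for the GUI module.
    let gui_tx = tx.clone();

    // send() on this unbounded channel only queues the message; it does not block.
    gui_tx.send("slider=3".to_string()).unwrap();
    tx.send("button=on".to_string()).unwrap();

    // Dropping all senders lets the writer finish.
    drop(gui_tx);
    drop(tx);
    println!("{:?}", writer.join().unwrap());
}
```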

Thanks!
I'll use tokio-tungstenite.
Btw, how can I break out of the server loop (when terminate is true like above, terminating all threads for a clean shutdown)?

(At first I was using ws-rs but there was no way to break out of the server loop...)

Just break the loop and let TcpListener go out of scope. Since the loop is explicit, you're not obligated to continue looping. Statements like break, return or replacing for ... in with while will do.
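As a rough illustration of breaking out of an explicit accept loop, here is a std-only sketch (no Tokio involved). It assumes a non-blocking TcpListener so that the terminate flag is checked regularly rather than only when a new connection arrives; serve and run_demo are hypothetical names.

```rust
use std::io::{self, ErrorKind};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical accept loop: runs until `terminate` is set and
/// returns the number of connections that were accepted.
fn serve(listener: TcpListener, terminate: Arc<AtomicBool>) -> io::Result<usize> {
    listener.set_nonblocking(true)?;
    let mut accepted = 0;
    while !terminate.load(Ordering::Relaxed) {
        match listener.accept() {
            // A real server would handle the client here.
            Ok((_stream, _addr)) => accepted += 1,
            // No pending connection: wait a little, then re-check the flag.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10))
            }
            Err(e) => return Err(e),
        }
    }
    // `listener` goes out of scope here, which closes the listening socket.
    Ok(accepted)
}

/// Starts the loop, connects once, then requests termination.
fn run_demo() -> io::Result<usize> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let terminate = Arc::new(AtomicBool::new(false));
    let flag = terminate.clone();
    let server = thread::spawn(move || serve(listener, flag));

    let _client = TcpStream::connect(addr)?;
    thread::sleep(Duration::from_millis(100)); // give the loop time to accept
    terminate.store(true, Ordering::Relaxed);
    server.join().unwrap()
}

fn main() -> io::Result<()> {
    println!("connections accepted before shutdown: {}", run_demo()?);
    Ok(())
}
```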

Tokio does not use threads. The tasks look like background threads but they aren't (it's similar to "green threads"). Just finish listening, and then run() will finish together with the last client connection.

(In fact, tokio-tungstenite shows slightly better performance than ws-rs, especially on large packets, and it also conforms better to the standard according to the Autobahn test suite, unless you need permessage-deflate.)

@Boscop you want to stop the server when the terminate variable is set to some specific value, did I get that right? There are several ways to achieve this. A couple of ideas that come to mind:

  1. You may check this value inside the for_each block of incoming connections and return an error if the terminate variable is set to the specific value. This terminates the stream, so the server future resolves. This is probably the simplest way.
  2. You can combine the server future with your own future (which checks the terminate variable) using the Future::select() function.
  3. Given that your server has only one connection, you can write your own Future/Stream object which implements the logic you need without using combinators. This gives you more flexibility, at the cost of a bit more code.

@agalakhov, @application-developer-DA: Thanks.
If I go with option 1 it will only be able to check and break out when a new connection is made, right?
If that's correct, I'll do option 2. Is this the right way to chain it into .select()?

let connection = ws_reader.map(|_| ()).map_err(|_| ())
         .select(ws_writer.map(|_| ()).map_err(|_| ()))
         .select(loop_fn((), |_|
            if terminate.load(Ordering::Relaxed) {
                Ok(Loop::Break(()))
            } else {
                Ok(Loop::Continue(()))
            })
         );

Or does this consume 100% CPU, so that I should do this instead?

let (tx, rx) = oneshot::channel::<()>();
Interval::new(Duration::from_millis(1000 / fps), Handle::spawn(|_|
	if terminate.load(Ordering::Relaxed) {
		tx.send(()).unwrap()
	}
));

...

let connection = ws_reader.map(|_| ()).map_err(|_| ())
         .select(ws_writer.map(|_| ()).map_err(|_| ()))
         .select(rx);

Well, first of all, loop_fn() will spin constantly, as the documentation for the function says ("Loop::Continue(S) reinvokes the loop function with state S. The returned future will be subsequently polled for a new"), so I don't think it is the best solution. Second, as far as I know, you cannot chain more than 2 futures in such a manner, because select() returns a SelectNext future. When you want to combine 3 or more futures, you should rather use select_all() (take a look at the examples section in futures-rs).

If I go with option 1 it will only be able to check and break out when a new connection is made, right?

Depends on what you want to terminate. Do you want to terminate the whole server immediately after the terminate message is sent? In this case you have to select your server's future and your own future.

I would suggest a simpler way: just make a channel and resolve the future upon receiving a message on it (in your case, a terminate message). That's all you need ;) In other words, you may want to combine this future with your own future.
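The channel-as-shutdown-signal idea can be shown with a small std-only analogy. The sketch below uses std::sync::mpsc with recv_timeout in place of a futures oneshot channel combined via select(), so it is not the Tokio code itself, and run_until_shutdown is a hypothetical name. The loop does its periodic work on every timeout and stops as soon as a message arrives or the sender is dropped.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical worker loop: does periodic work every `tick` until a
/// shutdown message arrives or the sender is dropped.
/// Returns the number of ticks that were processed.
fn run_until_shutdown(shutdown: Receiver<()>, tick: Duration) -> u32 {
    let mut ticks = 0;
    loop {
        match shutdown.recv_timeout(tick) {
            // Explicit terminate message, or the sending side went away.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            // No signal yet: this is where the periodic work would go.
            Err(RecvTimeoutError::Timeout) => ticks += 1,
        }
    }
    ticks
}

fn main() {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || run_until_shutdown(rx, Duration::from_millis(20)));

    thread::sleep(Duration::from_millis(100));
    tx.send(()).unwrap(); // request shutdown

    println!("worker stopped after {} ticks", worker.join().unwrap());
}
```

Compared with polling an AtomicBool, the worker wakes up immediately when the signal arrives instead of at the next polling interval.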

Hope this helps. I'll close this issue (question) for now, as it seems that the initial question has been answered and the further discussion has moved in the direction of the tokio framework.

@Boscop feel free to join tokio's gitter if you have more tokio-related questions. There are many people there who may help you solve tokio-related problems, including some who may have had a similar issue. I'm also online there from time to time, but not always available.