snapview/tungstenite-rs

Construct new websocket instance from an already existing connection?

Closed this issue · 6 comments

I have a websocket server that communicates with multiple clients simultanousely. i want to be able to send a message to any one of these clients from anywhere in the server including any other threads besides the main thread, provided i have their peer address. i attempted using "WebSocket::from_raw_socket()" but it didn't work because i could not figure out how to fill the "stream" field with the right data. Is there a way to achieve this? thanks in advance!

You've chosen incorrect approach. Creating multiple websockets from one raw socket will result in incorrect data sending and crashes. Create WebSocket just with connect and then wrap it into Arc::new(Mutex::new(websocket)). The resulting Arc<Mutex<WebSocket<_>>> can be safely copied to other threads. Note that at any time only one thread can send, and others must wait for the sending to complete.

There is better architecture. Work with WebSocket in one thread. Let other threads send to this thread what they want to be sent via WebSocket. Use std::sync::mpsc for communication between threads.

I am using the first approach you suggested. However, i am getting an error.
"move occurs because arcwebsocket has type Arc<Mutex<WebSocket<TcpStream>>>, which does not implement the Copy trait"

code looks like this:

`let websocket = accept(s).expect("accept failed");
let arcwebsocket = Arc::new(Mutex::new(websocket));

thread::spawn(move || begin_ticking(Arc::clone(&arcwebsocket)));
handle_session(Arc::clone(&arcwebsocket), session_id);`

what i am trying to do here is share "arcwebsocket" with both the new spawned thread and the handle_session() function;

You doing move || and are calling clone afterwards. You must call clone before doing move ||, like that:

let a = Arc::clone(&arcwebsocket);
thread::spawn(move || begin_ticking(a));

Please make you familiar with the semantic of move ||. You are moving the whole arcwebsocket instead of its clone, that's why it doesn't work.

Anyway, this approach is very inefficient and error-prone. Consider the mpsc approach with WebSocket in only one thread instead.

Why i wasn't using the mpsc approach is because i want to send a message every 10 milliseconds, but i noticed the loop is sort of not actually looping infinitely but only iterates when it receives a message:

loop {
let message = match websocket.read_message() {
Ok(m) => m,
Err(_) => {
break;
}
}

i noticed this because i placed a print statement inside the loop and it does not print repeatedly like a normal infinite loop would do and fill up the screen, but it prints when i send a message to the server.
i initially thought set_nonblocking(true) would fix this but it didn't.

using the first approach, i have realized why it is not ideal and i wish to use the mpsc approach instead, but the above issue makes it impossible for me.

What you're trying to do is completely wrong, it just won't work. Threads won't help you. If you write like this, your code will block and won't send anything before something is received.

Short answer: use tokio. We provide tokio-tungstenite for use with tokio. Don't use tungstenite directly, tokio-tungstenite is what you really want.

Long answer:

WebSocket protocol is TCP-based, and in TCP all writes have to be serialized. You cannot just send from arbitrary thread to a TCP socket. "Just" sending every 10 ms will definitely result in many collisions and corrupted messages. Rust is a safe language, Tungstenite is safe too, and it prohibits such kind of usage. That's why you can't just send from arbitrary threads without using mutexes or other kinds of synchronization. This would definitely lead to a bug and thus is not allowed.

The operating system provides a way to work without blocking. It is called non-blocking mode. One can switch the TcpStream to non-blocking mode, and Tungstenite provides a way to do that. Once this is done, read_message() will not block anymore. Instead it will return a WouldBlock error if there is nothing to be read. Your code is then responsible for handling this error correctly and for doing correct blocking manually, i.e. by calling select() or poll(). For going this way the mio crate is your best friend. However this is surprisingly hard to implement correctly. So don't do that yourself unless you really know what you're doing. Use a readily-made solution instead. The most popular solution is tokio. It does all the non-blocking magic under the hood and provides you a nice async/await API. Just use it.

Wow. Thanks for the response. I was looking into tokio but i planned to get familiar with using threads before async. tokio-tungstenite is the best suitable tool for what i want to do so i'll use it. Thanks!