sdroege/async-tungstenite

Is it possible to flush all buffered messages before closing the stream?

mara-schulke opened this issue · 5 comments

We are building a WebSocket API using async-tungstenite and use WebSocket close frames to provide error hints to clients.
The situation which occurs quite often is the following:

async fn pseudo_handler(ws: &mut WebSocketStream<TlsStream>) -> anyhow::Result<()> {
    ws.send(Message::Binary(...).await?;
    ws.close(None).await?;
}

The client needs (and wants) to rely on the fact that it receives the message before any close sequence.
We know that this is expected behavior, but how can we prevent it, i.e. flush before sending the close sequence?

Sadly, flush() (defined in futures::SinkExt) and similar approaches didn't work.

Thanks!

I don't understand the problem. Maybe you can build a minimal testcase that shows the problem and what behaviour you want?

I could do that if it's really required to answer this, but basically the above code is the section that actually matters for this question.
On the client, the messages arrive in this order:

Message::Close(None)
Message::Binary(...)

But we want to be sure that it's sequential and not reordered:

Message::Binary(...)
Message::Close(None)

Ah I see. I would consider that a bug. Messages should keep their order, and close is just another message.

Were you able to produce a testcase? I tried with the following and it seems to work as expected (first the message with payload, then the close message):

diff --git a/examples/client.rs b/examples/client.rs
index bf5f1bb..b702ace 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -12,10 +12,8 @@
 
 use std::env;
 
-use futures::{future, pin_mut, StreamExt};
+use futures::SinkExt;
 
-use async_std::io;
-use async_std::prelude::*;
 use async_std::task;
 use async_tungstenite::async_std::connect_async;
 use async_tungstenite::tungstenite::protocol::Message;
@@ -25,41 +23,16 @@ async fn run() {
         .nth(1)
         .unwrap_or_else(|| panic!("this program requires at least one argument"));
 
-    let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded();
-    task::spawn(read_stdin(stdin_tx));
-
-    let (ws_stream, _) = connect_async(&connect_addr)
+    let (mut ws_stream, _) = connect_async(&connect_addr)
         .await
         .expect("Failed to connect");
     println!("WebSocket handshake has been successfully completed");
 
-    let (write, read) = ws_stream.split();
-
-    let stdin_to_ws = stdin_rx.map(Ok).forward(write);
-    let ws_to_stdout = {
-        read.for_each(|message| async {
-            let data = message.unwrap().into_data();
-            async_std::io::stdout().write_all(&data).await.unwrap();
-        })
-    };
-
-    pin_mut!(stdin_to_ws, ws_to_stdout);
-    future::select(stdin_to_ws, ws_to_stdout).await;
-}
-
-// Our helper method which will read data from stdin and send it along the
-// sender provided.
-async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
-    let mut stdin = io::stdin();
-    loop {
-        let mut buf = vec![0; 1024];
-        let n = match stdin.read(&mut buf).await {
-            Err(_) | Ok(0) => break,
-            Ok(n) => n,
-        };
-        buf.truncate(n);
-        tx.unbounded_send(Message::binary(buf)).unwrap();
-    }
+    ws_stream
+        .send(Message::Text(String::from("abc")))
+        .await
+        .unwrap();
+    ws_stream.close(None).await.unwrap();
 }
 
 fn main() {
diff --git a/examples/echo-server.rs b/examples/echo-server.rs
index 412184d..5d211d1 100644
--- a/examples/echo-server.rs
+++ b/examples/echo-server.rs
@@ -46,7 +46,8 @@ async fn accept_connection(stream: TcpStream) {
     info!("New WebSocket connection: {}", addr);
 
     let (write, read) = ws_stream.split();
-    read.forward(write)
+    read.inspect(|msg| println!("{:?}", msg))
+        .forward(write)
         .await
         .expect("Failed to forward message")
 }

You would run that via

# first shell
$ cargo run --example echo-server --features async-std-runtime
# second shell
$ cargo run --example client --features async-std-runtime -- ws://127.0.0.1:8080

Or the following:

diff --git a/examples/client.rs b/examples/client.rs
index bf5f1bb..484598c 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -12,10 +12,8 @@
 
 use std::env;
 
-use futures::{future, pin_mut, StreamExt};
+use futures::SinkExt;
 
-use async_std::io;
-use async_std::prelude::*;
 use async_std::task;
 use async_tungstenite::async_std::connect_async;
 use async_tungstenite::tungstenite::protocol::Message;
@@ -25,41 +23,17 @@ async fn run() {
         .nth(1)
         .unwrap_or_else(|| panic!("this program requires at least one argument"));
 
-    let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded();
-    task::spawn(read_stdin(stdin_tx));
-
-    let (ws_stream, _) = connect_async(&connect_addr)
+    let (mut ws_stream, _) = connect_async(&connect_addr)
         .await
         .expect("Failed to connect");
     println!("WebSocket handshake has been successfully completed");
 
-    let (write, read) = ws_stream.split();
-
-    let stdin_to_ws = stdin_rx.map(Ok).forward(write);
-    let ws_to_stdout = {
-        read.for_each(|message| async {
-            let data = message.unwrap().into_data();
-            async_std::io::stdout().write_all(&data).await.unwrap();
-        })
-    };
-
-    pin_mut!(stdin_to_ws, ws_to_stdout);
-    future::select(stdin_to_ws, ws_to_stdout).await;
-}
-
-// Our helper method which will read data from stdin and send it along the
-// sender provided.
-async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
-    let mut stdin = io::stdin();
-    loop {
-        let mut buf = vec![0; 1024];
-        let n = match stdin.read(&mut buf).await {
-            Err(_) | Ok(0) => break,
-            Ok(n) => n,
-        };
-        buf.truncate(n);
-        tx.unbounded_send(Message::binary(buf)).unwrap();
-    }
+    ws_stream
+        .send(Message::Text(String::from("abc")))
+        .await
+        .unwrap();
+    ws_stream.close(None).await.unwrap();
+    ws_stream.flush().await.unwrap();
 }
 
 fn main() {
diff --git a/examples/echo-server.rs b/examples/echo-server.rs
index 412184d..739cdd7 100644
--- a/examples/echo-server.rs
+++ b/examples/echo-server.rs
@@ -39,16 +39,17 @@ async fn accept_connection(stream: TcpStream) {
         .expect("connected streams should have a peer address");
     info!("Peer address: {}", addr);
 
-    let ws_stream = async_tungstenite::accept_async(stream)
+    let mut ws_stream = async_tungstenite::accept_async(stream)
         .await
         .expect("Error during the websocket handshake occurred");
 
-    info!("New WebSocket connection: {}", addr);
+    println!("New WebSocket connection: {}", addr);
 
-    let (write, read) = ws_stream.split();
-    read.forward(write)
-        .await
-        .expect("Failed to forward message")
+    while let Some(msg) = ws_stream.next().await {
+        println!("{:?}", msg);
+    }
+
+    println!("done");
 }
 
 fn main() -> Result<(), Error> {

edit: This previously only read a single message on the server, but like this it's also behaving correctly.

I'm going to close this issue for now. Please re-open if you can provide a testcase that reproduces the problem.