Is it possible to flush all buffered messages before closing the stream?
mara-schulke opened this issue · 5 comments
We are building a ws api using async-tungstenite and used the close frames of websockets to provide error hints for clients.
The situation which occurs quite often is the following:
async fn pseudo_handler(ws: &mut WebSocketStream<TlsStream>) -> anyhow::Result<()> {
ws.send(Message::Binary(...).await?;
ws.close(None).await?;
}
And the client needs / want to rely on the fact that he receives the message before any close sequences.
We know that this is expected behavior but how can we prevent this / flush before sending the close sequence?
Sadly flush()
defined in futures::SinkExt
and similar didnt work.
Thanks!
I don't understand the problem. Maybe you can built a minimal testcase that shows the problem and what behaviour you want?
I could do that if its really required to answer this, but basically the above code is the section that actually matters for this question;
The message order results on the client in:
Message::Close(None)
Message::Binary(...)
But we want to be sure that its sequential and not reordered
Message::Binary(...)
Message::Close(None)
Ah I see. I would consider that a bug. Messages should keep their order, and close is just another message.
Were you able to produce a testcase? I tried with the following and it seems to work as expected (first the message with payload, then the close message):
diff --git a/examples/client.rs b/examples/client.rs
index bf5f1bb..b702ace 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -12,10 +12,8 @@
use std::env;
-use futures::{future, pin_mut, StreamExt};
+use futures::SinkExt;
-use async_std::io;
-use async_std::prelude::*;
use async_std::task;
use async_tungstenite::async_std::connect_async;
use async_tungstenite::tungstenite::protocol::Message;
@@ -25,41 +23,16 @@ async fn run() {
.nth(1)
.unwrap_or_else(|| panic!("this program requires at least one argument"));
- let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded();
- task::spawn(read_stdin(stdin_tx));
-
- let (ws_stream, _) = connect_async(&connect_addr)
+ let (mut ws_stream, _) = connect_async(&connect_addr)
.await
.expect("Failed to connect");
println!("WebSocket handshake has been successfully completed");
- let (write, read) = ws_stream.split();
-
- let stdin_to_ws = stdin_rx.map(Ok).forward(write);
- let ws_to_stdout = {
- read.for_each(|message| async {
- let data = message.unwrap().into_data();
- async_std::io::stdout().write_all(&data).await.unwrap();
- })
- };
-
- pin_mut!(stdin_to_ws, ws_to_stdout);
- future::select(stdin_to_ws, ws_to_stdout).await;
-}
-
-// Our helper method which will read data from stdin and send it along the
-// sender provided.
-async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
- let mut stdin = io::stdin();
- loop {
- let mut buf = vec![0; 1024];
- let n = match stdin.read(&mut buf).await {
- Err(_) | Ok(0) => break,
- Ok(n) => n,
- };
- buf.truncate(n);
- tx.unbounded_send(Message::binary(buf)).unwrap();
- }
+ ws_stream
+ .send(Message::Text(String::from("abc")))
+ .await
+ .unwrap();
+ ws_stream.close(None).await.unwrap();
}
fn main() {
diff --git a/examples/echo-server.rs b/examples/echo-server.rs
index 412184d..5d211d1 100644
--- a/examples/echo-server.rs
+++ b/examples/echo-server.rs
@@ -46,7 +46,8 @@ async fn accept_connection(stream: TcpStream) {
info!("New WebSocket connection: {}", addr);
let (write, read) = ws_stream.split();
- read.forward(write)
+ read.inspect(|msg| println!("{:?}", msg))
+ .forward(write)
.await
.expect("Failed to forward message")
}
You would run that via
# first shell
$ cargo run --example echo-server --features async-std-runtime
# second shell
$ cargo run --example client --features async-std-runtime -- ws://127.0.0.1:8080
Or the following:
diff --git a/examples/client.rs b/examples/client.rs
index bf5f1bb..484598c 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -12,10 +12,8 @@
use std::env;
-use futures::{future, pin_mut, StreamExt};
+use futures::SinkExt;
-use async_std::io;
-use async_std::prelude::*;
use async_std::task;
use async_tungstenite::async_std::connect_async;
use async_tungstenite::tungstenite::protocol::Message;
@@ -25,41 +23,17 @@ async fn run() {
.nth(1)
.unwrap_or_else(|| panic!("this program requires at least one argument"));
- let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded();
- task::spawn(read_stdin(stdin_tx));
-
- let (ws_stream, _) = connect_async(&connect_addr)
+ let (mut ws_stream, _) = connect_async(&connect_addr)
.await
.expect("Failed to connect");
println!("WebSocket handshake has been successfully completed");
- let (write, read) = ws_stream.split();
-
- let stdin_to_ws = stdin_rx.map(Ok).forward(write);
- let ws_to_stdout = {
- read.for_each(|message| async {
- let data = message.unwrap().into_data();
- async_std::io::stdout().write_all(&data).await.unwrap();
- })
- };
-
- pin_mut!(stdin_to_ws, ws_to_stdout);
- future::select(stdin_to_ws, ws_to_stdout).await;
-}
-
-// Our helper method which will read data from stdin and send it along the
-// sender provided.
-async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
- let mut stdin = io::stdin();
- loop {
- let mut buf = vec![0; 1024];
- let n = match stdin.read(&mut buf).await {
- Err(_) | Ok(0) => break,
- Ok(n) => n,
- };
- buf.truncate(n);
- tx.unbounded_send(Message::binary(buf)).unwrap();
- }
+ ws_stream
+ .send(Message::Text(String::from("abc")))
+ .await
+ .unwrap();
+ ws_stream.close(None).await.unwrap();
+ ws_stream.flush().await.unwrap();
}
fn main() {
diff --git a/examples/echo-server.rs b/examples/echo-server.rs
index 412184d..739cdd7 100644
--- a/examples/echo-server.rs
+++ b/examples/echo-server.rs
@@ -39,16 +39,17 @@ async fn accept_connection(stream: TcpStream) {
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
- let ws_stream = async_tungstenite::accept_async(stream)
+ let mut ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
- info!("New WebSocket connection: {}", addr);
+ println!("New WebSocket connection: {}", addr);
- let (write, read) = ws_stream.split();
- read.forward(write)
- .await
- .expect("Failed to forward message")
+ while let Some(msg) = ws_stream.next().await {
+ println!("{:?}", msg);
+ }
+
+ println!("done");
}
fn main() -> Result<(), Error> {
edit: This previously only read a single message on the server, but like this it's also behaving correctly.
I'm going to close this issue for now. Please re-open if you can provide a testcase that reproduces the problem.