Straightforward async Websockets library for Rust! With a lot of examples available!
This library is supposed to offer a simple implementation for websockets, so end-user could use this to wrap a websockets server/client into their application, offering a smooth way of setting it up into his code.
It's an async library based on tokio runtime, which uses a tokio TcpStream behind the scenes, using that as the starting point to implement the standards of WebSocket Protocol RFC, performing handshakes, reading frames, parsing masks, handling opcodes and internal payload.
Can be used as a client or server,
returning a WSConnection
, which implements the Stream
trait,
so you can continuously consume incoming messages, or send messages.
The motivation behind this, was to offer a simple way of having a WebSockets connection over your application, using as a
reference wide established libraries, like tungstenite-rs
and tokio-tungstenite
Most of all WebSockets RFC features are implemented, like:
- Handshake process, key parsing and generation
- OpCodes handling, like
Text
,Binary
,Ping
,Pong
andContinue
- Multiple subscriptions
- Scalability
- Error handling
- It passes the autobahn-test-suite
- TLS Support
- Extensions (Compression and Decompression by permessage-deflate)
Add this in your Cargo.toml
:
[dependencies]
socket-flow = "*"
This repo has different examples and flexible ways of using its dependencies to design the code as end-user needs.
We have the option of configuring all from scratch, creating the TcpListener, and managing the websockets connections, and we also have a plug-and-play option, which you can generate a Websockets server, with fewer lines of code.
This is a very practical example, because you can have a server with just calling start_server
function, which returns
an EventStream
, for consuming server events, like new connections, messages, errors and disconnections.
You can also find in: Example
The start_server
function also accepts a rustls::ServerConfig
for enabling TLS in your server.
use futures::StreamExt;
use log::*;
use socket_flow::event::{Event, ID};
use socket_flow::server::start_server;
use socket_flow::split::WSWriter;
use std::collections::HashMap;
#[tokio::main]
async fn main() {
env_logger::init();
let port: u16 = 8080;
match start_server(port).await {
Ok(mut event_receiver) => {
let mut clients: HashMap<ID, WSWriter> = HashMap::new();
info!("Server started on address 127.0.0.1:{}", port);
while let Some(event) = event_receiver.next().await {
match event {
Event::NewClient(id, client_conn) => {
info!("New client {} connected", id);
clients.insert(id, client_conn);
}
Event::NewMessage(client_id, message) => {
info!("Message from client {}: {:?}", client_id, message);
let ws_writer = clients.get_mut(&client_id).unwrap();
ws_writer.send_message(message).await.unwrap();
}
Event::Disconnect(client_id) => {
info!("Client {} disconnected", client_id);
clients.remove(&client_id);
}
Event::Error(client_id, error) => {
error!("Error occurred for client {}: {:?}", client_id, error);
}
}
}
}
Err(err) => {
eprintln!("Could not start the server due to: {:?}", err);
}
}
}
For running this example, you can clone the repo and execute:
cargo run --color=always --package socket-flow --example simple_server
Here is an echo-server example that you can also find in: Example
use futures::StreamExt;
use log::*;
use socket_flow::handshake::accept_async;
use socket_flow::stream::SocketFlowStream;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
async fn handle_connection(_: SocketAddr, stream: TcpStream) {
match accept_async(SocketFlowStream::Plain(stream)).await {
Ok(mut ws_connection) => {
while let Some(result) = ws_connection.next().await {
match result {
Ok(message) => {
if ws_connection.send_message(message).await.is_err() {
error!("Failed to send message");
break;
}
}
Err(e) => {
error!("Received error from the stream: {}", e);
break;
}
}
}
}
Err(err) => error!("Error when performing handshake: {}", err),
}
}
#[tokio::main]
async fn main() {
env_logger::init();
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
info!("Listening on: {}", addr);
while let Ok((stream, peer)) = listener.accept().await {
info!("Peer address: {}", peer);
tokio::spawn(handle_connection(peer, stream));
}
}
For running this example, you can clone the repo and execute:
cargo run --color=always --package socket-flow --example echo_server
This example, creates a TcpListener, binding it to a port, accepting connections, handling each of these connections
inside a tokio task, for handling clients concurrently. The handle_connection function, make sure the handshake process
is performed, returning a WSConnection
, which implements Stream
trait, where you can consume incoming data for this client,
and perform write operations into the socket.
It includes error handling through Result
.
Here is an example of how to run a client, that will perform some operations and disconnect gracefully:
use futures::StreamExt;
use log::*;
use rand::distr::Alphanumeric;
use rand::{rng, Rng};
use socket_flow::handshake::connect_async;
use tokio::select;
use tokio::time::{interval, Duration};
async fn handle_connection(addr: &str) {
match connect_async(addr).await {
Ok(mut ws_connection) => {
let mut ticker = interval(Duration::from_secs(5));
// it will be used for closing the connection
let mut counter = 0;
loop {
select! {
Some(result) = ws_connection.next() => {
match result {
Ok(message) => {
info!("Received message: {}", message.as_text().unwrap());
counter = counter + 1;
// close the connection if 3 messages have already been sent and received
if counter >= 3 {
if ws_connection.close_connection().await.is_err() {
error!("Error occurred when closing connection");
}
break;
}
}
Err(err) => {
error!("Received error from the stream: {}", err);
break;
}
}
}
_ = ticker.tick() => {
let random_string = generate_random_string();
let binary_data = Vec::from(random_string);
if ws_connection.send(binary_data).await.is_err() {
eprintln!("Failed to send message");
break;
}
}
}
}
}
Err(err) => error!("Error when performing handshake: {}", err),
}
}
#[tokio::main]
async fn main() {
env_logger::init();
handle_connection("ws://localhost:9002").await;
}
fn generate_random_string() -> String {
rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect()
}
Since you need a server for testing the client, you can execute our echo-server example, and on another tab execute the client example:
cargo run --color=always --package socket-flow --example client
In this example, the client will try to connect to ws://127.0.0.1:9002
,
if the connection is established, it will start sending random strings every 5 seconds into the socket.
After sending three strings, it will close the connection gracefully and end its execution.
You can check more examples over Examples.
Socket-flow passes the Autobahn Test Suite for WebSockets. Also, it has some internal tests, for ensuring reliability.
By default, this library only accepts tokio-rustls, as an adapter library for adding TLS in your client/server implementation with socket-flow.
For checking how to set up TLS in server/client, and finding some examples, go to: TLS Examples.
For setting some parameters of your websockets connection, and enabling compression and decompression, you can check how to set up that over Config and Extensions.