Socketioxide 🚀🦀
A socket.io server implementation in Rust that integrates with the Tower ecosystem and the Tokio stack. It works with any server framework based on tower, such as Axum, Warp, Salvo, or Hyper. You can add any other tower-based middleware on top of socketioxide, such as CORS, authorization, or compression, with tower-http.
⚠️ This crate is under active development and the API is not yet stable.
With the recent migration of frameworks to hyper v1, it can be complicated to know which version of socketioxide to use with which version of a framework. This table summarizes the compatibility between socketioxide versions and framework versions.
HTTP framework | Hyper version | socketioxide version |
---|---|---|
🦀Hyper 1.0 | 1.0 | >= 0.9 |
🦀Hyper 1-rc* | 1-rc* | < 0.9 |
🦀Hyper 0.14 | 0.14 | < 0.9 |
🦀Axum 0.7 | 1.0 | >= 0.9 |
🦀Axum 0.6 | 0.14 | < 0.9 |
🦀Warp 0.3 | 0.14 | < 0.9 |
🦀Salvo 0.63 | 1.0 | >= 0.9 |
🦀Salvo 0.62 | 1-rc* | < 0.9 |
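As an illustration of how to read the table, a project on Axum 0.7 (built on hyper 1.0) would pair it with socketioxide 0.9 or later. A `Cargo.toml` fragment for that row might look like the sketch below. The version requirements are taken from the table and are otherwise an assumption; check crates.io for the current releases.

```toml
[dependencies]
# Axum 0.7 uses hyper 1.0, so per the table it needs socketioxide >= 0.9.
axum = "0.7"
# For a 0.x crate, "0.9" means ">= 0.9.0, < 0.10.0" in Cargo.
socketioxide = "0.9"
```

Because the API is not yet stable, pinning a minor version like this keeps a later breaking release from being picked up silently.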
- Integrates with:
  - Axum: 🏓echo example
  - Warp: 🏓echo example (not supported with `socketioxide >= 0.9.0` as long as warp doesn't migrate to hyper v1)
  - Hyper: 🏓echo example
  - Salvo: 🏓echo example
- Out of the box support for any other middleware based on tower, such as CORS, authorization, and compression
- Namespaces
- Rooms
- Ack and emit with ack
- Binary packets
- Polling & Websocket transports
- Extensions to add custom data to sockets
- Memory efficient http payload parsing with streams
- Flexible axum-like API to handle events, with extractors to pull data into your handlers
- Well tested with the official end to end test-suite
- Socket.io versions supported:
  - 🔌protocol v5: socket.io js from v3.0.0..latest, enabled by default
  - 🔌protocol v4: based on engine.io v3, under the feature flag `v4` (socket.io js from v1.0.3..latest)
- Planned: other adapters to share state between server instances (such as a Redis adapter); currently only the in-memory adapter is implemented
- Planned: state recovery when a socket reconnects
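Protocol v4 support, listed above, sits behind the `v4` feature flag, so it has to be enabled in the dependency declaration. A minimal sketch, assuming a socketioxide 0.9 release (the version number is an assumption taken from the compatibility table):

```toml
[dependencies]
# The "v4" feature turns on support for socket.io protocol v4 (engine.io v3).
socketioxide = { version = "0.9", features = ["v4"] }
```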
Chat app 💬 (see full example here)
```rust
io.ns("/", |s: SocketRef| {
    // Relay a chat message to every other connected socket.
    s.on("new message", |s: SocketRef, Data::<String>(msg)| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        let msg = Res::Message {
            username,
            message: msg,
        };
        s.broadcast().emit("new message", msg).ok();
    });

    // Register a username for this socket and announce the new user.
    s.on(
        "add user",
        |s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
            // Ignore the request if this socket already has a username.
            if s.extensions.get::<Username>().is_some() {
                return;
            }
            let num_users = user_cnt.add_user();
            s.extensions.insert(Username(username.clone()));
            s.emit("login", Res::Login { num_users }).ok();

            let res = Res::UserEvent {
                num_users,
                username: Username(username),
            };
            s.broadcast().emit("user joined", res).ok();
        },
    );

    s.on("typing", |s: SocketRef| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        s.broadcast()
            .emit("typing", Res::Username { username })
            .ok();
    });

    s.on("stop typing", |s: SocketRef| {
        let username = s.extensions.get::<Username>().unwrap().clone();
        s.broadcast()
            .emit("stop typing", Res::Username { username })
            .ok();
    });

    // Only sockets that registered a username count as users.
    s.on_disconnect(|s: SocketRef, user_cnt: State<UserCnt>| {
        if let Some(username) = s.extensions.get::<Username>() {
            let num_users = user_cnt.remove_user();
            let res = Res::UserEvent {
                num_users,
                username: username.clone(),
            };
            s.broadcast().emit("user left", res).ok();
        }
    });
});
```
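The handlers above rely on `Username`, `UserCnt`, and `Res`, whose definitions are not part of this excerpt. The following is a minimal sketch of what the first two could look like, using only the standard library. The field layout and the `new` constructor are assumptions made for illustration, and the full example may define them differently. `Res` is left out because it would need to be a serializable type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical newtype stored in the socket extensions by "add user".
#[derive(Clone, Debug, PartialEq)]
struct Username(String);

/// Hypothetical shared user counter. Cloning it yields another handle to
/// the same underlying count, so every handler sees the same value.
#[derive(Clone)]
struct UserCnt(Arc<AtomicUsize>);

impl UserCnt {
    fn new() -> Self {
        Self(Arc::new(AtomicUsize::new(0)))
    }

    /// Increments the count and returns the new number of users.
    fn add_user(&self) -> usize {
        self.0.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Decrements the count and returns the new number of users.
    /// Assumes it is only called after a matching `add_user`, as in the
    /// disconnect handler, so the count is at least 1 here.
    fn remove_user(&self) -> usize {
        self.0.fetch_sub(1, Ordering::SeqCst) - 1
    }
}
```

An atomic counter behind an `Arc` avoids a lock for what is a single integer, which is why it is a common choice for this kind of shared state.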
Echo implementation with Axum 🏓
```rust
use axum::routing::get;
use serde_json::Value;
use socketioxide::{
    extract::{AckSender, Bin, Data, SocketRef},
    SocketIo,
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;

// Connection handler shared by both namespaces.
// `data` is the payload the client sent when connecting.
fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
    info!("Socket.IO connected: {:?} {:?}", socket.ns(), socket.id);
    socket.emit("auth", data).ok();

    // Echo the message, including any binary attachments, back to the sender.
    socket.on(
        "message",
        |socket: SocketRef, Data::<Value>(data), Bin(bin)| {
            info!("Received event: {:?} {:?}", data, bin);
            socket.bin(bin).emit("message-back", data).ok();
        },
    );

    // Same echo, but delivered through the acknowledgement.
    socket.on(
        "message-with-ack",
        |Data::<Value>(data), ack: AckSender, Bin(bin)| {
            info!("Received event: {:?} {:?}", data, bin);
            ack.bin(bin).send(data).ok();
        },
    );
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing::subscriber::set_global_default(FmtSubscriber::default())?;

    // `layer` is the tower layer to mount; `io` registers namespace handlers.
    let (layer, io) = SocketIo::new_layer();

    io.ns("/", on_connect);
    io.ns("/custom", on_connect);

    let app = axum::Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .layer(layer);

    info!("Starting server");

    // `main` returns a `Result`, so propagate I/O errors instead of panicking.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;

    Ok(())
}
```
Other examples are available in the example folder.
Any contribution is welcome, feel free to open an issue or a PR. If you want to contribute but don't know where to start, you can check the issues.
If you have any questions or feedback, please open a thread on the discussions page.
This project is licensed under the MIT license.