Messaging library for distributed systems built in Rust

msg-rs

A flexible and lightweight messaging library for distributed systems built with Rust and Tokio.

Overview

msg-rs is a messaging library inspired by projects such as ZeroMQ and Nanomsg. We built it because we needed a Rust-native messaging library in the same vein.

Warning: This project is still in early development and is not ready for production use.

Features

  • Multiple socket types
    • Request/Reply
    • Publish/Subscribe
    • Channel
    • Push/Pull
    • Survey/Respond
  • Stats (RTT, throughput, packet drops, etc.)
    • Request/Reply basic stats
  • Queuing
  • Pluggable transport layer
    • TCP
    • TLS
    • IPC
    • UDP
    • Inproc
  • Durable IO abstraction (built-in retries and reconnections)
  • Simulation modes with Turmoil
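
To make the "built-in retries and reconnections" item concrete, here is a minimal, std-only sketch of retrying an operation with exponential backoff. It is not the msg-rs implementation: `retry_with_backoff` and `demo_flaky_connect` are hypothetical names invented for this illustration, and the real abstraction is asynchronous and may use a different backoff policy.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper (not part of msg-rs): run `op` until it succeeds or
/// `max_attempts` is reached, doubling the delay after every failure.
/// `op` receives the 1-based attempt number.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    assert!(max_attempts > 0, "need at least one attempt");
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}

/// Simulates a connection that is refused twice before it succeeds,
/// returning the attempt number that finally went through.
fn demo_flaky_connect() -> Result<u32, String> {
    retry_with_backoff(5, Duration::from_millis(1), |attempt| {
        if attempt < 3 {
            Err(format!("attempt {attempt} refused"))
        } else {
            Ok(attempt)
        }
    })
}
```

A durable IO layer wraps this kind of loop around connect and send operations so that callers do not have to handle transient failures themselves.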

Socket Types

Request/Reply

Example:

use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the reply socket (server side) with a transport
    let mut rep = RepSocket::new(Tcp::new());
    rep.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the request socket (client side) with a transport
    let mut req = ReqSocket::new(Tcp::new());
    // Connect via loopback: `0.0.0.0` is a bind address and is not a portable connect target
    req.connect("127.0.0.1:4444").await.unwrap();

    tokio::spawn(async move {
        // Receive the request and respond with "world"
        // RepSocket implements `Stream`
        // Named `request` to avoid shadowing the outer `req` socket
        let request = rep.next().await.unwrap();
        println!("Message: {:?}", request.msg());

        request.respond(Bytes::from("world")).unwrap();
    });

    let res: Bytes = req.request(Bytes::from("hello")).await.unwrap();
    println!("Response: {:?}", res);
}
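
The example above shows the pattern at the API level. As a rough mental model of why the received request is what you respond on, the std-only sketch below pairs each message with its own reply channel. The `Request` struct and `demo_request_reply` function are illustrative assumptions built on `std::sync::mpsc`. They are not the msg-rs types, and the real sockets work over a network transport rather than in-process channels.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Illustrative stand-in for a received request (not the msg-rs type):
/// the payload travels together with a channel for its single reply.
struct Request {
    msg: Vec<u8>,
    reply_to: Sender<Vec<u8>>,
}

impl Request {
    fn msg(&self) -> &[u8] {
        &self.msg
    }

    /// Consumes the request, so each request is answered at most once.
    fn respond(self, response: Vec<u8>) -> Result<(), String> {
        self.reply_to.send(response).map_err(|e| e.to_string())
    }
}

/// Sends "hello" to a server thread and returns the reply it gets back.
fn demo_request_reply() -> Vec<u8> {
    let (req_tx, req_rx) = channel::<Request>();

    // "Server" side: answer every request with its payload plus " world".
    let server = thread::spawn(move || {
        for request in req_rx {
            let mut response = request.msg().to_vec();
            response.extend_from_slice(b" world");
            request.respond(response).unwrap();
        }
    });

    // "Client" side: send the request, then block until the reply arrives.
    let (reply_tx, reply_rx) = channel();
    req_tx
        .send(Request { msg: b"hello".to_vec(), reply_to: reply_tx })
        .unwrap();
    let response = reply_rx.recv().unwrap();

    // Dropping the sender closes the channel and ends the server loop.
    drop(req_tx);
    server.join().unwrap();
    response
}
```

Because `respond` takes `self` by value, the compiler rejects a second reply to the same request.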

Publish/Subscribe

use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{PubSocket, SubSocket, Tcp};

#[tokio::main]
async fn main() {
    // Initialize the publisher socket (server side) with a transport
    let mut pub_socket = PubSocket::new(Tcp::new());
    pub_socket.bind("0.0.0.0:4444").await.unwrap();

    // Initialize the subscriber socket (client side) with a transport
    let mut sub_socket = SubSocket::new(Tcp::new());
    // Connect via loopback: `0.0.0.0` is a bind address and is not a portable connect target
    sub_socket.connect("127.0.0.1:4444").await.unwrap();

    let topic = "some_interesting_topic".to_string();

    // Subscribe to a topic
    sub_socket.subscribe(topic.clone()).await.unwrap();

    tokio::spawn(async move {
        // Values are `bytes::Bytes`
        pub_socket.publish(topic, Bytes::from("hello_world")).await.unwrap();
    });

    let msg = sub_socket.next().await.unwrap();
    println!("Received message: {:?}", msg);
}
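
To illustrate the topic routing behind publish/subscribe, here is a small in-process sketch using only the standard library. `Broker` and `demo_pub_sub` are hypothetical names for this illustration, not msg-rs types. The sketch assumes exact topic matching, which may differ from how msg-rs matches topics.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Illustrative in-process broker (not the msg-rs `PubSocket`):
/// maps each topic to the channels of its subscribers.
#[derive(Default)]
struct Broker {
    subscribers: HashMap<String, Vec<Sender<Vec<u8>>>>,
}

impl Broker {
    /// Registers a subscriber for `topic` and returns its receiving end.
    fn subscribe(&mut self, topic: &str) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.subscribers
            .entry(topic.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Delivers `payload` to every live subscriber of `topic` and returns
    /// how many received it. Assumes exact topic matching.
    fn publish(&mut self, topic: &str, payload: &[u8]) -> usize {
        let Some(subs) = self.subscribers.get_mut(topic) else {
            return 0;
        };
        // Forget subscribers whose receiver has been dropped.
        subs.retain(|tx| tx.send(payload.to_vec()).is_ok());
        subs.len()
    }
}

/// Subscribes to one topic, publishes to two, and returns the message
/// received together with the delivery count for the subscribed topic.
fn demo_pub_sub() -> (Vec<u8>, usize) {
    let mut broker = Broker::default();
    let rx = broker.subscribe("some_interesting_topic");

    let delivered = broker.publish("some_interesting_topic", b"hello_world");
    // Nobody subscribed to this topic, so the message is dropped.
    let missed = broker.publish("other_topic", b"ignored");
    assert_eq!(missed, 0);

    (rx.recv().unwrap(), delivered)
}
```

In this sketch a message published before any subscription exists is simply discarded, which is the usual behavior to expect from a pub/sub pattern without queuing.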

MSRV

The minimum supported Rust version is 1.70.

Contributions & Bug Reports

Please report any bugs or issues you encounter by opening a GitHub issue.

Pull requests are welcome! If you would like to contribute, please open an issue first to discuss the change you would like to make.

License

This project is licensed under the MIT license.