raftify is a high-level implementation of Raft, developed with the goal of making it easy and straightforward to integrate the Raft algorithm.
It uses tikv/raft-rs as its Raft core, gRPC for the network layer, and heed (an LMDB wrapper) for the storage layer.
I strongly recommend reading the basic memstore example code first to learn how to use this library, but here's a quick guide.
Define the data to be stored in LogEntry and how to serialize and deserialize it.
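// Import paths assume bincode 1.x and raftify's re-exported Result type.
use bincode::{deserialize, serialize};
use raftify::{AbstractLogEntry, Result};
use serde::{Deserialize, Serialize};

// Application-level commands that are replicated through the Raft log.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LogEntry {
    Insert { key: u64, value: String },
}

impl AbstractLogEntry for LogEntry {
    // Serialize the entry into the bytes stored in the Raft log.
    fn encode(&self) -> Result<Vec<u8>> {
        serialize(self).map_err(|e| e.into())
    }

    // Rebuild the entry from bytes read back out of the log.
    fn decode(bytes: &[u8]) -> Result<LogEntry> {
        let log_entry: LogEntry = deserialize(bytes)?;
        Ok(log_entry)
    }
}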
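What matters for encode and decode is that decode reverses encode. The std-only sketch below makes that round trip visible without serde or bincode; the trait name LogEntryCodec, its String error type, and the byte layout (1-byte tag, 8-byte little-endian key, UTF-8 value) are hypothetical and not part of raftify.

```rust
// Hypothetical stand-in for raftify's AbstractLogEntry: same method shapes,
// but with a plain String error instead of raftify's Result type.
pub trait LogEntryCodec: Sized {
    fn encode(&self) -> Result<Vec<u8>, String>;
    fn decode(bytes: &[u8]) -> Result<Self, String>;
}

#[derive(Clone, Debug, PartialEq)]
pub enum LogEntry {
    Insert { key: u64, value: String },
}

impl LogEntryCodec for LogEntry {
    // Infallible here, but kept as Result to mirror the trait shape.
    fn encode(&self) -> Result<Vec<u8>, String> {
        match self {
            LogEntry::Insert { key, value } => {
                // Hypothetical layout: tag 0, 8-byte little-endian key, UTF-8 value.
                let mut buf = Vec::with_capacity(9 + value.len());
                buf.push(0u8);
                buf.extend_from_slice(&key.to_le_bytes());
                buf.extend_from_slice(value.as_bytes());
                Ok(buf)
            }
        }
    }

    fn decode(bytes: &[u8]) -> Result<Self, String> {
        match bytes.first() {
            Some(0) if bytes.len() >= 9 => {
                let mut key_bytes = [0u8; 8];
                key_bytes.copy_from_slice(&bytes[1..9]);
                let key = u64::from_le_bytes(key_bytes);
                let value = String::from_utf8(bytes[9..].to_vec()).map_err(|e| e.to_string())?;
                Ok(LogEntry::Insert { key, value })
            }
            Some(tag) => Err(format!("unknown tag or truncated entry (tag {tag})")),
            None => Err("empty entry".to_string()),
        }
    }
}
```

Using bincode, as the LogEntry code above does, avoids maintaining a layout like this by hand; the hand-rolled version only exists to show the round trip.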
Essentially, the following three methods need to be implemented for the Store:
- apply: applies a committed entry to the store.
- snapshot: returns snapshot data for the store.
- restore: applies the snapshot passed as an argument.
As with LogEntry, you also need to implement encode and decode.
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

// Import paths assume async-trait, bincode 1.x and raftify's crate-root exports.
use async_trait::async_trait;
use bincode::{deserialize, serialize};
use raftify::{AbstractLogEntry, AbstractStateMachine, Result};

// An in-memory key-value store shared behind Arc<RwLock<..>>.
#[derive(Clone, Debug)]
pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);

impl HashStore {
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(HashMap::new())))
    }

    pub fn get(&self, id: u64) -> Option<String> {
        self.0.read().unwrap().get(&id).cloned()
    }
}

#[async_trait]
impl AbstractStateMachine for HashStore {
    // Called for every committed entry; decodes it with the LogEntry defined above.
    async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
        let log_entry: LogEntry = LogEntry::decode(&data)?;
        match log_entry {
            LogEntry::Insert { key, value } => {
                let mut db = self.0.write().unwrap();
                log::info!("Inserted: ({}, {})", key, value);
                db.insert(key, value);
            }
        };
        Ok(data)
    }

    // Serialize the whole map as the snapshot payload.
    async fn snapshot(&self) -> Result<Vec<u8>> {
        Ok(serialize(&*self.0.read().unwrap())?)
    }

    // Replace the current map with the one contained in the snapshot,
    // returning an error instead of panicking on corrupt data.
    async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
        let new: HashMap<u64, String> = deserialize(&snapshot[..])?;
        let mut db = self.0.write().unwrap();
        *db = new;
        Ok(())
    }

    fn encode(&self) -> Result<Vec<u8>> {
        serialize(&*self.0.read().unwrap()).map_err(|e| e.into())
    }

    fn decode(bytes: &[u8]) -> Result<Self> {
        let db: HashMap<u64, String> = deserialize(bytes)?;
        Ok(Self(Arc::new(RwLock::new(db))))
    }
}
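To make the roles of apply, snapshot and restore concrete, here is a synchronous, std-only sketch of the same contract: applying entries mutates the state, snapshot captures the whole state, and restore on a fresh store reproduces it. The StateMachine trait, the KvStore type and the "key=value" text encoding are hypothetical simplifications; raftify's AbstractStateMachine is async and uses its own Result type.

```rust
use std::collections::BTreeMap;

// Hypothetical, synchronous stand-in for the three Store methods.
pub trait StateMachine {
    fn apply(&mut self, data: &[u8]) -> Result<(), String>;
    fn snapshot(&self) -> Vec<u8>;
    fn restore(&mut self, snapshot: &[u8]) -> Result<(), String>;
}

#[derive(Default, Debug, PartialEq)]
pub struct KvStore(BTreeMap<u64, String>);

// Parses the hypothetical "key=value" encoding; splits at the first '='.
fn parse_pair(s: &str) -> Result<(u64, String), String> {
    let (k, v) = s.split_once('=').ok_or("missing '='")?;
    let key = k.parse::<u64>().map_err(|e| e.to_string())?;
    Ok((key, v.to_string()))
}

impl StateMachine for KvStore {
    // Called once per committed entry, in log order.
    fn apply(&mut self, data: &[u8]) -> Result<(), String> {
        let text = std::str::from_utf8(data).map_err(|e| e.to_string())?;
        let (key, value) = parse_pair(text)?;
        self.0.insert(key, value);
        Ok(())
    }

    // One "key=value" line per entry; BTreeMap keeps the output deterministic.
    // Toy encoding: values must not contain newlines.
    fn snapshot(&self) -> Vec<u8> {
        let mut out = String::new();
        for (k, v) in &self.0 {
            out.push_str(&format!("{k}={v}\n"));
        }
        out.into_bytes()
    }

    // Replaces the current state wholesale with the snapshot's contents.
    fn restore(&mut self, snapshot: &[u8]) -> Result<(), String> {
        let text = std::str::from_utf8(snapshot).map_err(|e| e.to_string())?;
        let mut fresh = BTreeMap::new();
        for line in text.lines() {
            let (k, v) = parse_pair(line)?;
            fresh.insert(k, v);
        }
        self.0 = fresh;
        Ok(())
    }
}
```

Note that restore builds the new map fully before swapping it in, so a malformed snapshot leaves the existing state untouched.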
First, bootstrap a cluster that contains the leader node.
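// `store`, `raft_config` and `logger` are assumed to be created beforehand.
let raft_addr = "127.0.0.1:60061".to_owned();
let node_id = 1;
let raft = Raft::bootstrap(
    node_id,
    raft_addr,
    store.clone(),
    raft_config,
    logger.clone(),
)?;

// Keep the handle so the run loop can be awaited below.
let raft_handle = tokio::spawn(raft.clone().run());
// ...
tokio::try_join!(raft_handle)?;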
Then join the follower nodes.
If the initial members are specified through the peers configuration, the cluster starts operating only after all of those member nodes have been bootstrapped.
let raft_addr = "127.0.0.1:60062".to_owned();
let peer_addr = "127.0.0.1:60061".to_owned();
// Ask the existing cluster (reached at peer_addr) to reserve a node ID.
// raft_addr is cloned because it is used again in bootstrap below.
let join_ticket = Raft::request_id(raft_addr.clone(), peer_addr).await;

let raft = Raft::bootstrap(
    join_ticket.reserved_id,
    raft_addr,
    store.clone(),
    raft_config,
    logger.clone(),
)?;

let raft_handle = tokio::spawn(raft.clone().run());
// Join the cluster with the reserved ID.
raft.join_cluster(vec![join_ticket]).await;
// ...
tokio::try_join!(raft_handle)?;
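As a toy model of what the join ticket carries, the sketch below has a leader hand out the next unused node ID to each joining address. Leader, JoinTicket and the max-plus-one allocation rule are hypothetical illustrations, not how raftify's request_id is implemented; a real cluster also has to commit the membership change itself through the Raft log, which this model skips.

```rust
use std::collections::BTreeMap;

// Hypothetical ticket returned to a joining node.
#[derive(Debug, Clone, PartialEq)]
pub struct JoinTicket {
    pub reserved_id: u64,
    pub raft_addr: String,
}

// Hypothetical leader-side bookkeeping: node ID -> Raft address.
pub struct Leader {
    members: BTreeMap<u64, String>,
}

impl Leader {
    // The bootstrapped leader starts out as the only member.
    pub fn new(leader_id: u64, leader_addr: &str) -> Self {
        let mut members = BTreeMap::new();
        members.insert(leader_id, leader_addr.to_string());
        Leader { members }
    }

    // Reserve the next ID above the largest one in use and record the address.
    pub fn request_id(&mut self, raft_addr: &str) -> JoinTicket {
        let reserved_id = self.members.keys().next_back().copied().unwrap_or(0) + 1;
        self.members.insert(reserved_id, raft_addr.to_string());
        JoinTicket { reserved_id, raft_addr: raft_addr.to_string() }
    }

    pub fn member_ids(&self) -> Vec<u64> {
        self.members.keys().copied().collect()
    }
}
```

With the leader at 127.0.0.1:60061 as node 1, followers at 60062 and 60063 would receive IDs 2 and 3 in this model.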
If you want to operate the FSM remotely, you can use RaftServiceClient.
let mut leader_client = create_client(&"127.0.0.1:60061").await.unwrap();
leader_client
    .propose(raft_service::ProposeArgs {
        msg: LogEntry::Insert {
            key: 1,
            value: "test".to_string(),
        }
        .encode()
        .unwrap(),
    })
    .await
    .unwrap();
If you want to operate the FSM locally, use the RaftNode of the Raft object.
raft.propose(
    LogEntry::Insert {
        key: 123,
        value: "test".to_string(),
    }
    .encode()
    .unwrap(),
)
.await;
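Whichever path is used, a proposed entry reaches apply only after it is committed, meaning a majority of voters have stored it. raft-rs tracks this internally; the sketch below only illustrates the rule by computing the highest log index replicated on a majority from each voter's match index. The function name is hypothetical, and it omits Raft's extra requirement that a leader only commits, by counting replicas, entries from its current term.

```rust
// Returns the highest log index stored on a majority of voters, given each
// voter's match index (the leader counts its own last log index).
// Simplification: ignores the current-term check that Raft also requires.
pub fn majority_commit_index(match_indexes: &[u64]) -> u64 {
    assert!(!match_indexes.is_empty(), "need at least one voter");
    let mut sorted = match_indexes.to_vec();
    sorted.sort_unstable_by(|a, b| b.cmp(a)); // descending
    // A majority of n voters is n / 2 + 1 nodes, so the value at position
    // n / 2 of the descending list is held by at least that many voters.
    sorted[match_indexes.len() / 2]
}
```

For example, in a three-node cluster where the voters have stored up to indexes 3, 2 and 1, index 2 is on two of three nodes and is therefore committed, while index 3 is not yet.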
raftify-cli provides a collection of commands for inspecting the data persisted in stable storage and the status of Raft servers.
❯ raftify-cli debug persisted ./logs/node-1
---- Persisted entries ----
Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 1 }"
Key: 2, "Entry { context: [], data: ConfChange { change_type: AddNode, node_id: 2, context: [127.0.0.1:60062], id: 0 }, entry_type: EntryConfChange, index: 2, sync_log: false, term: 1 }"
Key: 3, "Entry { context: [], data: ConfChange { change_type: AddNode, node_id: 3, context: [127.0.0.1:60063], id: 0 }, entry_type: EntryConfChange, index: 3, sync_log: false, term: 1 }"
---- Metadata ----
HardState { term: 1, vote: 1, commit: 3 }
ConfState { voters: [1, 2, 3], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
Snapshot { data: HashStore(RwLock { data: {}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2, 3], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 1, term: 1 }) }
Last index: 3
You can bootstrap a cluster from the WAL (write-ahead log) and the WAL's snapshot.
This is useful when more nodes fail than the quorum can tolerate and the cluster has to be restarted, or when the cluster needs to be rebooted after a batch change to its membership.
Use the restore_wal_from and restore_wal_snapshot_from options in RaftConfig.
See this example for more details.
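The quorum arithmetic behind "more nodes fail than the quorum can tolerate" is small enough to spell out. A minimal sketch with hypothetical helper names: a cluster of n voters needs n / 2 + 1 of them available, so it survives at most n - (n / 2 + 1) failures.

```rust
// Quorum size for a voter set of `n` nodes: a strict majority.
pub fn quorum(n: usize) -> usize {
    n / 2 + 1
}

// Number of voters that may fail while the rest still form a quorum.
pub fn tolerated_failures(n: usize) -> usize {
    n.saturating_sub(quorum(n))
}

// True when so many voters are down that no quorum remains, which is the
// situation where the cluster has to be restarted (for example from the WAL).
pub fn quorum_lost(n: usize, failed: usize) -> bool {
    n.saturating_sub(failed) < quorum(n)
}
```

So a three-node cluster survives one failure and a five-node cluster survives two; a four-node cluster still survives only one, which is why odd cluster sizes are the usual choice.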
raftify provides bindings for the following languages.
raftify was inspired by a wide variety of previous Raft implementations.
Many thanks to all of their developers.
- tikv/raft-rs - The Raft distributed consensus algorithm implementation used under the hood of this lib.
- ritelabs/riteraft - A raft framework, for regular people. raftify was forked from this lib.