When building a distributed system, one principal goal is often to build in fault-tolerance. That is, if one particular node in a network goes down, or if there is a network partition, the entire cluster does not fall over. The cluster of nodes taking part in a distributed consensus protocol must come to agreement regarding values, and once that decision is reached, that choice is final.
Distributed Consensus Algorithms often take the form of a replicated state machine and log. Each state machine accepts inputs from its log, and represents the value(s) to be replicated, for example, a hash table. They allow a collection of machines to work as a coherent group that can survive the failures of some of its members.
Two well-known distributed consensus algorithms are Paxos and Raft. Paxos is used in systems like Google's Chubby, and Raft is used in systems like TiKV and etcd. Raft is generally seen as more understandable and simpler to implement than Paxos.
Raft replicates the state machine through logs. If you can ensure that all the machines have the same sequence of logs, then after applying all logs in order, every state machine will reach the same consistent state.
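As a toy illustration of this property (a hash-map state machine and simple set-commands, purely illustrative and not part of the crate):

```rust
use std::collections::HashMap;

// Apply a log of (key, value) set-commands, in order, to a hash-map
// state machine. Any two machines that apply the same log in the same
// order end up in an identical state.
fn apply_log(log: &[(String, i64)]) -> HashMap<String, i64> {
    let mut state = HashMap::new();
    for (key, value) in log {
        state.insert(key.clone(), *value);
    }
    state
}

fn main() {
    let log = vec![
        ("x".to_string(), 1),
        ("y".to_string(), 2),
        ("x".to_string(), 3), // later entries overwrite earlier ones
    ];
    // Two replicas applying the same log reach the same state.
    assert_eq!(apply_log(&log), apply_log(&log));
    assert_eq!(apply_log(&log)["x"], 3);
}
```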
A complete Raft model contains 4 essential parts:

- Consensus Module, the core consensus algorithm module;
- Log, the place to keep the Raft logs;
- State Machine, the place to save the user data;
- Transport, the network layer for communication.
Note: This Raft implementation in Rust includes the core Consensus Module only, not the other parts. The core Consensus Module in the Raft crate is customizable, flexible, and resilient. You can directly use the Raft crate, but you will need to build your own Log, State Machine and Transport components.
Before using the Raft crate, include this project as a dependency:
```toml
[dependencies]
raft = "0.2"
```
You can use `RawNode::new()` to create the Raft node. To create the Raft node, you need to build a Log component, which is called Raft Storage in the Raft crate, and then configure the Raft node.
- **Build Raft Storage**

  Raft Storage saves all the information about the current Raft implementation, including the Raft Log, the commit index, the leader to vote for, and so on. `raft::storage::Storage` is defined in `storage.rs`. See the following trait interfaces in detail:

  - `initial_state` is called when Raft is initialized. This interface returns a `RaftState`, which contains a `HardState` and a `ConfState`:
    - `HardState` contains the last meta information, including the commit index, the vote leader, and the vote term;
    - `ConfState` records the current node IDs in the cluster, like `[1, 2, 3]`. Every Raft node must have a unique ID in the cluster.
  - `entries` returns the Log entries in the interval `[low, high)`.
  - `term` returns the term of the entry at the given Log index.
  - `first_index` and `last_index` return the first and last index of the Log.

    Pay attention to what is returned when there is no Log but the `term` at index `first_index() - 1` is requested. To solve this, you can use a dummy Log entry to keep the last truncated Log entry. See `entries: vec![Entry::new()]` as a reference.
  - The last interface is `snapshot`, which returns a snapshot of the current state machine. This snapshot data will be sent to another node.
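As a toy illustration of the dummy-entry trick described above (this `ToyLog` type is a simplified stand-in that stores only terms, not the real `raft::storage::Storage` trait):

```rust
// A toy log that keeps a dummy entry at position 0 holding the term of the
// last truncated entry, so `term(first_index() - 1)` can still be answered.
struct ToyLog {
    offset: u64,     // Log index of the dummy entry
    terms: Vec<u64>, // terms[0] is the dummy entry's term
}

impl ToyLog {
    fn first_index(&self) -> u64 {
        self.offset + 1
    }

    fn last_index(&self) -> u64 {
        self.offset + self.terms.len() as u64 - 1
    }

    // Returns the term at `idx`, including idx == first_index() - 1,
    // which is served by the dummy entry.
    fn term(&self, idx: u64) -> Option<u64> {
        idx.checked_sub(self.offset)
            .and_then(|i| self.terms.get(i as usize).copied())
    }
}

fn main() {
    // Entries 1..=5 were truncated; entry 5 had term 2. Entries 6 and 7 follow.
    let log = ToyLog { offset: 5, terms: vec![2, 3, 3] };
    assert_eq!(log.first_index(), 6);
    assert_eq!(log.last_index(), 7);
    assert_eq!(log.term(log.first_index() - 1), Some(2)); // the dummy entry
    assert_eq!(log.term(7), Some(3));
}
```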
- **Configure the Raft node**

  After creating the Raft Storage, use `RawNode::new()` to create the Raft node. The `new` function needs a Raft Storage and a configuration. The following fields of the configuration are important:

  - `id`: the unique ID of the node in the cluster;
  - `election_tick`: how many ticks a follower waits without receiving any message from the leader before it re-campaigns;
  - `heartbeat_tick`: how many ticks pass between the heartbeats the leader sends to the followers to keep them alive;
  - `applied`: the last applied index for the state machine. Raft resumes applying Log entries to the state machine from this index;
  - `max_size_per_msg`: Raft can send many Log entries at the same time, so we need to limit the maximum size of a single message. This is an optimization for batching in the Transport;
  - `max_inflight_msgs`: how many messages the leader can send to the followers without acknowledgement. This is an optimization for pipelining in the Transport.

  `election_tick` must be larger than `heartbeat_tick`. If the tick interval is 100 ms, we can use 10 for `election_tick` and 3 for `heartbeat_tick`, which means the leader sends a heartbeat to the followers every 300 ms and a follower re-campaigns after going 1 second without receiving any message.

  The `read_only_option` field lets you choose between the linearizability mode and the lease mode for reading data. If you don't care about read consistency and want higher read performance, you can use the lease mode.

  Other important fields like `check_quorum` and `pre_vote` are used to avoid disturbance and make the cluster more stable.
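Putting these fields together, a configuration might look roughly like this. This is a sketch against the 0.2-era API; the exact field set may differ between versions, so consult the crate docs for yours, and treat the concrete values as illustrative only:

```rust
use raft::Config;

// A sketch of a Raft configuration using the fields described above.
// Values are illustrative; tune them for your deployment.
let config = Config {
    id: 1,                         // unique node ID in the cluster
    election_tick: 10,             // re-campaign after 10 silent ticks
    heartbeat_tick: 3,             // leader heartbeats every 3 ticks
    applied: 0,                    // resume applying from this index
    max_size_per_msg: 1024 * 1024, // cap one message batch at 1 MiB
    max_inflight_msgs: 256,        // pipeline depth without acknowledgement
    ..Default::default()
};
```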
- Use a timer to run the Raft node regularly. See the following example using the Rust channel `recv_timeout`:

  ```rust
  let mut t = Instant::now();
  let mut timeout = Duration::from_millis(100);

  loop {
      match receiver.recv_timeout(timeout) {
          Ok(_) => (), // handle the received message
          Err(RecvTimeoutError::Timeout) => (),
          Err(RecvTimeoutError::Disconnected) => return,
      }

      let d = t.elapsed();
      if d >= timeout {
          t = Instant::now();
          timeout = Duration::from_millis(100);
          // We drive Raft every 100 ms.
          r.tick();
      } else {
          timeout -= d;
      }
  }
  ```
  As shown in the above example, the Raft node is driven to run every 100 ms by the `tick` function.

- Use the `propose` function to drive the Raft node when the client sends a request to the Raft server. You can call `propose` to add the request to the Raft log explicitly.

  In most cases, the client needs to wait for a response to the request. For example, if the client writes a value to a key, it wants to know whether the write succeeded. But the write flow is asynchronous in Raft: the write log entry must be replicated to other followers, then committed, and at last applied to the state machine. So we need a way to notify the client after the write is finished.

  One simple way is to use a unique ID for the client request and save the associated callback function in a hash map. When the log entry is applied, we can get the ID from the decoded entry, call the corresponding callback, and notify the client.
- You can call the `step` function when you receive Raft messages from other nodes.

  Here is a simple example using `propose` and `step`:

  ```rust
  let mut cbs = HashMap::new();
  loop {
      match receiver.recv_timeout(d) {
          Ok(Msg::Propose { id, callback }) => {
              cbs.insert(id, callback);
              r.propose(vec![id], false).unwrap();
          }
          Ok(Msg::Raft(m)) => r.step(m).unwrap(),
          // ...
      }
      // ...
  }
  ```

  In the above example, we use a channel to receive the `propose` and `step` messages. We only propose the request ID to the Raft log. In your own practice, you can embed the ID in your request and propose the encoded binary request data.
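The `Msg` type and the callback bookkeeping in the examples above are not part of the crate. A self-contained sketch of both might look like this, where `RaftMessage`, `Msg`, and `Proposals` are all illustrative names, and `RaftMessage` is a placeholder for the crate's real protobuf message type:

```rust
use std::collections::HashMap;

// Placeholder standing in for raft::eraftpb::Message.
type RaftMessage = Vec<u8>;

// Hypothetical channel message type from the example above.
enum Msg {
    Propose { id: u8, callback: Box<dyn FnOnce() + Send> },
    Raft(RaftMessage),
}

// Hypothetical callback registry: maps request IDs to one-shot callbacks
// fired when the corresponding entry is applied to the state machine.
struct Proposals {
    pending: HashMap<u8, Box<dyn FnOnce() + Send>>,
}

impl Proposals {
    fn new() -> Self {
        Proposals { pending: HashMap::new() }
    }

    fn handle(&mut self, msg: Msg) {
        match msg {
            // Remember the callback; real code would now propose the ID to Raft.
            Msg::Propose { id, callback } => {
                self.pending.insert(id, callback);
            }
            // Real code would feed this to RawNode::step.
            Msg::Raft(_m) => {}
        }
    }

    // Called once a committed entry with this ID has been applied.
    fn notify_applied(&mut self, id: u8) {
        if let Some(cb) = self.pending.remove(&id) {
            cb();
        }
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let mut proposals = Proposals::new();

    // A client sends request 7 with a callback that answers it.
    proposals.handle(Msg::Propose {
        id: 7,
        callback: Box::new(move || tx.send("applied").unwrap()),
    });

    // ...the entry is replicated, committed, and finally applied...
    proposals.notify_applied(7);
    assert_eq!(rx.recv().unwrap(), "applied");
}
```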
When your Raft node is ticked and running, Raft may enter a `Ready` state. You need to first use `has_ready` to check whether Raft is ready. If yes, use the `ready` function to get a `Ready` state:

```rust
if !r.has_ready() {
    return;
}

// The Raft node is ready, and we can do something now.
let mut ready = r.ready();
```

The `Ready` state contains a lot of information, and you need to check and process it piece by piece:
1. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has received a Raft snapshot from the leader and we must apply the snapshot:

   ```rust
   if !raft::is_empty_snap(&ready.snapshot) {
       // This is a snapshot, we need to apply the snapshot at first.
       r.mut_store()
           .wl()
           .apply_snapshot(ready.snapshot.clone())
           .unwrap();
   }
   ```

2. Check whether `entries` is empty or not. If not empty, it means that there are newly added entries that have not been committed yet; we must append the entries to the Raft log:

   ```rust
   if !ready.entries.is_empty() {
       // Append entries to the Raft log.
       r.mut_store().wl().append(&ready.entries).unwrap();
   }
   ```

3. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node has changed. For example, the node may have voted for a new leader, or the commit index may have increased. We must persist the changed `HardState`:

   ```rust
   if let Some(ref hs) = ready.hs {
       // The Raft HardState changed, and we need to persist it.
       r.mut_store().wl().set_hardstate(hs.clone());
   }
   ```

4. Check whether `messages` is empty or not. If not, it means that the node will send messages to other nodes. There is an optimization for sending messages: if the node is a leader, this can be done together with step 1 in parallel; if the node is not a leader, it needs to reply to the leader after appending the Raft entries:

   ```rust
   if !is_leader {
       // If not leader, the follower needs to reply the messages to
       // the leader after appending Raft entries.
       let msgs = ready.messages.drain(..);
       for _msg in msgs {
           // Send messages to other peers.
       }
   }
   ```

5. Check whether `committed_entries` is empty or not. If not, it means that there are some newly committed log entries which you must apply to the state machine. Of course, after applying, you need to update the applied index and resume `apply` later:

   ```rust
   if let Some(committed_entries) = ready.committed_entries.take() {
       let mut _last_apply_index = 0;
       for entry in committed_entries {
           // Mostly, you need to save the last apply index to resume applying
           // after restart. Here we just ignore this because we use a Memory storage.
           _last_apply_index = entry.get_index();

           if entry.get_data().is_empty() {
               // Empty entry: when a peer becomes leader it sends an empty entry.
               continue;
           }

           match entry.get_entry_type() {
               EntryType::EntryNormal => handle_normal(entry),
               EntryType::EntryConfChange => handle_conf_change(entry),
           }
       }
   }
   ```

6. Call `advance` to prepare for the next `Ready` state:

   ```rust
   r.advance(ready);
   ```
For more information, check out an example.
`raft` is intended to track the latest `stable` toolchain, though you'll need to use `nightly` to simulate a full CI build with `clippy`.
Using `rustup` you can get started this way:

```shell
rustup override set stable
rustup toolchain install nightly
```
In order to have your PR merged, running the following must finish without error:

```shell
cargo +nightly test --features dev
```
You may optionally want to install `cargo-watch` to allow for automated rebuilding while editing:

```shell
cargo watch -s "cargo check --features dev"
```
If the proto file `eraftpb.proto` has changed, run the following command to regenerate `eraftpb.rs`:

```shell
protoc proto/eraftpb.proto --rust_out=src
```

You can check `Cargo.toml` to find which version of `protobuf-codegen` is required.
Thanks to etcd for providing the amazing Go implementation!
- TiKV, a distributed transactional key-value database powered by Rust and Raft.