CKite
Overview
A JVM implementation of the Raft distributed consensus algorithm written in Scala. CKite is a library to be used by distributed applications needing consensus agreement.
Status
CKite covers all the major topics of Raft including leader election, log replication, log compaction and cluster membership changes. It is a work in constant progress. For development & testing purposes it contains an embedded key-value store app demonstrating the algorithm functioning trough simple puts and gets. It will be extracted soon from the CKite library as an example of use. Performance tests will be included soon.
Features
- Leader Election
- Log Replication
- Cluster Membership Changes
- Log Compaction
- Finagle based RPC between members
- REST admin console
Example
1) Define a StateMachine and its commands
//KVStore is a simple distributed Map accepting Puts and Gets
class KVStore extends StateMachine {
val map = new ConcurrentHashMap[String, String]()
//called when a consensus has been reached for a WriteCommand or when a ReadCommand was received
override def apply(command: Command): Any = {
command match {
case Put(key: String, value: String) => {
map.put(key, value)
value
}
case Get(key: String) => map.get(key)
}
}
//called during Log replay on startup and upon installSnapshot requests
override def deserialize(snapshotBytes: Array[Byte]) = {
//some deserialization mechanism
}
//called when Log compaction is required
override def serialize(): Array[Byte] = {
//some serialization mechanism
}
}
//WriteCommands are replicated under Raft rules
case class Put[Key,Value](key: Key, value: Value) extends WriteCommand
//ReadCommands are not replicated but forwarded to the Leader
case class Get[Key](key: Key) extends ReadCommand
2) Create a CKite instance using the builder
val ckite = CKiteBuilder().withLocalBinding("localhost:9091")
.withMembersBindings(Seq("localhost:9092","localhost:9093"))
.withMinElectionTimeout(1000).withMaxElectionTimeout(1500) //optional
.withHeartbeatsInterval(250) //optional
.withDataDir("/home/ckite/data") //dataDir for persistent state (log, terms, snapshots, etc...)
.withStateMachine(new KVStore()) //KVStore is an implementation of the StateMachine trait
.build
ckite.start()
3) Send a write command
//this Put command is forwarded to the Leader and applied under Raft rules
ckite.write(Put("key1","value1"))
4) Send a consistent read command
//consistent read commands are forwarded to the Leader
val value = ckite.read[String](Get("key1"))
5) Add a new Member
//as write commands, cluster membership changes are forwarded to the Leader
ckite.addMember("someHost:9094")
6) Remove a Member
//as write commands, cluster membership changes are forwarded to the Leader
ckite.removeMember("someHost:9094")
7) Send a local read command
//alternatively you can read from its local state machine allowing possible stale values
val value = ckite.readLocal[String](Get("key1"))
8) Check leadership
//if necessary waits for elections to end
ckite.isLeader()
9) Stop ckite
ckite.stop()
Rest admin console
CKite exposes an admin console showing its status and useful metrics. If the rpc port is 9091 then the admin console is exposed under http://localhost:10091/status by default.
Leader
{
cluster: {
term: 1,
state: "Leader",
stateInfo: {
leaderUptime: "2.minutes+13.seconds+148.milliseconds",
lastHeartbeatsACKs: {
localhost:9093: "128.milliseconds",
localhost:9092: "128.milliseconds",
localhost:9095: "127.milliseconds",
localhost:9094: "128.milliseconds"
}
}
},
log: {
length: 9,
commitIndex: 9,
lastLog: {
term: 1,
index: 9,
command: {
key: "k1",
value: "v1"
}
}
}
}
Follower
{
cluster: {
term: 1,
state: "Follower",
stateInfo: {
following: "Some(localhost:9091)"
}
},
log: {
length: 9,
commitIndex: 9,
lastLog: {
term: 1,
index: 9,
command: {
key: "k1",
value: "v1"
}
}
}
}
Running KVStore example (3 members)
Run Member 1
sbt run -Dport=9091 -Dmembers=localhost:9092,localhost:9093 -DdataDir=/tmp/ckite/member1
Run Member 2
sbt run -Dport=9092 -Dmembers=localhost:9091,localhost:9093 -DdataDir=/tmp/ckite/member2
Run Member 3
sbt run -Dport=9093 -Dmembers=localhost:9092,localhost:9091 -DdataDir=/tmp/ckite/member3
Put a key-value on the leader member (take a look at the logs for election result)
curl http://localhost:10091/put/key1/value1
Get the value of key1 replicated in member 2
curl http://localhost:10092/get/key1
Checkout the admin console on any member to see the cluster status
curl http://localhost:10093/status
Add a new member (localhost:9094) to the Cluster
curl http://localhost:10091/changecluster/localhost:9091,localhost:9092,localhost:9093,localhost:9094
Run Member 4
sbt run -Dport=9094 -Dmembers=localhost:9092,localhost:9091,localhost:9093 -DdataDir=/tmp/ckite/member4
Implementation details
- Built in Scala.
- Twitter Finagle.
- Thrift.
- Twitter Scrooge.
Contributions
Feel free to contribute to CKite!. Any kind of help will be very welcome. We are happy to receive pull requests, issues, discuss implementation details, analyze the raft algorithm and whatever it makes CKite a better library. The following is a list of known pendings to be solved in CKite. You can start from there!
Pendings/WorkToDo
Leader electionLog replicationCluster Membership changesLog persistence & compaction- Extract the key value store app from CKite
- Metrics / monitoring
- Akka
- Improve REST admin console
- Other things...