  • 一个简易 kv 储存数据库
  • 存储引擎的实现类似 Bitcask (log-structured file storage)
  • 使用 tcp 进行客户端与服务端的通信, 并且支持多个 tcp 链接并发读写磁盘, 其中读流程是无锁的


To complete the project exercise of talent-plan's TP201.


  • Implement a Bitcask-like engine(a Log-structured File Storage)
  • Client-Server Networking
  • Concurrency: lock-free readers
  • Asynchronous
  • Benchmark



    kvs-client <SUBCOMMAND>

    -h, --help       Print help information
    -V, --version    Print version information

    get     Get the string value of a given string key
    help    Print this message or the help of the given subcommand(s)
    rm      Remove a given key
    set     Set the value of a string key to a string
    kvs-server [OPTIONS]

        --addr <ADDR>        Server listening address, default is
        --engine <ENGINE>    Engine type, default is kvs
    -h, --help               Print help information
    -V, --version            Print version information



rskv 默认使用类似 Bitcask 的算法来实现储存引擎, 但也可以使用其他存储引擎(只需要实现 KvsEngine trait).

Bitcask 的数据储存在磁盘的一个目录里面。目录中有多个文件(log file),其中有旧数据文件和活跃的数据文件,但是同时只有一个活跃的文件用于写入新的数据。


Bitcask on disk

rskv 储存在磁盘的数据的格式是rust enum的json序列化数据,这些数据被称为 command,其在rust code中的定义如下:

#[derive(Debug, Serialize, Deserialize)]
enum Cmd {
    Set { key: String, value: String },
    Rm { key: String },

通过 Cmd的定义我们可以看出来无论是设置键值对,还是删除操作都会以追加的方式写入活跃文件,也就是说不会以实际的方式删除或者替换掉旧的数据。

当然如果我们不断的set和rm key/value pair, 数据的大小会不断增加。因此我们需要设置一个阈值,当这些无用数据的大小超过该阈值时,存储引擎就会重新遍历一遍所有的 log files, 加载有效数据到内存,清除掉旧的文件数据,并创建一个新的文件并写入这些有效数据。

下面是 rskv 实现Bitcask的一些API接口:

// 初始化 Bitcask 储存引擎: 建立索引,计算无效数据的长度,
// 为每一个log file创建reader, 为当前的活跃文件创建 writer
pub fn open(path: impl Into<PathBuf>) -> Result<Self>

// 设置 key value
pub fn set(&mut self, key: String, value: String) -> Result<()>

// 根据 key 获取 value
pub fn get(&mut self, key: String) -> Result<Option<String>>

// 删除 key
pub fn rm(&mut self, key: String) -> Result<()>

// 私有方法, 清除无效的 `command`s
fn compact(&mut self) -> Result<()>

// Bitcask除了将数据存储在磁盘还在server运行期间,还在内存维护了一个从key到`CmdPos`索引,
// 这样就可以通过索引快速找到 key 对应的 磁盘上 `command` 数据的位置
fn index(fid: u64, 
reader: &mut BufReaderWithPos<File>, 
index: &mut BTreeMap<String, CmdPos>) -> Result<u64>


pub struct Bitcask {
    /// Directory for sotring log data, it contains many log file
    data_path: PathBuf,
    /// [Bitcask] build caches to quickly find reader belongs to `fid` using `HashMap`.
    /// This hashmap insert all exsiting log files when [Bitcask]::open is called.
    readers: HashMap<u64, BufReaderWithPos<File>>,
    /// Current writer to write `command`s into disk
    cur_writer: BufWriterWithPos<File>,
    /// log file id which the current writer write to
    cur_fid: u64,
    /// In-memory Index maps from keys(String) to [CmdPos].
    /// This is a `B-Tree` which would load `log files` in the disk into memory when [Bitcask]::open is called.
    index: BTreeMap<String, CmdPos>,
    /// The number of bytes representing "stale" commands that could be
    /// deleted during a compaction.
    uncompacted: u64,


#[derive(Debug, Clone)]
/// In-memory representation of a `command`.
/// Indicates where we can find it.
pub struct CmdPos {
    /// which file this commmand belong to
    fid: u64,
    /// start position of command
    pos: u64,
    /// length of command
    len: u64,


risk 支持多个线程并发读写磁盘,为了保证并发的高效性,其中读流程是lock-free,而写流程为了保证数据的一致因而必须是串行的,就是说在写的时候需要Mutex的保护。(读流程并不是真正的并行,因为底层文件读取一定是串行的)

为了加入并发的支持, 我们需要

  1. 使用线程池来管理不同tcp链接的读写
  2. 改变 Bitcask的结构体定义使其能被多线程共享