/toycache

A memory-based fault-tolerant scalable key-value store similar to redis

Primary language: Java. License: MIT.

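ToyCache is a distributed in-memory database, loosely modeled on Redis. It supports snapshots, write logging, master-follower replication, hash-based clustering (WIP), and transactions, and provides five data types: strings, lists, sets, maps, and sorted sets.

Development progress

  • Basic API support
    • Strings
    • Lists
    • Sets
    • Maps
    • Sorted sets
  • Snapshots
  • Write log
  • Transactions
  • Master-follower replication (many unit tests still missing)
  • Clustering

Overall design

The overall design is shown in the figure below.

  • The client and server communicate over Netty, with messages encoded as Protobuf.

  • NettyServer uses ToyCacheMessageHandler to submit each received Request to MemoryExecutor, which is a single-threaded Executor.

    // Server-side business handler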
    private class ToyCacheMessageHandler extends ChannelInboundHandlerAdapter
    {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
        {
            Proto.Request request = (Proto.Request) msg;
            // Simply submit the message to the MemoryExecutor, together with a Callback
            log.info(request.toString());
            globalContext.getMemoryOperationExecutor().submit(request, new SendResponseCallback(ctx));
        }
    }
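  • MemoryExecutor dispatches each request to a different Handler according to its type.

  • A Handler reads or modifies data through Storage to complete the request.

  • MemoryExecutor performs only in-memory work. Anything involving disk or network I/O is handed off to a dedicated Executor: for example, RDBExecutor persists snapshots to disk and AOFExecutor persists the write log.

  • The system also has tasks that must run periodically; these are carried out by TickDriver and Ticker working together.

Communication design

The client and server communicate over Netty, with messages encoded as Protobuf. Because Protobuf has no inheritance, the messages are designed by composition. The format is as follows.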

message Request {
  MessageType messageType = 1;
  int64 writeId = 2;
  string clientTId = 3;
  int64 epoch = 4;
  ExistsRequest existsRequest = 5;
  DelRequest delRequest = 6;
  GetRequest getRequest = 7;
  SetRequest setRequest = 8;
  ....
  InnerStartSyncRequest innerStartSyncRequest = 65;
  InnerRewriteLogRequest innerRewriteLogRequest = 66;
  InnerCreateClientRequest innerCreateClientRequest = 67;
  InnerCreateFollowerToZKRequest innerCreateFollowerToZKRequest = 68;
}
  • Each kind of Request has a corresponding MessageType. For example, a GetRequest has MessageType Get, and in that case the getRequest field is also set.
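
To make the composition idea concrete, here is a minimal plain-Java sketch of an envelope that carries a type tag plus one populated payload. The class names (MsgKind, GetPayload, SetPayload, Envelope, EnvelopeDemo) are hypothetical stand-ins for illustration; the real classes are generated by protoc from the message definition above.

```java
// Hypothetical plain-Java mirror of a composed request envelope.
// The real ToyCache classes are generated by protoc; these are stand-ins.
enum MsgKind { Get, Set }

final class GetPayload {
    final String key;
    GetPayload(String key) { this.key = key; }
}

final class SetPayload {
    final String key;
    final String value;
    SetPayload(String key, String value) { this.key = key; this.value = value; }
}

final class Envelope {
    final MsgKind kind;
    final GetPayload get; // populated only when kind == Get
    final SetPayload set; // populated only when kind == Set

    private Envelope(MsgKind kind, GetPayload get, SetPayload set) {
        this.kind = kind;
        this.get = get;
        this.set = set;
    }

    static Envelope ofGet(String key) {
        return new Envelope(MsgKind.Get, new GetPayload(key), null);
    }

    static Envelope ofSet(String key, String value) {
        return new Envelope(MsgKind.Set, null, new SetPayload(key, value));
    }
}

final class EnvelopeDemo {
    // The receiver switches on the type tag and reads only the matching payload.
    static String describe(Envelope e) {
        switch (e.kind) {
            case Get:
                return "GET " + e.get.key;
            case Set:
                return "SET " + e.set.key + "=" + e.set.value;
            default:
                throw new UnsupportedOperationException(e.kind.toString());
        }
    }
}
```

The type tag tells the receiver which payload to read, which is the role MessageType plays in Request.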

Response is similar to Request.

message Response {
  MessageType messageType = 1;
  ResponseCode responseCode = 2;
  int64 writeId = 3;
  string clientTId = 4;
  ExistsResponse existsResponse = 5;
  DelResponse delResponse = 6;
  GetResponse getResponse = 7;
  SetResponse setResponse = 8;
  ExpireResponse expireResponse = 9;
  SaveResponse saveResponse = 10;
    ...
}
  • It has one additional field, ResponseCode, which records the outcome of the command, for example:

      Unknown = 0;
      // success
      OK = 1;
      // invalid parameter
      InvalidParam = 2;

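Executor design

Each Executor runs one kind of task on a single thread or a thread pool. The most important one is MemoryExecutor, which manipulates memory: a Request received by NettyServer is submitted to MemoryExecutor, and once it has been executed the Response is passed back to NettyServer through a Callback. Some internal periodic tasks also submit Requests to MemoryExecutor.

Let us first look at the Executor interface.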

public interface MessageExecutor {
    void submit(Proto.Request request, Callback... callbacks);

    void submit(Proto.Request request);

    void submitAndWait(Proto.Request request, Callback... callbacks) throws Exception;

    void submitAndWait(Proto.Request request) throws Exception;
}

Because most Executors work in much the same way, the shared logic is factored out into an abstract class.

package com.t0ugh.server.executor;
@Slf4j
public abstract class AbstractMessageExecutor implements MessageExecutor{

  private final GlobalContext globalContext;
  private final ExecutorService executorService;

  protected AbstractMessageExecutor(GlobalContext globalContext, ExecutorService executorService) {
    this.globalContext = globalContext;
    this.executorService = executorService;
  }

  public void submit(Proto.Request request, Callback... callbacks){
    try{
      beforeSubmit(request);
      executorService.submit(new RunnableCommand(request, callbacks));
    } catch (RuntimeException e){
      handleException(request, e, callbacks);
    }
  }

  public void submit(Proto.Request request){
    try{
      beforeSubmit(request);
      executorService.submit(new RunnableCommand(request));
    } catch (RuntimeException e){
      handleException(request, e);
    }
  }

  public void submitAndWait(Proto.Request request, Callback... callbacks) throws Exception{
    try{
      beforeSubmit(request);
      executorService.submit(new RunnableCommand(request, callbacks)).get();
    } catch (RuntimeException e){
      handleException(request, e, callbacks);
    }
  }

  public void submitAndWait(Proto.Request request) throws Exception{
    try{
      beforeSubmit(request);
      executorService.submit(new RunnableCommand(request)).get();
    } catch (RuntimeException e){
      handleException(request, e);
    }
  }

  public void shutdown(){
    executorService.shutdown();
  }

  protected GlobalContext getGlobalContext(){
    return globalContext;
  }

  protected ExecutorService getExecutorService(){
    return executorService;
  }

  protected void beforeSubmit(Proto.Request request){

  }

  protected void handleException(Proto.Request request, RuntimeException runtimeException, Callback... callbacks){

  }

  public abstract Proto.Response doRequest(Proto.Request request) throws Exception;

  @RequiredArgsConstructor
  @AllArgsConstructor
  private class RunnableCommand implements Runnable {

    @NonNull
    private final Proto.Request request;
    private Callback[] callbacks = new Callback[0];

    @Override
    public void run() {
      try{
        Proto.Response response = doRequest(request);
        Arrays.stream(callbacks).forEach(callback -> {
          callback.callback(request, response);
        });
      } catch (Exception e){
        log.error("RunnableCommand", e);
      }

    }
  }
}
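  • As shown above, an ExecutorService (single-threaded in the case of MemoryExecutor) processes each Request and invokes the Callbacks once processing finishes.

The concrete MemoryExecutor only needs to implement the doRequest method.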

@Slf4j
public class MemoryOperationExecutor extends AbstractMessageExecutor {

  public MemoryOperationExecutor(GlobalContext globalContext) {
    super(globalContext, new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(10000)));
  }

  @Override
  public Proto.Response doRequest(Proto.Request request) {
    Handler handler = getGlobalContext().getHandlerFactory()
            .getHandler(request.getMessageType())
            .orElseThrow(UnsupportedOperationException::new);
    return handler.handle(request);
  }

  @Override
  protected void handleException(Proto.Request request, RuntimeException runtimeException, Callback... callbacks){
    Proto.Response response = MessageUtils.responseWithCode(Proto.ResponseCode.ServerBusy, request.getClientTId());
    Arrays.stream(callbacks).forEach(callback -> {
      callback.callback(request, response);
    });
  }
}
  • As shown above, its only job is to pick the appropriate Handler from handlerFactory to process the Request.
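
The constructor above pairs a single worker thread with a bounded queue, so a submit can be rejected when the queue is full; that rejection is a RuntimeException, which is presumably what the handleException override turns into a ServerBusy response. The sketch below only demonstrates the standard java.util.concurrent behaviour; BoundedQueueDemo and countRejections are hypothetical names, and the tiny capacity is chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedQueueDemo {
    // Blocks the only worker, then counts how many of `attempts` submissions are rejected.
    static int countRejections(int queueCapacity, int attempts) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the single worker thread so later tasks pile up in the queue.
            executor.submit(() -> {
                workerStarted.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerStarted.await();

            int rejected = 0;
            for (int i = 0; i < attempts; i++) {
                try {
                    executor.submit(() -> { });
                } catch (RejectedExecutionException e) {
                    // Queue is full and no extra thread may be created.
                    rejected++;
                }
            }
            return rejected;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }
}
```

With a queue capacity of 2 and 5 attempts, the first two tasks are queued and the remaining three are rejected.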

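The next section discusses the design of Handler.

Handler design

As mentioned above, MemoryExecutor picks the appropriate Handler from HandlerFactory to process a Request. What makes a Handler appropriate? Simply that each MessageType maps to one specific Handler; for example, Get maps to GetHandler.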

@HandlerAnnotation(messageType = Proto.MessageType.Get, handlerType= HandlerType.Read)
public class GetHandler extends AbstractGenericsHandler<Proto.GetRequest, Proto.GetResponse> {

    public GetHandler(GlobalContext globalContext) {
        super(globalContext);
    }

    @Override
    public Proto.GetResponse doHandle(Proto.GetRequest getRequest) throws Exception {
        Proto.GetResponse.Builder getResponseBuilder = Proto.GetResponse.newBuilder();
        getGlobalContext().getStorage().get(getRequest.getKey()).ifPresent(getResponseBuilder::setValue);
        return getResponseBuilder.build();
    }
}

At startup, HandlerFactory scans for the @HandlerAnnotation on each Handler and registers all of them in a Map; see registerAll() below.

@Slf4j
public class HandlerFactory {

    private final Map<Proto.MessageType, Handler> m;

    ...
        
    public void registerAll(GlobalContext globalContext){
        Reflections reflections = new Reflections("com.t0ugh.server.handler");
        Set<Class<?>> classSet = reflections.getTypesAnnotatedWith(HandlerAnnotation.class);
        classSet.forEach(clazz -> {
            try {
                Proto.MessageType messageType = clazz.getAnnotation(HandlerAnnotation.class).messageType();
                Constructor<?> cons = clazz.getConstructor(GlobalContext.class);
                register(messageType, (Handler) cons.newInstance(globalContext));
            } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                e.printStackTrace();
                log.error("handler register failed", e);
            }
        });
    }

  ...
}
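
The registration idea can be tried without the Reflections library. The following is a minimal sketch that reads a runtime annotation from an explicit list of classes instead of scanning a package; every name in it (MsgType, Handles, SimpleHandler, SimpleHandlerRegistry and the two handlers) is hypothetical and not part of ToyCache.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

enum MsgType { Get, Set, Del }

// Must be retained at runtime, otherwise getAnnotation() returns null.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Handles {
    MsgType value();
}

interface SimpleHandler {
    String handle(String key);
}

@Handles(MsgType.Get)
class SimpleGetHandler implements SimpleHandler {
    public String handle(String key) { return "get:" + key; }
}

@Handles(MsgType.Set)
class SimpleSetHandler implements SimpleHandler {
    public String handle(String key) { return "set:" + key; }
}

final class SimpleHandlerRegistry {
    private final Map<MsgType, SimpleHandler> handlers = new EnumMap<>(MsgType.class);

    // Instantiates each annotated class and indexes it by the annotated message type.
    void registerAll(List<Class<? extends SimpleHandler>> classes) {
        for (Class<? extends SimpleHandler> clazz : classes) {
            Handles annotation = clazz.getAnnotation(Handles.class);
            if (annotation == null) {
                continue; // not a handler we know how to index
            }
            try {
                handlers.put(annotation.value(), clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("handler register failed: " + clazz, e);
            }
        }
    }

    Optional<SimpleHandler> getHandler(MsgType type) {
        return Optional.ofNullable(handlers.get(type));
    }

    static SimpleHandlerRegistry withDefaults() {
        List<Class<? extends SimpleHandler>> classes = new ArrayList<>();
        classes.add(SimpleGetHandler.class);
        classes.add(SimpleSetHandler.class);
        SimpleHandlerRegistry registry = new SimpleHandlerRegistry();
        registry.registerAll(classes);
        return registry;
    }
}
```

Returning an Optional mirrors the orElseThrow call in doRequest: an unregistered message type surfaces as an empty result rather than a null.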

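Thread design

The thread design follows that of Redis: all in-memory operations are handled by a single thread, namely MemoryExecutor. Time-consuming network communication and disk I/O are handed off to dedicated threads. This avoids the heavy contention that multithreading would cause while keeping the single thread highly efficient.

At present the system has roughly the following threads:

  1. MemoryExecutor: handles in-memory operations
  2. WriteLogExecutor: persists the write log to disk
  3. DBExecutor: persists snapshots to disk
  4. CreateToyCacheClientExecutor: creates the follower's client on the master; this involves a network connection, so it gets its own thread
  5. SendSyncExecutor: communicates with zk and updates the metadata stored there
  6. Other Executors: not listed one by one; they mostly do miscellaneous chores.
  7. Threads used by other components: for example, the bossGroup and workerGroup used by NettyServer, the threads used by ZookeeperClient, and so on.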
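
A minimal sketch of this division of labour, assuming a plain HashMap as the storage: one thread owns the map, and a second thread does the slow serialization on a copy. TwoExecutorDemo and its methods are hypothetical, the "disk write" is replaced by building a string, and the blocking get() calls are used only to keep the example short (the real system passes Callbacks instead).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TwoExecutorDemo {
    // Only the memory thread ever touches this map, so no locking is needed.
    private final Map<String, String> storage = new HashMap<>();
    private final ExecutorService memoryThread = Executors.newSingleThreadExecutor();
    private final ExecutorService diskThread = Executors.newSingleThreadExecutor();

    void set(String key, String value) {
        memoryThread.execute(() -> storage.put(key, value));
    }

    String snapshot() {
        // Step 1 (memory thread): take a consistent copy of the data.
        Callable<Map<String, String>> copyTask = () -> new TreeMap<>(storage);
        try {
            Map<String, String> copy = memoryThread.submit(copyTask).get();
            // Step 2 (disk thread): the slow part works on the copy only.
            Callable<String> writeTask = () -> copy.toString(); // stands in for file I/O
            return diskThread.submit(writeTask).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        memoryThread.shutdown();
        diskThread.shutdown();
    }

    static String run() {
        TwoExecutorDemo demo = new TwoExecutorDemo();
        demo.set("b", "2");
        demo.set("a", "1");
        String result = demo.snapshot();
        demo.shutdown();
        return result;
    }
}
```

Because the memory executor is single-threaded and FIFO, both set calls are guaranteed to run before the copy task that follows them.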

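Periodic tasks

The system also has tasks that must run periodically, such as purging expired key-value pairs and sending heartbeats to zk. These tasks are driven by TickDriver, which ticks at a fixed interval; once a Ticker has accumulated enough ticks, it submits a request to the corresponding Executor to run its task.

Let us first look at the Ticker interface.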

public interface Ticker {
    void tick();
}

Next, take the Ticker that periodically purges expired key-value pairs as an example.

public class DeleteKeyTicker implements Ticker {
    private int count;
    private final ExecutorService executorService;
    private final GlobalContext globalContext;
    private final int interval;

    public DeleteKeyTicker(GlobalContext globalContext) {
        executorService = Executors.newSingleThreadExecutor();
        this.globalContext = globalContext;
        this.interval = globalContext.getConfig().getPeriodicalDeleteTick();
    }

    public void shutdown() {
        executorService.shutdown();
    }

    public void shutdownNow() {
        executorService.shutdownNow();
    }

    @Override
    public void tick() {
        executorService.submit(() -> {
            count ++;
            if(count >= interval) {
                globalContext.getMemoryOperationExecutor().submit(MessageUtils.newInnerClearExpireRequest());
                count = 0;
            }
        });
    }
}
  • Whenever count reaches interval, DeleteKeyTicker submits an InnerClearExpireRequest to the main thread to purge expired keys.

So who calls the tick() method?

public class TickDriverImpl implements TickDriver{
	...
    @Override
    public void start() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ticks.forEach(Ticker::tick);
            }
        }, 0, globalContext.getConfig().getTickInterval(), TimeUnit.MILLISECONDS);
    }
    ...
}
  • TickDriver calls tick() periodically.

TickDriver and Ticker follow the publish-subscribe pattern.

TickDriverImpl tickDriver = new TickDriverImpl(globalContext);
globalContext.setTickDriver(tickDriver);
DeleteKeyTicker deleteKeyTicker = new DeleteKeyTicker(globalContext);
RewriteLogTicker rewriteLogTicker = new RewriteLogTicker(globalContext);
SyncFollowerTicker syncSlaveTicker = new SyncFollowerTicker(globalContext);
SaveTicker saveTicker = new SaveTicker(globalContext);
tickDriver.register(deleteKeyTicker);
tickDriver.register(rewriteLogTicker);
tickDriver.register(syncSlaveTicker);
tickDriver.register(saveTicker);
tickDriver.start();
  • When the system starts, BootStrap registers all required Tickers with TickDriver.
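
The tick-counting arithmetic is easy to check in isolation. The sketch below drives the tickers by hand instead of with a scheduler, so the result is deterministic; SimpleTicker, EveryNTicks, ManualTickDriver and TickDemo are hypothetical names that only mirror the shape of Ticker and TickDriver.

```java
import java.util.ArrayList;
import java.util.List;

interface SimpleTicker {
    void tick();
}

// Runs its action once every `interval` ticks, then starts counting again.
final class EveryNTicks implements SimpleTicker {
    private final int interval;
    private final Runnable action;
    private int count;

    EveryNTicks(int interval, Runnable action) {
        this.interval = interval;
        this.action = action;
    }

    @Override
    public void tick() {
        count++;
        if (count >= interval) {
            action.run();
            count = 0;
        }
    }
}

// Publishes each tick to every registered ticker; driven manually here.
final class ManualTickDriver {
    private final List<SimpleTicker> tickers = new ArrayList<>();

    void register(SimpleTicker ticker) {
        tickers.add(ticker);
    }

    void tickOnce() {
        tickers.forEach(SimpleTicker::tick);
    }
}

final class TickDemo {
    // Returns how often a 3-tick task and a 5-tick task fired after totalTicks ticks.
    static int[] fireCounts(int totalTicks) {
        int[] fired = new int[2];
        ManualTickDriver driver = new ManualTickDriver();
        driver.register(new EveryNTicks(3, () -> fired[0]++));
        driver.register(new EveryNTicks(5, () -> fired[1]++));
        for (int i = 0; i < totalTicks; i++) {
            driver.tickOnce();
        }
        return fired;
    }
}
```

One shared clock can therefore serve tasks with different periods: each ticker keeps its own counter and decides for itself when enough ticks have accumulated.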

The corresponding class diagram is shown below.

Transaction design

First, the client API, using i++ as an example.

ToyCache toyCache = new ToyCache(ip, port);
Transaction transaction = toyCache.transcation();
int i = Integer.valueOf(toyCache.get("i"));
transaction.checkGet("i", String.valueOf(i));
transaction.set("i", String.valueOf(i + 1));
transaction.checkGet("i", String.valueOf(i + 1));
transaction.set("i", String.valueOf(i + 2));
boolean success = transaction.commit();
transaction.close();

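This API differs from Redis: it is built on a check mechanism.

  • A submitted transaction succeeds only if every check passes and no set throws an exception; otherwise it fails and the user is notified.
  • Every read API has a corresponding check API, for example get and checkGet. The check API determines whether the data changed between the get and the checkGet; a mismatch causes the transaction to fail.

Now for the server-side implementation, which relies mainly on RollBacker. Within a transaction, each submitted write command produces a corresponding RollBacker that can restore the system to its state before that command was applied. If a later check fails or a set throws an exception, the system uses the RollBackers to undo all earlier commands of the transaction, returning the system to its state before the transaction was applied.

The RollBacker interface is as follows.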

public interface RollBacker {
    void beforeHandle(Proto.Request request);
    void rollBack();
}

A concrete RollBacker implementation:

@RollBackerAnnotation(messageType = Proto.MessageType.LTrim)
public class LTrimRollBacker extends AbstractListRollBacker {

    private List<String> headList = Lists.newArrayList();
    private List<String> tailList = Lists.newArrayList();

    public LTrimRollBacker(GlobalContext globalContext) {
        super(globalContext);
    }

    @Override
    public void doRollBack() throws Exception {
        getGlobalContext().getStorage().lPush(getKey(), headList);
        getGlobalContext().getStorage().rPush(getKey(), tailList);
    }

    @Override
    public void doBeforeHandle(Proto.Request request) throws Exception {
        Proto.LTrimRequest req = request.getLTrimRequest();
        headList = getGlobalContext().getStorage().lRange(req.getKey(),0,req.getStart());
        headList.remove(headList.size() - 1);
        tailList = getGlobalContext().getStorage().lRange(req.getKey(),req.getEnd(),-1);
        tailList.remove(0);
    }
}

Transactions are actually processed in MultiHandler; a key excerpt follows.

        Stack<RollBacker> rollBackerStack = new Stack<>();
        Proto.MultiResponse.Builder multiResponseBuilder = Proto.MultiResponse.newBuilder();
        multiResponseBuilder.setPass(true);
        StateUtils.startTransaction(globalContext);
        for (Proto.Request currReq : multiRequest.getRequestsList()) {
            // First check whether transactions support this MessageType; if not, break immediately
            if (!MessageUtils.isTransactionSupported(currReq.getMessageType(), globalContext.getHandlerFactory())) {
                MessageUtils.messageTypeNotSupportedMultiResponseBuilder(multiResponseBuilder, currReq);
                break;
            }
            // A write command needs a RollBacker, which is pushed onto the stack
            if (MessageUtils.isWriteRequest(currReq.getMessageType(), globalContext.getHandlerFactory())) {
                RollBacker rollBacker = globalContext.getRollBackerFactory().getRollBacker(currReq.getMessageType())
                        .orElseThrow(UnsupportedOperationException::new);
                // todo: an exception such as ValueTypeNotMatch may be thrown here, in which case the whole transaction should be aborted
                rollBacker.beforeHandle(currReq);
                rollBackerStack.push(rollBacker);
            }
            Proto.Response currResp = globalContext.getHandlerFactory().getHandler(currReq.getMessageType())
                    .orElseThrow(UnsupportedOperationException::new).handle(currReq);
            // Then check whether currResp is OK; if not, break
            if (!Objects.equals(Proto.ResponseCode.OK, currResp.getResponseCode())) {
                MessageUtils.failMultiResponseBuilder(multiResponseBuilder, currReq, currResp);
                break;
            }
            // On success, append the Response
            multiResponseBuilder.addResponses(currResp);
        }
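
The excerpt stops before the rollback itself. As a rough illustration of the check-then-undo idea, the sketch below runs a list of steps against a plain map, records an undo action before each write, and unwinds the stack in reverse order when a check fails. CheckedTransactionDemo, Step and the undo-as-Runnable representation are assumptions made for this example, not the MultiHandler implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class CheckedTransactionDemo {
    // One step of a transaction: either a check or a write.
    static final class Step {
        final boolean isCheck;
        final String key;
        final String value;

        private Step(boolean isCheck, String key, String value) {
            this.isCheck = isCheck;
            this.key = key;
            this.value = value;
        }

        static Step checkGet(String key, String expected) { return new Step(true, key, expected); }
        static Step set(String key, String value) { return new Step(false, key, value); }
    }

    // Applies the steps in order; on a failed check, undoes every earlier write.
    static boolean execute(Map<String, String> storage, Step... steps) {
        Deque<Runnable> undoStack = new ArrayDeque<>();
        for (Step step : steps) {
            if (step.isCheck) {
                if (!Objects.equals(storage.get(step.key), step.value)) {
                    // Last write is undone first, restoring the pre-transaction state.
                    while (!undoStack.isEmpty()) {
                        undoStack.pop().run();
                    }
                    return false;
                }
            } else {
                // Capture the old state before the write is applied.
                final boolean existed = storage.containsKey(step.key);
                final String previous = storage.get(step.key);
                undoStack.push(() -> {
                    if (existed) {
                        storage.put(step.key, previous);
                    } else {
                        storage.remove(step.key);
                    }
                });
                storage.put(step.key, step.value);
            }
        }
        return true;
    }

    // Starts from i = 5 and runs the i++ style transaction from the client example.
    static String demo(String secondCheck) {
        Map<String, String> storage = new HashMap<>();
        storage.put("i", "5");
        boolean ok = execute(storage,
                Step.checkGet("i", "5"), Step.set("i", "6"),
                Step.checkGet("i", secondCheck), Step.set("i", "7"));
        return ok + ":" + storage.get("i");
    }
}
```

When the second check expects "6" the transaction commits and i ends at 7; when it expects any other value, the first write is rolled back and i stays at 5.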

Master-follower design

Master-follower replication is implemented with the help of zk.

The node layout in zk is as follows.

- \toycache
   - \group1
     - \master: serverMeta{serverId = %d, ..., epoch = %d}
     - \followers:
        - \follower1: serverMeta{serverId = %d, ..., epoch = %d}
        - \follower2: serverMeta{serverId = %d, ..., epoch = %d}
        - ...
   - \group2
     - ...
   - ...
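  • master is an ephemeral node and disappears when the connection drops.
  • follower%d nodes are ephemeral as well.

Communication among the master, the followers, and zk is shown in the figure below.

First, the master and the followers all send periodic HeartBeats to zk to synchronize metadata. The metadata is as follows.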

message ServerMeta {
    uint64 serverId = 1;
    uint64 epoch = 2;
    uint64 lastWriteId = 3;
    uint64 groupId = 4;
    string serverIp = 5;
    int32 serverPort = 6;
}

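The master watches the \followers path for changes. When a node is added, changed, or removed, the master updates the corresponding local metadata.

Based on each follower's lastWriteId, the master periodically syncs new commands to the followers. A follower runs some checks on the new commands before applying them. If a follower finds that commands it has already applied differ from the master's, it rolls them back so that it stays fully consistent with the master.

Followers watch the \master path for changes. As soon as the master goes offline, each follower fetches the lastWriteId of every other follower; the follower that has the largest lastWriteId and arrives first is elected as the new master.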
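
Two of the decisions described above reduce to simple comparisons on lastWriteId, sketched below under stated assumptions: the master keeps its commands ordered by writeId, and a follower considers itself a candidate only if no other follower reports a larger lastWriteId. ReplicationDemo, Entry, entriesAfter and mayBecomeMaster are hypothetical names; the "arrives first" tie-break and the follower-side consistency checks are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplicationDemo {
    // One logged write command on the master.
    static final class Entry {
        final long writeId;
        final String command;

        Entry(long writeId, String command) {
            this.writeId = writeId;
            this.command = command;
        }
    }

    // Commands the master still has to send to a follower at followerLastWriteId.
    static List<Entry> entriesAfter(List<Entry> log, long followerLastWriteId) {
        List<Entry> pending = new ArrayList<>();
        for (Entry entry : log) {
            if (entry.writeId > followerLastWriteId) {
                pending.add(entry);
            }
        }
        return pending;
    }

    // A follower is a candidate only if nobody else has seen a newer write.
    static boolean mayBecomeMaster(long myLastWriteId, long... otherLastWriteIds) {
        for (long other : otherLastWriteIds) {
            if (other > myLastWriteId) {
                return false;
            }
        }
        return true;
    }

    // Builds a log with writeIds 1..5 and counts what a follower is missing.
    static int pendingCount(long followerLastWriteId) {
        List<Entry> log = new ArrayList<>();
        for (long id = 1; id <= 5; id++) {
            log.add(new Entry(id, "SET k v" + id));
        }
        return entriesAfter(log, followerLastWriteId).size();
    }
}
```

Several followers can pass the candidate test at the same time when their lastWriteId values are equal, which is why the description adds that the first one to arrive wins.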

Cluster design

Not written yet; this section is a placeholder.