多线程流式计算模型

现在的Count引擎使用的一个框架。非常轻量级。全部使用了JDK自带的线程池和队列做实现。使用起来也相当方便。

一、QuickStart

1.

2.三大基础类

2.1 EmitItem : 发布和处理的KV对的封装

2.2 ModeHandler : 执行业务逻辑的单元 已改名为Bolt

2.3 FirstModeHandler : 读取文件/队列/存储等用于生产第一批EmitItem的单元 已改名为Spout

3.拓扑类

3.1 Topology包含了一系列的ModeHandler和一种FirstModeHandler的实现。

3.2 可以按照实际的业务需求来任意组合ModeHandler的执行顺序,但是不能动态调整,每次调整都需要重新编译

3.3 拓扑的启动可以直接main方法启动,也可以放置于容器内部署启动

二、SimpleExample

1.利用map统计单词出现的次数

public class CountModeHandler extends ModeHandler {
 
...
 
 @Override
 
 public int execute() {  //业务逻辑实现方法
 
    EmitItem item = null;
 
     int num = 0;
 
     String word = null;
 
     Map<String, Integer> map = new HashMap<String, Integer>();
 
     while ((item = getReadMessageQueue().poll()) != null) {//从前置队列中获取item
 
         try {
 
            word = (String) item.getMessage(0);//从item中得到要统计的word
 
            if (map.containsKey(word))
 
                map.put(word, map.get(word) + 1);//利用map来统计word出现的次数
 
             else
 
                map.put(word, 1);
 
            num++;
 
         } catch (Exception e) {
 
            e.printStackTrace();
 
         }
 
     }
 
     if (num > 0) {
 
         emit(0, map);//将统计结果存放的map继续发射到下一个处理单元
 
     }
 
    return num;
 
 }
 
 
...
}
2.将不同统计Mode的map结果合并成一个map

public class MergeModeHandler extends ModeHandler {
 
 
...
@Override
 
 public int execute() {
 
    EmitItem item = null;
 
     int num = 0;
 
     while ((item = getReadMessageQueue().poll()) != null) {
 
         try {
 
             Map<String, Integer> m = (Map<String, Integer>) item.getMessage(0);
 
             for (String key : m.keySet()) {
 
                 if (map.containsKey(key))
 
                    map.put(key, map.get(key) + m.get(key));
 
                else
 
                     map.put(key, m.get(key));
 
            }
 
 
 
 
         } catch (Exception e) {
 
             e.printStackTrace();
 
     }
 
     num++;
 
     }
 
    return num;
 
 }
 
 
...
}
3.模拟读入文章并将单词发射出去的FirstModeHandler

public class TestFirstModeHandler extends FirstModeHandler {
 
 
    public String[][] message = new String[][] {
 
     { "one", "apple", "a", "day", "doctor", "keeps", "away" },
 
     { "when", "a", "man", "loves", "a", "woman" },
 
     { "what", "doesn't", "kill", "you", "makes", "you", "stronger" } };
 
 
    @Override
 
     public int execute() {
 
        int index = 0;
 
        for (int i = 0; i < message.length; i++)
 
         for (int j = 0; j < message[i].length; j++) {
 
             emit(index, message[i][j]);
 
             index++;
 
         }
 
         return index;
 
     }
...
 
 
}
4.将各个单元组装成拓扑结构并运行

...
public static void main(String[] args) {
 
 TestTopology tt = new TestTopology();
 
 
 
 
 TestFirstModeHandler h1 = new TestFirstModeHandler();
 
 CountModeHandler h2 = new CountModeHandler();
 
 MergeModeHandler h3 = new MergeModeHandler();
 
 tt.prepare();//拓扑的准备阶段,可以初始化一些参数
 
 h3.prepare(tt.map);//ModeHandler的准备阶段,同样可以初始化参数
 
 tt.setFirstModeHandler(h1, 1).setModeHandler(h2, 4).setModeHandler(h3, 2); //拓扑设置各个单元的数量和顺序,先set哪个ModeHandler,哪个就先被执行。
 
 tt.start();//启动
 
 System.out.println("The last result is : " + tt.map);//输入统计单词的结果
 
 tt.shutdown();//停止
 
 }
 
 
...
5.TestTopology只简单的初始化一个map用来存放最终统计的结果就可以了

public class TestTopology extends NewTopology {
 
 
 
 
 public Map<String, Integer> map;
 
 
 
 
 @Override
 
 public void prepare() {
 
 map = new ConcurrentHashMap<String, Integer>();
 
 };
 
 
 
 
}
三、TODOLIST

· 分流/整流/订阅 三种拓扑结构的实现

· 监控的一些整合