多线程流式计算模型 现在的Count引擎使用的一个框架。非常轻量级。全部使用了JDK自带的线程池和队列做实现。使用起来也相当方便。 一、QuickStart 1. 2.三大基础类 2.1 EmitItem : 发布和处理的KV对的封装 2.2 ModeHandler : 执行业务逻辑的单元 已改名为Bolt 2.3 FirstModeHandler : 读取文件/队列/存储等用于生产第一批EmitItem的单元 已改名为Spout 3.拓扑类 3.1 Topology包含了一系列的ModeHandler和一种FirstModeHandler的实现。 3.2 可以按照实际的业务需求来任意组合ModeHandler的执行顺序,但是不能动态调整,每次调整都需要重新编译 3.3 拓扑的启动可以直接main方法启动,也可以放置于容器内部署启动 二、SimpleExample 1.利用map统计单词出现的次数 public class CountModeHandler extends ModeHandler { ... @Override public int execute() { //业务逻辑实现方法 EmitItem item = null; int num = 0; String word = null; Map<String, Integer> map = new HashMap<String, Integer>(); while ((item = getReadMessageQueue().poll()) != null) {//从前置队列中获取item try { word = (String) item.getMessage(0);//从item中得到要统计的word if (map.containsKey(word)) map.put(word, map.get(word) + 1);//利用map来统计word出现的次数 else map.put(word, 1); num++; } catch (Exception e) { e.printStackTrace(); } } if (num > 0) { emit(0, map);//将统计结果存放的map继续发射到下一个处理单元 } return num; } ... } 2.将不同统计Mode的map结果合并成一个map public class MergeModeHandler extends ModeHandler { ... @Override public int execute() { EmitItem item = null; int num = 0; while ((item = getReadMessageQueue().poll()) != null) { try { Map<String, Integer> m = (Map<String, Integer>) item.getMessage(0); for (String key : m.keySet()) { if (map.containsKey(key)) map.put(key, map.get(key) + m.get(key)); else map.put(key, m.get(key)); } } catch (Exception e) { e.printStackTrace(); } num++; } return num; } ... } 3.模拟读入文章并将单词发射出去的FirstModeHandler public class TestFirstModeHandler extends FirstModeHandler { public String[][] message = new String[][] { { "one", "apple", "a", "day", "doctor", "keeps", "away" }, { "when", "a", "man", "loves", "a", "woman" }, { "what", "doesn't", "kill", "you", "makes", "you", "stronger" } }; @Override public int execute() { int index = 0; for (int i = 0; i < message.length; i++) for (int j = 0; j < message[i].length; j++) { emit(index, message[i][j]); index++; } return index; } ... } 4.将各个单元组装成拓扑结构并运行 ... public static void main(String[] args) { TestTopology tt = new TestTopology(); TestFirstModeHandler h1 = new TestFirstModeHandler(); CountModeHandler h2 = new CountModeHandler(); MergeModeHandler h3 = new MergeModeHandler(); tt.prepare();//拓扑的准备阶段,可以初始化一些参数 h3.prepare(tt.map);//ModeHandler的准备阶段,同样可以初始化参数 tt.setFirstModeHandler(h1, 1).setModeHandler(h2, 4).setModeHandler(h3, 2); //拓扑设置各个单元的数量和顺序,先set哪个ModeHandler,哪个就先被执行。 tt.start();//启动 System.out.println("The last result is : " + tt.map);//输入统计单词的结果 tt.shutdown();//停止 } ... 5.TestTopology只简单的初始化一个map用来存放最终统计的结果就可以了 public class TestTopology extends NewTopology { public Map<String, Integer> map; @Override public void prepare() { map = new ConcurrentHashMap<String, Integer>(); }; } 三、TODOLIST · 分流/整流/订阅 三种拓扑结构的实现 · 监控的一些整合