A simple MapReduce framework
因为MapReduce
是基于DFS
的,所以这里的简单MapReduce
是在前面实现的一个简单的DFS框架上进行修改得到的。
MapReduce
的主要**是将对大规模数据集的操作分发到由主节点master
管理下的各个分节点slave
共同完成,然后通过整合各个节点的中间结果得到最终结果。在DFS
的master
中,运行着JobTracker
,在slave
中运行着TaskTracker
,JobTracker
用于调度工作而TaskTracker
用于执行工作。
在此MapReduce
框架中,对前面DFS
框架的master
做修改得到JobTracker
,类似地,对前面DFS
框架的slave
做修改得到了 TaskTracker
。其中JobTracker
与前面DFS
中的master
的区别主要在于其接收了由客户端传过来的文件并进行了分块和分发的工作之后,需要给每个分块指定一个Mapper
(这里的Mapper
的意义要比原先的略为广泛,还包括了shuffle
过程),调度TaskTracker
执行Mapper
依次对分块进行map
操作、combine
操作(相同key
值进行合并)和shuffle
操作(hash(key) % reducerNumber
),将Mapper
的输出结果存于slave
中作为中间结果文件。其中,根据本地计算的**,我们总是把一个与分块对应的Mapper
交由拥有此分块的slave
上的TaskTracker
,以避免从其他slave
传输分块。进行完Mapper
操作之后,对应于每个分块我们得到了若干个中间结果文件(其数量取决于Reducer
的数量),JobTracker
根据用户给定的Reducer
数量在多个slave
上的TaskTracker
中随机选择若干个作为即将执行Reducer
的TaskTracker
,然后调度各个TaskTracker
从其他slave
中将对应于某个Reducer
的中间结果文件发送到此Reducer
所在的slave
上,再由JobTracker
通知TaskTracker
执行Reducer
,得到最终结果(一个Reducer
对应一个结果文件)。完成Reduce
工作之后,JobTracker
会调度TaskTracker
删除中间结果。
这里以实现均值和方差统计功能为例,Mapper
中的map
操作为:对应一个输入x
(即为一个数),包装成{x : 1}
的形式输出。combine
操作会将相同的x
组合到一起,即形成{x : [1, ... , 1]
的形式,shuffle
操作对key
进行hash(key) % reducerNumber
的操作,从而对应于一个分块文件,得到了reducerNum
个中间文件(reducerNumber
为Reducer
的数目)。Reducer
中的reduce
操作为:对于属于自己处理的所有中间结果块,遍历各个key
,取出对应的value
(一个列表),对value
中所有元素累加得到的是对应key
的个数(也就是key
这个数出现了几次)并累加到numCount
中去,同时将key
与其个数相乘然后累加到numSum1
中去,将key
的平方与其个数相乘然后累加到numSum2
中去,那么最终numCount
得到的就是数的个数,numSum1
得到的就是数的总和,numSum2
得到的就是数的总平方和。因为可能有多个Reducer
,所以还需要将多个结果文件取回到本地,然后将numCount
、numSum1
和numSum2
各自累加得到最终的所有数的个数、总和以及总平方和,最后根据
即可得到所有数的均值mean
和方差variance
。
MapReduce执行过程:
MapReduce执行结果: