本项目基于MIT 6.824 lab1 实现了一个简易的分布式MapReduce
运行test: 进入src/main目录 运行 sh test-mr.sh
MapReduce是Google提出的一种适用于大规模数据的分布式计算框架。
Paper link: https://research.google/pubs/pub62/
Master可以实现任务分发
Worker需要实现通过rpc从Master获取任务,执行任务,再反馈任务结果给Master
Parallelism: Worker需要支持并行, 即一个Master向多个Worker分配任务,多个worker并行执行任务
Fault Tolerance: Worker fail 或者Worker执行任务超时都认为任务失败,Master需要重新为这个任务分配Worker,执行任务失败的Worker不应该产生不完整的输出文件
M: Map任务的数量, 暂定为输入文件的数量(即一个输入文件对应一个Map任务)
N: Reduce任务的数量, 即最终输出文件的数量
Intermediate File: Map任务产生的中间文件, 共有 M * N 个,命名为“mr-X-Y”,X为Map任务id, Y为Reduce任务id
输出文件: Reduce任务产生的输出文件,命名为“mr-out-Y”,Y为Reduce任务id
GetTaskTimeOut: Worker获取任务的的间隔
MaxTaskWaitTime: Master等待Worker执行任务的最长时间(如果在规定时间内没有完成任务即认为worker fail,任务失败)
Phase: 整个MapReduce任务分为四个阶段
Init(初始化)Map Reduce Done(完成)
只有在前一个阶段完成后才能进入后一个阶段,所以Map阶段会出现Worker获取任务需要一直等待的情况(Map任务已经分配完但是还没有全部完成,不能进入Reduce阶段)
Worker获取任务的等待在Master端实现
type Task struct {
Phase string
Id int
}
Task的定义很简单,phase表示Task是属于Map任务还是Reduce任务
type Master struct {
mMap int
nReduce int
phase string
phaseLock sync.Mutex
taskQuantity int
taskCounter uint64
taskChan chan Task
retryTaskChan chan Task
traceTaskChan map[Task] chan string
files map[int] string
}
Master存储了MapReduce任务的M, N,当前阶段phase,当前阶段总任务数taskQuantity以及已经完成的任务taskCounter
taskChan: 任务队列channel,当Map或Reduce阶段开始的时候会向taskChan中push所有的任务,Worker则会从taskChan中获得任务
retryTaskChan: 重试任务队列channel,当分配给Worker的任务失败后,需要将任务加入retryTaskChan,通过我们的router,任务会被重新push回taskChan等待下次分配
traceTaskChan: 任务追踪channel map,当分配给Worker任务后,Master会启动一个新的go runtime从这个task对应的channel中监听任务完成的signal,这个map就是存储task对应的channel
files: 输入文件map,存储map任务id和对应的输入文件名
func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
m.initiation(files, nReduce)
m.server()
return &m
}
MakeMaster是启动Master的入口,实例化一个Master对象并返回。其中主要做了两件事,一个是整个MapReduce的初始化,一个是启动rpc server
func (m *Master) server() {
rpc.Register(m)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := masterSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
func (m *Master) initiation(files []string, nReduce int) {
m.phase = InitPhase
m.mMap = len(files)
m.nReduce = nReduce
m.taskChan = make(chan Task)
m.retryTaskChan = make(chan Task)
m.traceTaskChan = make(map[Task] chan string)
m.files = make(map[int] string)
m.fillFilesMap(files)
m.initTraceTaskChan()
m.startRetryRouter()
//m.splitInputFiles(files, mMap)
m.startPhase("Map", m.mMap)
}
在initiation中,主要是对Master的参数进行了初始化,还有启动了retryRouter,最后开启Map Phase
func (m *Master) startRetryRouter() {
go m.retryRouter()
}
func (m *Master) retryRouter() {
for {
c := <-m.retryTaskChan
m.taskChan <- c
}
}
startRetryRouter其实就是启动一个go runtime,不断的把retryTaskChan中接收到的失败任务push回taskChan
func (m *Master) startPhase(phaseName string, taskQuantity int) {
m.taskQuantity = taskQuantity
m.taskCounter = 0
m.phase = phaseName
if phaseName == DonePhase {
return
}
for taskId := 0; taskId < taskQuantity; taskId++ {
go m.addTask(Task{
Phase: phaseName,
Id: taskId,
})
}
log.Printf("start phase: %v, Task quantity: %d", phaseName, taskQuantity)
}
func (m *Master) addTask(task Task) {
log.Printf("Task added: Task(%s, %d) \n", task.Phase, task.Id)
m.taskChan <- task
}
startPhase中初始化本阶段的任务数量,重置taskCounter,并将所有任务添加到taskChan中。由于channel的特性,现在的m.taskChan <- task其实是处于阻塞状态的,只有worker分配到任务,从channel中接收task后,task才算传送完成。所以需要为每个addTask启动一个go runtime
func (m *Master) getTask() Task {
m.checkAndChangePhase()
select {
case task:= <-m.taskChan:
go m.traceTask(task)
return task
case <-time.After(GetTaskTimeOut):
return m.getTask()
}
}
func (m *Master) checkAndChangePhase() {
if int(m.taskCounter) != m.taskQuantity {
return
}
m.phaseLock.Lock()
if m.phase == MapPhase {
log.Println("Start Phase Reduce")
m.startPhase(ReducePhase, m.nReduce)
} else if m.phase == ReducePhase {
m.startPhase(DonePhase, 0)
log.Println("Start Phase Done")
} else if m.phase == DonePhase {
}
m.phaseLock.Unlock()
}
getTask是分配任务的核心函数,Worker通过rpc获取任务实际上调用的就是这个函数。在每次获取任务前需要先检查并更新phase。
在checkAndChangePhase中,首先check是否当前phase所有任务都已经完成,如果没有完成则直接返回并继续请求任务。如果任务都完成了则代表当前phase已经结束,需要更新到下一个phase。更新phase的过程需要通过phaseLock保证并发安全。
这里检查int(m.taskCounter) != m.taskQuantity时好像会有并发安全的问题,但其实是没有的;一方面taskCounter的增加都是用的atomic操作保证并发安全,另一方面虽然taskCounter可能读取到的不是最新值,但这只会导致Master认为还有任务可以分配,checkAndChangePhase直接返回,但实际上没有任务了,select会一直等待直到超时,然后重新getTask。
select中监听taskChan,如果从taskChan获得了之前startPhase中添加的task或者任务失败加到retryTaskChan的重试task就返回给Worker,如果等待超时就重新getTask。
func (m *Master) traceTask(task Task) {
c := m.traceTaskChan[task]
select {
case taskStatus := <-c:
if taskStatus == TaskDone {
atomic.AddUint64(&m.taskCounter, 1)
log.Printf("Task finished: Task(%s, %d)", task.Phase, task.Id)
}
case <- time.After(MaxTaskWaitTime):
m.retryTask(task)
log.Printf("TraceTaskTimeOut: Task(%s, %d), retry...", task.Phase, task.Id)
}
}
traceTask首先获得task对应的追踪channel,之后一直等待TaskDone信号,如果在MaxTaskWaitTime中还没等到,那么认为任务失败。
func (m *Master) finishTask(task Task) {
c := m.traceTaskChan[task]
c <- TaskDone
}
finishTask为Worker完成任务后调用的函数,发送TaskDone信号到对应的traceTask channel
type GetTaskArgs struct {
}
type GetTaskReply struct {
Task Task
FileName string
MMap int
NReduce int
}
type FinishTaskArgs struct {
Task Task
}
type FinishTaskReply struct {
}
RPC的设计是很简单的,Worker和Master之间的操作其实只有getTask和finishTask。getTask中Master会返回M, N, Task和Task需要操作的输入文件filename(如果是Reduce任务则为"")
type KeyValue struct {
Key string
Value string
}
type mrFunction struct {
mapf func(string, string) []KeyValue
reducef func(string, []string) string
}
这里定义了KV键值对和mr函数,KeyValue是用来抽象统计结果的Key和值。mrFunction表示整个MapReduce使用的map和reduce函数
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
mrFunction := mrFunction{mapf: mapf, reducef: reducef}
for {
task, inputFileName := getTask()
doTask(&task, inputFileName, mrFunction)
finishTask(task)
}
}
Worker函数是Worker运行的入口函数,启动了之后Worker会循环的获取任务,执行任务和结束任务。
这里出现的一个问题是Worker怎么停止。由于整个MapReduce是由Master控制的,test中只要监听到Master退出就认为全部任务结束,所以在这里没有做过多的处理。mrMaster.go文件中会轮询Master的Done()函数检查整个MapReduce是否结束,如果已经结束Master就会直接退出。
func doTask(task *Task, inputFileName string, function mrFunction) {
if task.Phase == MapPhase {
doMapTask(task, inputFileName, function.mapf)
} else if task.Phase == ReducePhase{
doReduceTask(task, function.reducef)
}
}
doTask会根据任务类型执行map或reduce任务
func doMapTask(task *Task, inputFileName string, mapf func(string, string) []KeyValue) {
content := getInputFileContent(inputFileName)
kva := mapf(inputFileName, string(content))
tempFiles := createTempIntermediateFiles(task)
splitDataAndWriteFile(kva, tempFiles)
renameTempIntermediateFiles(tempFiles, task.Id)
//TODO: error handling
}
doMapTask会读取input文件,通过map函数获得KV结果,之后创建临时intermediate file,再通过splitDataAndWriteFile把KV写到对应的临时文件中,最后把临时文件改名为真正的intermediate file。
这里需要先使用临时中间文件来做IO是为了避免写入过程中Worker fail,留下写了一半的intermediate file。
func doReduceTask(task *Task, reducef func(string, []string) string) {
intermediate := groupDataFromIntermediateFiles(task.Id)
sortDataByKey(intermediate)
f := createTempOutputFile(task.Id)
writeOutputFile(f, intermediate, reducef)
renameOutputFile(f, task.Id)
}
doReduceTask的过程也是类似的,首先读取intermediate file, 然后根据key排序,接着对临时输出文件写入数据,最后rename输出文件。
这里的MapReduce实现虽然可以通过所有test,但依然只是比较简略的版本,可能还存在没有发现的问题和bug,欢迎大家指正。
这个项目也是我看完Clean Code以后的首次实践,感觉其中提到的一些原则虽然可能会增加一些代码量,但确实可以提高代码的整洁性和可读性,附上我总结的一些知识点:
避免误导: 不要使用acountList来命名一组账号的容器,即使这真的是个list; 小心使用字母l和O(与数字1和0混淆)
做有意义的区分:不要使用数字系列(a1,a2,a3... aN); 避免出现冗余变量或函数,如 moneyAcount&money, theMessage&Message, getAcount()&getAcountInfo()
使用可搜索的名称:命名长短与作用域正相关;单字母名称适合用在本地变量,长名称更便于在全文搜索
接口和实现: 使用ShapeFactory&ShapeFactoryImpl
把读代码的人当成白痴:明确是王道。不要假设读代码的人是专业人士
一致性:相同概念持续使用相同的命名,例如get&fetch&retrive,manager&controller&driver,避免混淆
短小:if, else,while语句中最好只有一行,那就是调用另一个函数
只做一件事:合理的函数内代码应该是不可再分的
每个函数一个抽象层级:自顶向下原则
参数:越少越好;多参数可以抽象成一个类
错误处理就是一件事:函数中try/catch/finally代码块外不应该有代码
使用异常代替enum错误码:新加的异常可以从已有异常派生出来,无需重新编译和部署使用旧异常的类
消灭重复代码
可以先按思路写,再做更名,拆分和优化