前言
好久没有在掘金上更新了,因为最近一段时间比较忙(工作以及其他方面)。新的一年里暂时没有找到合适的学习的方向,于是先找来了mit的分布式系统课程(编号6.824),本篇文章是第一个作业(lab1:MapReduce)的部分思路介绍。貌似课程本身并不推荐大家直接把代码公开,避免有的同学抄作业,所以这里只记录了思路。
相关资料
pdos.csail.mit.edu/6.824/index…
pdos.csail.mit.edu/6.824/paper…
pdos.csail.mit.edu/6.824/labs/…
题目要求
- 实现简单的MapReduce
- 使用go语言
- 单机运行
- map任务以filename为参数,reduce任务以序号为参数
- 兼容worker crash的情况
- 支持完成时自动退出
目标拆解
- 定义coordinator(master)和worker的数据结构
- 定义rpc接口,其中coordinator作为服务端,worker作为客户端
- 实现其他逻辑
第一版实现
rpc接口
GetUndoneMapFile
:worker向coordinator请求分配新的map任务MapDone
:worker向coordinator通知map任务完成GetUndoneReduceNum
:worker向coordinator请求分配新的reduce任务ReduceDone
:worker想coordinator通知reduce任务完成
coordinator
数据结构
lock sync.Mutex
:用于保护coordinator内部的其他数据在并发访问下的正确性,更合理的方式应该是针对不同的字段设置不同的锁,这里偷了个懒只用了一个全局的锁filesMapDone map[string]bool
:用于保存各个file是否已经执行完了map步骤keysReduceDone []bool
:用于保存各个reduce序号是否已经执行完了reduce步骤worker_id int
:用于标识每次下发的map任务,即每次会把当前值返回给worker并自增1
方法
Done
:当所有map和reduce任务执行完毕时才返回true,否则返回falseGetUndoneMapFile
:如果有未完成的map任务,返回对应的filename,否则返回no map
错误MapDone
:更新filesMapDone
中对应的key的值为trueGetUndoneReduceNum
:如果有未完成的reduce任务,返回对应的index,否则返回no reduce
错误ReduceDone
:更新keysReduceDone
中对应的index的值为true
worker
数据结构
没有单独的field,只实现了各个方法。
方法
findUndoneMapFile
:通过rpc向coordinator发送GetUndoneMapFile
请求findUndoneReduceNum
:通过rpc向coordinator发送GetUndoneReduceNum
请求Worker
:worker的主循环逻辑,在一个无限循环中先调用findUndoneMapFile
,如果有map任务则执行对应的map任务,如果findUndoneMapFile
返回no map
则调用findUndoneReduceNum
,如果有reduce任务则执行对应的reduce任务,如果findUndoneReduceNum
返回no reduce
则退出整个循环。runMap
:执行具体的map任务,先读取指定的filename对应的文件内容,然后将文件名和文件内容传递到用户自定义的map函数中。当map执行完毕之后将输出的kv列表根据key排序过后输出到中间文件。输出到中间文件时需要使用os.CreateTemp
方法创建临时文件,等所有kv都写完之后再通过os.Rename
把名字改回最终的名字,这样可以避免worker crash的情况runReduce
:执行具体的reduce任务,先读取所有符合本次reduce序号的中间文件的内容,然后在内存内排序之后传递到用户自定义的reduce函数中。当reduce执行完毕之后将输出的kv列表根据key排序过后输出到最终输出文件。同样需要用到os.CreateTmep
和os.Rename
避免worker crash的情况。
测试
- 单独运行一个coordinator和一个worker时测试符合预期
- 启动一个coordinator和多个worker时测试不符合预期
排查错误原因
通过加日志的方式确认是有相同的map任务被执行了多次,其根本原因在于map任务的执行是需要时间的,在某个map任务执行过程中可能被重复下发给多个worker,同理reduce也有类似的问题,所以在coordinator增加了两个field解决重复下发的问题。
第二版实现
coordinator
数据结构
增加了以下两个field:
filesMapDoing map[string]bool
:用于保存各个file是否正在执行map步骤keysReduceDoing []bool
:用于保存各个reduce序号是否正在执行reduce步骤 这里其实可以考虑和xxxDone的map合并成一个,value类型改为枚举;更进一步其实应该记录每个任务已经下发给了哪个worker,不过这里就偷懒了。
方法
GetUndoneMapFile
:如果有未分配(而不是未完成)的map任务,返回对应的filename,更新filesMapDoing
中对应的key的值为true;否则返回no map
错误MapDone
:更新filesMapDone
中对应的key的值为true,更新filesMapDoing
中对应的key的值为falseGetUndoneReduceNum
:如果有未分配(而不是未完成)的reduce任务,返回对应的index,更新keysReduceDoing
中对应的index的值为true;否则返回no reduce
错误ReduceDone
:更新keysReduceDone
中对应的index的值为true,更新keysReduceDoing
中对应的index的值为false
测试
- 单独运行一个coordinator和一个worker时测试符合预期
- 启动一个coordinator和多个worker时测试不符合预期
排查错误原因
通过加日志的方式确认是有部分的map任务的输出结果没有被reduce任务处理,原因是部分worker进程的map任务完成的更早,当最后一个或者多个map任务已经下发但是还没结束时,如果有worker完成了自己得到的map任务,就会提前向coordinator申请reduce任务。也就是说coordinator需要等到所有map任务都结束之后才向worker下发reduce任务。
第三版实现
coordinator
方法
GetUndoneReduceNum
:如果所有map任务还没有结束,返回map not done
错误;如果有未分配的reduce任务,返回对应的index,更新keysReduceDoing
中对应的index的值为true;否则返回no reduce
错误
worker
方法
Worker
:worker的主循环逻辑,在一个无限循环中先调用findUndoneMapFile
,如果有map任务则执行对应的map任务,如果findUndoneMapFile
返回no map
则调用findUndoneReduceNum
,如果有reduce任务则执行对应的reduce任务,如果findUndoneReduceNum
返回map not done
错误则sleep 1秒之后再重试,如果findUndoneReduceNum
返回no reduce
错误则退出整个循环。
测试
- 单独运行一个coordinator和一个worker时测试符合预期
- 启动一个coordinator和多个worker时测试符合预期
- 无法通过early_exit测试
排查错误原因
测试脚本中要求所有worker和coordinator都必须在所有任务完成时才能退出,而我之前的实现中可能出现部分worker提前退出的情况。因此我们需要加入协调退出的功能。
第四版实现
rpc接口
增加了以下方法:
ShouldExit
:worker向coordinator确认是否自身应该退出
coordinator
数据结构
增加了以下field:
finishTimestamp int64
:所有任务全部完成的时间戳
方法
Done
:当finishTimestamp
的值不为0而且比当前时间小10秒以上时返回true,因为从“执行完毕”到“worker收到shouldExit”之间需要有一定的时间差,这里选了10秒ShouldExit
:当所有map和reduce任务执行完毕时才返回true,否则返回falseReduceDone
:更新keysReduceDone
中对应的index的值为true,更新keysReduceDoing
中对应的index的值为false,当所有reduce任务执行完毕时更新finishTimestamp
为当前时间
测试
- 单独运行一个coordinator和一个worker时测试符合预期
- 启动一个coordinator和多个worker时测试符合预期
- 可以通过early_exit测试
- 无法通过crash测试
排查错误原因
测试过程中会执行多个MapReduce任务,而我的某次测试结果中居然包含了前一次测试的输出。因此我们需要在每个MapReduce任务执行完毕后清理掉中间生成的map临时输出。
第五版实现
coordinator
方法
Done
:当finishTimestamp
的值不为0而且比当前时间小10秒以上时主动清理掉map任务生成的临时文件,然后再返回true
测试
- 单独运行一个coordinator和一个worker时测试符合预期
- 启动一个coordinator和多个worker时测试符合预期
- 顺利通过test-mr.sh中的所有测试
其他
值得提到的一点是,项目提供的测试脚本test-mr.sh貌似有有点问题,在”early exit test”的部分,脚本里写了一句wait -n
,注释里写这里的作用是”wait for any of the coord or workers to exit”。但是实际执行是这一行会报错(个人试了macos和centos7),因为wait命令并没有-n选项。所以上述的所有测试都是把此处的wait -n改成wait的条件下测试的。
今天的文章mit 6.824 lab1 笔记分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/21852.html