6.824 - Spring 2021 - Lab1 : MapReduce
Introduction
在这个实验中,你将会构建一个 MapReduce System。你将会实现一个调用应用的 Map() 和 Reduce() 函数同时进行读/写文件的 worker 进程,和一个负责任务分发和处理异常 Worker 进程的 Coordinator 进程。你将会构建一个和提供的 MapReduce 论文类似的一个系统(Note:这个实验用 “Corrdinator” 指代论文中的 “Master” 角色)
Getting started
在开始实验之前,你需要先安装 Go 的支持。
$ git clone git://g.csail.mit.edu/6.824-golabs-2021 6.824
$ cd 6.824
$ ls
Makefile src
$
我们在 src/main/mrsequential.go
文件中给你提供了一个简单的串行 MapReduce 实现。它在一个单线程程序中的任一时刻执行 Map() 函数或者 Reduce() 函数,我们也提供给你了一组 MapReduce 应用:在 mrapps/wc.go
和 mrapps/indexr.go
中分别实现了一个单词计数器和一个文本索引。你可以像下面一样运行这个单词计数应用:
$ cd ~/6.824
$ cd src/main
$ go build -race -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run -race mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...
(Note:如果你没有在编译时添加
-race
选项,你将不能在运行时使用-race
)
mrsequential.go
将会从 pg-xxx.txt
文件中读取数据作为输入,并将其输出重定向到 mr-out-0
文件中。
你可以随意借用 mrsequential.go
中的代码,同时,你也应该去 mrapps/wc.go
中借鉴如何去构建一个 MapReduce 应用。
Your Job
你的任务时实现一个分布式 MapReduce 应用程序,它应该包含两种角色:coordinator 和 worker 。
在这个程序中,将会有一个 coordinator 进程和一个或多个并行执行的 worker 进程。虽然在真正的生产环境中,worker 进程将会运行在不同的机器中,但是在这个实验中,你可以在自己的运行环境中运行单机版的 MapReduce 程序。Worker 进程将会通过 RPC 来与 Coordinator 进程交流。每一个 Worker 进程都会向 Coordinator 进程发送请求获取任务,并从一个或多个文件中读取数据作为输入,执行任务,然后将执行结果输出到文件中。Coordinator 进程也应该观察 Worker 进程的状态,如果一个 Worker 进程没有在制定的时间内完成任务(在本实验中定义为 10 S),Coordinartor 应该将该任务分配给别的 Worker。
我们已经给你提供了一部分代码帮助你更好的开始。在 main
包下包含了 Coordinartor 和 Worker 的部分实现:main/mrcoordinator.go
和 main/mrworker.go
。****请不要尝试更改这个文件中的内容。**** 你应该在 mr/worker.go
,mr/coordinator.go
和 mr/rpc.go
中构建你的实现。
在构建完成后,你可以通过下面的方式在 word-count MapReduce 应用程序运行。首先,你需要确认 word-count 插件被正确的构建了。
$ go build -race -buildmode=plugin ../mrapps/wc.go
在 main
包下,启动 Coordinator 进程。
$ rm mr-out*
$ go run -race mrcoordinator.go pg-*.txt
我们为你提供的 pg-*.txt
文件应该作为 mrcoordinator.go
的输入数据。每个文件都对应一次 ‘Split’ 操作,作为 Map() 任务的输入。同时,启动时的 -race
标志将会在 Go 程序启动的同时××××。
在其他的命令行窗口,运行一个或者多个 Worker 角色。
$ go run -race mrworker.go wc.so
当 Worker 和 Coordinator 进程都执行完毕之后,你可以查看它们的输出文件 mr-out-*
。完成实验后,输出文件排序后的并集应与顺序输出匹配,如下所示:
$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT 8
...
我们在 main/test-mr.sh
中为你提供了一个测试脚本,用来测试在给定 pg-xxx.txt
文件作为输入的情况下,检查 wc
和 indexer
MapReduce 应用程序是否产生正确的输出。这些测试还检查你的实现是否并行运行 Map 和 Reduce 任务,以及你的实现是否从运行任务时崩溃的工作程序中恢复。
如果你现在运行测试脚本,则该脚本将挂起,因为协调器永远不会完成:
$ cd ~/6.824/src/main
$ bash test-mr.sh
*** Starting wc test.
你可以在 mr/coordinator.go
的 Done() 函数中将 ret := false
更改为 true
,以便协调器立即退出。 然后:
$ bash test-mr.sh
*** Starting wc test.
sort: No such file or directory
cmp: EOF on mr-wc-all
--- wc output is not the same as mr-correct-wc.txt
--- wc test: FAIL
$
测试文件希望在每一个名为 mr-out-X
的输出文件中看到输出。
同时,在 mr/coordinator.go
和 mr/worker.go
中没有代码实现,并且不会产生这些文件(或者任何其他的一些东西),所以测试将会失败。
当你完成了你的 Coding,这些脚本将会看到以下输出:
$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
$
在引用 Go RPC 之后,你也会看到以下提示错误的输出
2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three
忽略这些消息。
Coodinator 注册成为一个 RPC 服务之后将会检查它的方法是否适合作为 RPC 服务(有三个返回值),虽然我们都知道 Done()
不是通过调用 RPC。
A few roles
-
Map 任务应该将产生的中间输出放到一个桶(原:buckets)中作为
nReduce
Reduce 任务的输入,这个nReduce
是main/mrcoordinator.go
文件提交给MakeCoordinator()
函数的参数 -
Worker 的实现应该将第 X 个 Reduce 的计算结果输出到
mr-out-X
-
mr-out-X
文件中应该包含 Reduce 函数输出的每一行,并且这个计算结果的输出格式应该被格式化为"%v %v"
,你可以看到main/mrsequential.go
中注释了 This is the correct format 的格式化格式,如果你的实现不是这么做的将无法通过测试脚本 -
你可以修改
mr/worker.go
,mr/coordinator.go
, 和mr/rpc.go
。在测试时,你可以暂时修改其他的文件,但是在测试完之后你应该吧这些文件修改回来,我们也会对这些文件的数据进行校验以确定文件未被修改过。 -
Worker 线程应该在其所在的文件夹中输出中间输出,以便其之后将这些文件读取出来进行 Reduce 任务。
-
main/mrcoordinator.go
需要mr/coordinator.go
实现Done()
方法,使其在 RapReduce 任务结束之后返回true
。在同时,mrcoordinator.go
也应该结束。 -
当所有的任务结束之后,Worker 线程也应该退出。一个简单的方法是使用
call()
的返回值:如果 Worker 线程无法联系到 Coordinator 线程,它将会认为这个任务结束了,并且它自己也可以退出了。在你自己的实现中,你也可以让 Coordinary 线程发送一个伪任务"Please exit"
给 Worker 线程,令其结束。
Hints
-
一比较好的开始思路:
- 修改
mr/worker.go
中的Worker()
方法,使其发送一个 RPC 给 Coordinator 线程并让它分发任务 - 修改 Coordinator 线程,使其返回文件名,作为未开始的 Map 任务的输入
- 让 Worker 线程读取文件并且调用 Map 任务,就像
mrsequential.go
做的那样
- 修改
-
这个系统的 Map 任务和 Reduce 任务将会在运行时加载
main
包下的*.so
插件文件 -
如果你修改了
mr/
文件夹下的文件,你应该使用go build -race -buildmode=plugin ../mrapps/wc.go
重新 build 插件 -
这个 lab 中的多个 Worker 线程依赖于共享同一个文件系统,如果你的 Worker 运行在不同的机器上,则需要 DFS 这样的文件系统
-
中间文件的合理命名约定是
mr-X-Y
,其中 X 是 Map 任务号,Y 是 Reduce 任务号。 -
工作者的 Map 任务代码将需要一种方法,以在 Reduce 任务期间可以正确读取的方式在文件中存储中间键/值对。 一种可能是使用Go 的
encoding/json
包,要将 K-V 对写入 JSON 文件,并读回这样的文件:enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv)
dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
-
你的 Worker 线程的 Map 部分可以使用在
worker.go
中定义的ihash(key)
函数为给定的 key 选择 Reduce 任务。 -
您可以从
mrsequential.go
借用一些代码,以读取 Map 输入文件,对 Map 和 Reduce 之间的中间 K-V 对进行排序,以及将 Reduce 任务的输出存储在文件中。 -
作为 RPC 服务器的 Coordinator 线程是并发的,所以不要忘记锁定共享数据。
-
使用 Go 的 race 检测器,并使用
go build -race
和go run -race
。 默认情况下,test-mr.sh
使用 race 检测器运行测试。 -
Worker 线程有时需要等待,比如在最后一个 Map 任务完成之后,Reduce 任务才可以开始。 一种可行的方法是 Worker 线程定期向 Coordinator 线程请求分配工作,并且在每次收到 Map 任务还未结束的结果后,都调用
time.Sleep()
陷入沉睡。 另一种可行的方法是 Coordinator 线程中的相关 RPC 处理程序具有一个循环,该循环可以等待time.Sleep()
或sync.Cond
。 Go 在其各自的线程中运行 RPC handler,因此一个 handler 正在等待的事实不会阻塞 Coordinator 线程处理其他 RPC 请求。 -
Coordinator 线程无法区分 Worker 线程的状态,依旧活着的但由于某种原因而无法正常运行的的 Worker 线程与正在执行但运行速度太慢而无法使用的 Worker 线程看起来并没有区别。 你能做的最好的事情就是让 Coordinator 线程等待一段时间,然后放弃并将任务重新发布给其他 Worker 线程。在本实验中,让 Coordinator 线程等待十秒钟,之后,Coordinator 线程应假定该 Worker 线程已经无法正常工作(当然也可能正在正常工作)。
-
如果你选择实施备份任务(第3.6节),请注意,我们测试了当 Worker 线程执行任务而不会崩溃时,你的代码没有安排无关紧要的任务。备份任务只能在相对较长的时间段(例如 10 秒)后安排。
-
要测试崩溃恢复,可以使用
mrapps/crash.go
应用程序插件,它在 Map 任务和 Reduce 任务中随机退出。 -
为了确保在崩溃时不会有人观察到部分写入的文件,MapReduce 论文提到了使用临时文件并在完全写入后对其进行原子重命名的技巧。你可以使用
ioutil.TempFile()
函数创建一个临时文件,并使用os.Rename()
原子地对其进行重命名。 -
test-mr.sh
运行子目录 mr-tmp 中的所有进程,因此,如果出现问题,并且想要查看中间文件或输出文件,请在这个文件夹中查找。你可以修改test-mr.sh
以在失败的测试后退出,因此脚本不会继续测试(并覆盖输出文件)。 -
test-mr-many.sh
提供了一个基本脚本,用于运行带超时的test-mr.sh
(这是我们测试你的代码的方式)。将运行测试的次数作为参数。 您不应并行运行多个test-mr.sh
实例,因为协调器将重用同一套接字,从而导致冲突。