分布式计算算法MapReduce的实现

前言

这次实验是在之前暑假的时候完成的,由于时间久远,只记得当时做了好久,一脸懵逼。

现在重新回顾了MapReduce的论文和这次实验,比起之后的实验算是不难的了。

下面是该实验的实验报告。

实验1介绍

在lab1中我们将建立一个MapReduce 库,学习如何使用 Go 建立一个容错的分布式系统。在 Part A 我们需要写一个简单的 MapReduce 程序。在 Part B我们需要实现一个 Master 为 MapReduce 的 Workers 安排任务,并处理 Workers 出现的错误。

代码执行流程

  1. 应用提供一些的输入文件,一个map函数,一个reduce函数,以及reduce任务的数量(nReduce)
  2. Master开启一个RPC服务,然后等待Workers来注册(使用maste.go/Register())。当有Task时,schedule.go/schedule()决定将这些Task指派给workers,以及如何处理Workers的错误
  3. Master将每个输入文件当作Task,然后调用common_map.go/dMap()。这个过程既可以直接进行(串行模式),也可以通过RPC来让Workers做。每次调用doMap()都会读取相应的文件,运行map函数,将key/value结果写入nReduce个中间文件之中。在map结束后总共会生成nMap * nReduce个文件。命令格式为:前缀-map编号-reduce编号(见common.go),例如,如果有两个map tasks以及三个reduce tasks,map会生成2 * 3 = 6个中间文件。 mrtmp.xxx-0-0 mrtmp.xxx-0-1 mrtmp.xxx-0-2 mrtmp.xxx-1-0 mrtmp.xxx-1-1 mrtmp.xxx-1-2 每个Worker必须能够读取其他Worker写入的文件。真正的分布式系统会使用分布式存储来实现不同机器之间的共享,在这里我们将所有的Workers运行在一台电脑上,并使用本地文件系统
  4. 此后Master会调用common_reduce.go/doReduce(),与doMap()一样,它也能直接完成或者通过工人完成。doReduce()将按照reduce task编号来汇总,生成nRduce个结果文件。例如上面的例子中按照如下分组进行汇总: // reduce task 0 mrtmp.xxx-0-0 mrtmp.xxx-1-0 // reduce task 1 mrtmp.xxx-0-1 mrtmp.xxx-1-1 // reduce task 2 mrtmp.xxx-0-2 mrtmp.xxx-1-2
  5. 此后Master调用master_splitmerge.go/mr.merge()将所有生成的文件整合成一个输出。
  6. Master发送Shutdown RPC给所有Workers,然后关闭自己的RPC服务

Part I: Map/Reduce input and output

在我们实现 Map/Reduce 之前,首先要修复一个串行实现。给出的源码缺少两个关键部分: 1. 划分 Map 输出的函数 doMap() 2. 汇总 Reduce 输入的函数 doReduce() 首先建议阅读 common.go,这里定义了会用到的数据类型,文件命名方法等

doMap() 函数

首先总结一下 Map 的过程,它对于每个 Map Task,都会进行以下操作: 1. 从某个数据文件 A.txt 中读取数据。 2. 自定义函数 mapF 对 A.txt 中的文件进行解读,形成一组 {Key, Val} 对。 3. 生成 nReduce 个子文件 A_1, A_2, …, A_nReduce。 4. 利用 {Key, Val} 中的 Key 值做哈希,将得到的值对 nReduce 取模,以此为依据将其分配到子文件之中。

func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	// 你需要重写这个函数。你可以通过reduceName获取文件名,使用map任务的输入为reduce任务提供输出。
	// 下面给出的ihash函数应该被用于决定每个key属于的文件。
	//
	// map任务的中间输入以多文件的形式保存在文件系统上,它们的文件名说明是哪个map任务产生的,同时也说明哪个reduce任务会处理它们。
	// 想出如何存储键/值对在磁盘上的方案可能会非常棘手,特别地, 当我们考虑到key和value都包含新行(newlines),引用(quotes),或者其他
	// 你想到的字符。
	//
	// 有一种格式经常被用来序列化数据到字节流,然后可以通过字节流进行重建,这种格式是json。你没有被强制使用JSON,但是reduce任务的输出
	// 必须是JSON格式,熟悉JSON数据格式会对你有所帮助。你可以使用下面的代码将数据结构以JSON字符串的形式输出。对应的解码函数在common_reduce.go
	// 可以找到。
	//
	//   enc := json.NewEncoder(file)
	//   for _, kv := ... {
	//     err := enc.Encode(&kv)
	//
	//   记得关闭文件当你写完全部的数据之后。


	// 注:Map的大致流程如下(官方教材建议不上传代码,所以去除)
	//  S1:  打开输入文件,并且读取全部数据
	//  S2: 调用用户自定义的mapF函数,分检数据,在word count的案例中分割成单词
	//  S3: 将mapF返回的数据根据key分类,跟文件名对应(reduceName获取文件名)
	//  S4:  将分类好的数据分别写入不同文件

	// 打印参数
	fmt.Printf("jobName = %s, inFile = %s, mapTaskNumber = %d, nReduce = %d\n", jobName, inFile, mapTaskNumber, nReduce);

	// 读取文件
	bytes, err := ioutil.ReadFile(inFile);
	if err != nil {
		log.Fatal("Unable to read file: ", inFile);
	}

	// 解析输入文件为键值对数组
	kv_pairs := mapF(inFile, string(bytes))

	// 生成一组encoder用来对应文件
	encoders := make([]*json.Encoder, nReduce)
	for reduceTaskNumber := 0; reduceTaskNumber < nReduce; reduceTaskNumber++ {
		// reduceName(在comman.go)返回一个特定的文件名
		filename := reduceName(jobName, mapTaskNumber, reduceTaskNumber)
		file_ptr, err := os.Create(filename)
		if err != nil {
			log.Fatal("Unable to create file: ", filename)
		}

		defer file_ptr.Close()
		encoders[reduceTaskNumber] = json.NewEncoder(file_ptr)
	}

	// 利用encoder将键值对写入对应文件
	for _, key_val := range kv_pairs {
		key := key_val.Key
		reduce_idx := ihash(key) % uint32(nReduce)
		err := encoders[reduce_idx].Encode(key_val)
		if err != nil {
			log.Fatal("Unable to write to file")
		}
	}
}

doReduce()函数

func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// 你需要完成这个函数。你可与获取到来自map任务生产的中间数据,通过reduceName获取到文件名。
	//  记住你应该编码了值到中间文件,所以你需要解码它们。如果你选择了使用JSON,你通过创建decoder读取到多个
	// 解码之后的值,直接调用Decode直到返回错误。
	//
	// 你应该将reduce输出以JSON编码的方式保存到文件,文件名通过mergeName获取。我们建议你在这里使用JSON,

	// key是中间文件里面键值,value是字符串,这个map用于存储相同键值元素的合并

	// Reduce的过程如下:
	//   S1: 获取到Map产生的文件并打开(reduceName获取文件名)
	//  S2:获取中间文件的数据(对多个map产生的文件更加值合并)
	//  S3:打开文件(mergeName获取文件名),将用于存储Reduce任务的结果
	//  S4:合并结果之后(S2),进行reduceF操作, work count的操作将结果累加,也就是word出现在这个文件中出现的次数

	// 查看参数
	fmt.Printf("Reduce: job name = %s, reduce task id = %d, nMap = %d\n", jobName, reduceTaskNumber, nMap);

	// 建立哈希表,以slice形式存储同一key的所有value
	kv_map := make(map[string]([]string))

	// 读取同一个reduce task 下的所有文件,保存至哈希表
	for mapTaskNumber := 0; mapTaskNumber < nMap; mapTaskNumber++ {
		filename := reduceName(jobName, mapTaskNumber, reduceTaskNumber)
		f, err := os.Open(filename)
		if err != nil {
			log.Fatal("Unable to read from: ", filename)
		}
		defer f.Close()

		decoder := json.NewDecoder(f)
		var kv KeyValue
		for ; decoder.More(); {
			err := decoder.Decode(&kv)
			if err != nil {
				log.Fatal("json decode failed, ", err)
			}
			kv_map[kv.Key] = append(kv_map[kv.Key], kv.Value)
		}
	}
	
	// 对哈希表所有key进行升序排序
	keys := make([]string, 0, len(kv_map))
	for k, _ := range kv_map {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// 利用自定义的reduceF函数处理同一key下所有val
	// 按照key的顺序将结果以{{key, new_val}}形式输出
	outf, err := os.Create(mergeName(jobName, reduceTaskNumber))
	if err != nil {
		log.Fatal("Unable to create file: ", mergeName(jobName, reduceTaskNumber))
	}
	defer outf.Close()
	encoder := json.NewEncoder(outf)
	for _,k := range keys {
		encoder.Encode(KeyValue{k, reduceF(k, kv_map[k])})
	}
}

运行part1.sh或直接在命令行输入:

go test -run Sequential

即可看到运行结果:

ter: Starting Map/Reduce task test
jobName = test, inFile = 824-mrinput-0.txt, mapTaskNumber = 0, nReduce = 3
Reduce: job name = test, reduce task id = 0, nMap = 1
Reduce: job name = test, reduce task id = 1, nMap = 1
Reduce: job name = test, reduce task id = 2, nMap = 1
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
master: Starting Map/Reduce task test
jobName = test, inFile = 824-mrinput-0.txt, mapTaskNumber = 0, nReduce = 3
jobName = test, inFile = 824-mrinput-1.txt, mapTaskNumber = 1, nReduce = 3
jobName = test, inFile = 824-mrinput-2.txt, mapTaskNumber = 2, nReduce = 3
jobName = test, inFile = 824-mrinput-3.txt, mapTaskNumber = 3, nReduce = 3
jobName = test, inFile = 824-mrinput-4.txt, mapTaskNumber = 4, nReduce = 3
Reduce: job name = test, reduce task id = 0, nMap = 5
Reduce: job name = test, reduce task id = 1, nMap = 5
Reduce: job name = test, reduce task id = 2, nMap = 5
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
PASS
ok  	_/root/Distributed-Systems/6.824/src/mapreduce	3.831s

Part II: Single-worker word count

既然map和reduce任务已经被连接,那么我们开始实现一些Map/Reduce操作。在这个实验中我们会实现单词计数器——一个简单经典的Map/Reduce例子。特别地,你们的任务是改写mapF和reduceF函数,这样wc.go就可以报告每个单词的数量了。一个单词是任何相邻的字母序列,正如unicode.IsLetter定义的那样。

这里有一些以pg-开头的输入文件位于~/6.824/src/main路径下面。尝试编译我们通过给你的软件,然后就使用我们提供的输入文件运行看看:

    $ cd 6.824
    $ export "GOPATH=$PWD"
    $ cd "$GOPATH/src/main"
    $ go run wc.go master sequential pg-*.txt
    # command-line-arguments
    ./wc.go:14: missing return at end of function
    ./wc.go:21: missing return at end of function

编译失败的原因是我们还没有编写完整的map函数(mapF)和reduce函数(reduceF)。在你开始阅读MapReduce论文的第二部分时,这里需要说明你的mapF函数和reduceF函数跟论文中有些区别。你的mapF函数会传递文件名和文件的内容;它将会分隔成单词,然后返回含有键值对的切片类型([]KeyValue),你的reduceF函数对于每个键都会调用一次,参数是mapF生成的切片数据,它只有一个返回值。

你可以通过下面的命令运行你的解决方案:

    $ cd "$GOPATH/src/main"
    $ go run wc.go master sequential pg-*.txt
    master: Starting Map/Reduce task wcseq
    Merge: read mrtmp.wcseq-res-0
    Merge: read mrtmp.wcseq-res-1
    Merge: read mrtmp.wcseq-res-2
    master: Map/Reduce task completed
    14.59user 3.78system 0:14.81elapsed

生产的结果会在文件mrtmp.wcseq中,通过下面的命令删除全部的中间数据:

    $ rm mrtmp.*

如果执行下面的命令出现如下内容,那么你的实现是正确的:

    $ sort -n -k2 mrtmp.wcseq | tail -10
    he: 34077
    was: 37044
    that: 37495
    I: 44502
    in: 46092
    a: 60558
    to: 74357
    of: 79727
    and: 93990
    the: 154024

为了让测试更加简单,你可以运行下面的脚本:

    $ sh ./test-wc.sh

脚本会告知你的解决方案是否正确。

Part III: Distributing MapReduce tasks

在开始这部分实验之前要先阅读master.go的代码 目前为止我们都是串行地执行任务,Map/Reduce 最大的优势就是能够自动地并行执行普通的代码,不用开发者进行额外工作。在 Part III 我们会把任务分配给一组 worker thread,在多核上并行进行。虽然我们不在多机上进行,但是会用 RPC 来模拟分布式计算。

我们需要实现 mapreduce/schedule.goschedule(),在一次任务中,Master 会调用两次 schedule(),一次用于 Map,一次用于 Reduce。schedule()将会把任务分配给 Workers,通常任务会比 Workers 数量多,因此 schedule() 会给每个 worker 一个 Task 序列,然后等待所有 Task 完成再返回。

schedule() 通过 registerChan 参数获取 Workers 信息,它会生成一个包含 Worker 的 RPC 地址的 string,有些 Worker 在调用 schedule() 之前就存在了,有的在调用的时候产生,他们都会出现在 registerChan 中。

schedule() 通过发送 Worker.DoTask RPC 调度 Worker 执行任务,可以用 mapreduce/common_rpc.go 中的 call() 函数发送。call() 的第一个参数是 Worker 的地址,可以从 registerChan 获取,第二个参数是 "Worker.DoTask" 字符串,第三个参数是 DoTaskArgs 结构体的指针,最后一个参数为 nil

这是一个比较有含金量的练习。 schedule() 函数的调用发生在 master.go 中。首先,在Distributed() 中对 schedule() 做了一个再封装,使得其只需要一个参数判断是 Map 还是 Reduce。

运行脚本测试是否正确:

./part3.sh

Part IV: Handling worker failures

在该部分中我们需要让 Master 能够处理 failed Worker。MapReduce 使这个处理相对简单,因为如果一个 Worker fails,Master 交给它的任何任务都会失败。此时 Master 需要把任务交给另一个 Worker。

一个 RPC 出错并不一定表示 Worker 没有执行任务,有可能只是 reply 丢失了,或是 Master 的 RPC 超时了。因此,有可能两个 Worker 都完成了同一个任务。同样的任务会生成同样的结果,所以这样并不会引发什么问题。并且,在该 lab 中,每个 Task 都是序列执行的,这就保证了结果的整体性。

运行脚本测试是否正确:

./part4.sh

Part V: Inverted index generation (optional)

Word count是一个典型的Map/Reduce应用,但是它不是一个大规模消费者使用Map/Reduce的例子。它非常简单,现实中很少需要你从一个大的数据集里面统计单词的数量。在这个具有挑战性的练习中,我们代替你已经建立用于生成反向索引的Map和Reduce函数。

反向索引在计算机科学领域被广泛使用,尤其在文档搜索方面。广义地说,一个反向索引就是map,存在底层数据的一些有趣的事实,说明数据的原始位置。例如,在文本搜索的时候,反向索引也许是这样的map,它包含了两个单词从关键字指向文档.

我们将会建立第二个可执行程序在main/ii.go,跟你之前的在part2建立的wc.go非常类似。你应该修改main/ii.go文件中的mapF和reduceF函数,那么它们就可与共同产生反向索引。运行ii.go将会输出一些元组,每一行都类似下面这种格式:

<单词>: <文件个数> <排序后的文件名列表>

相比 PartII 中的词频统计,思想是类似的。 首先在mapF里对文件的内容分词,维护一个哈希表,Key 是单词,Value 是文件名。这样可以去掉重复的词。最后再把哈希表转为输出要求的格式。

mapF返回的数组会经过哈希处理分配到 nReduce 个文件中。doReduce则会将隶属于同样的reduce_task_id的文件的 {key, val} 整合为 {key, []val}。在本例中就相当于 {单词,[]文件名}。 在reduceF里主要是对每个 key 下的所有 value 进行处理。注意,它并不负责输出,只负责返回一个 newVal,最后由doReduce统一以 {key, newVal} 形式输出。因此我们只需要把 []文件名 进行排序后,按要求格式转为一个 string 即可。

运行脚本测试是否正确:

./test-ii.sh

最终测试

测试实验一的五个部分:

./test-mr.sh

总结

MapReduce是处理大数据的重要方法,论文还有很多实验没有覆盖到的地方,还需要多看论文。