您的位置:新葡亰496net > 电脑系统 > 分布式系统,分布式系统学习1

分布式系统,分布式系统学习1

发布时间:2019-11-16 11:27编辑:电脑系统浏览(77)

    前言

    Mit6.824 是自身在求学一些布满式系统方面包车型地铁文化的时候不经常看见的,然后就最早尝试跟课。必须要说,外国的课程难度是真的大,一周的小运还是要学一门 Go 语言,然后还要读随想,进而做MapReduce 实验。
    是因为 M纳瓦拉(MapReduce卡塔 尔(英语:State of Qatar) 框架要求建设构造在 DFS(Distributed File System卡塔 尔(阿拉伯语:قطر‎的根底上贯彻,所以本实验是透过运用多线程来模拟布满式情状。尽管难度上海高校大减弱,不过通过该实验,依然会让我们对 M汉兰达 的着力原理有三个比较深厚的认知。
    做尝试早先我们须要先把精粹的 MapReduce 杂谈给看了,窝相比较提议直接看丹麦语原稿,但风姿罗曼蒂克旦时光不丰盛的话,能够直接在英特网找普通话的翻译版。
    刚带头做那一个实验的时候确实是庸庸碌碌,完全不亮堂怎么入手。后来发掘这几个工程有贰个自动化测量试验文件(test_test.go),每部分实验都会选拔这些测量检验文件里的函数对代码进行测量试验。大家要是本着那个测验函数稳步倒推,然后补全代码就能够。

    介绍

    通过 遍及式系统种类作品,大家询问了布满式的意气风发部分基本概念,尽管写点代码实践一下,这就越来越好了。先做个简易的试验练练手,还记得 MapReduce 吗?,此番试验中会营造一个 MapReduce 库,首要能熟稔 Go 语言外加精晓遍布式系统中的容错机制。首先写个贰个回顾的 MapReduce 程序,再写三个 Master,它不止能分配职务给 worker 何况能处理 worker 实践错误。接口参照他事他说加以考查故事集描述。

    lab1是在单机上完毕mapreduce库,因为从没分布式情形,所以只能促成体系化操作和用并行操作代替分布式操作。

    简介


    在 lab1 中大家将确立八个 MapReduce 库,学习如何运用 Go 创建二个容错的分布式系统。在 Part A 大家必要写三个差相当少的 MapReduce 程序。在 Part B大家需求得以完成多个 Master 为 MapReduce 的 Workers 布置义务,并管理 Workers 现身的失实。其原理能够参见那篇诗歌。

    MIT6.824 2017课程作业的lab1,使用go语言完成mapreduce。框架代码来自 git://g.csail.mit.edu/6.824-golabs-2017,完整兑今世码见https://github.com/shishujuan/mit6.824-2017-mapreduce

    Part I: Map/Reduce input and output

    先是片段是先实现二个顺序版(sequential卡塔 尔(英语:State of Qatar)的M冠道,让大家对 MOdyssey的流水生产线有三个概略的认知,而且实现doMap()doReduce() 多个函数。
    其包罗四个测验函数TestSequentialSingle()TestSequentialMany()

    实行情况

    不会让您从零在此之前撸代码啦,还相当的慢 git clone ?

    $ git clone git://g.csail.mit.edu/6.824-golabs-2016 6.824
    $ cd 6.824
    $ ls
    Makefile src
    

    MapReduce 代码辅助顺序实行和布满式实施。顺序实施代表 Map 先实践,当全体 Map 职责都成功了再实践Reduce,这种形式可能功效相当的低,但是正如便利调节和测量检验,毕竟串行。布满式履行运维了众多 worker 线程,他们并行施行 Map 任务,然后施行 Reduce 任务,这种情势作用更加高,当然更难落实和调节和测量试验。

    先是看一下流程,主函数在src/main/wc.go里,本人提供的map和reduce函数,此番做的要害是wordcount,所以map和reduce函数为:

    序言


    mapreduce 包提供了叁个简约的 Map/Reduce 库,应用程序平日调用 master.go/Distributed() 来开头黄金时代项任务(并行卡塔尔,同样也能调用 master.go/Sequential() 来串行施行,方便debug。

    代码的实践流程为:

    1. 应用提供一花样许多的输入文件,七个 map 函数,三个 reduce 函数,以至reduce 职分的数额(nReduce)。
    2. Master 开启多少个 RPC(Remote Procedure Call) 服务,然后等待 Workers 来注册(使用 master.go/Register())。当有 Task 时,schedule.go/schedule() 决定哪些将这几个 Task 指使给 Workers,甚至哪些管理 Workers 的荒谬。
    3. Master 将各种输入文件都当作 Task,然后调用 common_map.go/doMap()。那几个进度不仅可以够平昔开展(串行形式卡塔尔,也可以经过 RPC 来让 Workers 做。 每一次调用 doMap() 都会读取相应的公文,运行map 函数,将 key/value 结果写入 nReduce 此中间文件之中。在 map 甘休后累积会生成 nMap * nReduce 个文件。命名格式为:
      前缀-map编号-reduce编号
      例如,如果有 2 个 map tasks 以及 3 个 reduce tasks,map 回生成 2*3 = 6 此中等文件。
    mrtmp.xxx-0-0
    mrtmp.xxx-0-1
    mrtmp.xxx-0-2
    mrtmp.xxx-1-0
    mrtmp.xxx-1-1
    mrtmp.xxx-1-2
    

    每一种 Worker 必需能够读取其余 Worker 写入的公文(消息分享卡塔尔国。真正的布满式系统会使用分布式存款和储蓄来兑现分化机器之间的分享,在这地我们将具有的 Workers 运维在 大器晚成台Computer上,并动用当麻芋果件系统。

    1. 此后 Master 会调用 common_reduce.go/doReduce(),与 doMap() 同样,它也能平素完结只怕通过工人成功。doReduce() 将如约 reduce task 编号来集中,生成 nReduce 个结果文件。举个例子地点的事例中根据如下分组举行汇总:
      // reduce task 0
      mrtmp.xxx-0-0
      mrtmp.xxx-1-0
      // reduce task 1
      mrtmp.xxx-0-1
      mrtmp.xxx-1-1
      // reduce task 2
      mrtmp.xxx-0-2
      mrtmp.xxx-1-2
    
    1. 此后 Master 调用 master_splitmerge.go/mr.merge() 将全数变化的文本整合为三个出口。
    2. Master 发送 Shutdown RPC 给全部 Workers,然后关门本人的 RPC 服务。

    1 幼功概念

    map 负担分发。各个map职责平日管理二个文本,有稍许个输入文件就有微微个map职分。而输出则基于reduce的数据分明有稍许个出口。注意,雷同的key肯定是出口到同二个reduce文件中,通过哈希算法来确认三个key应该分发到哪些中间文件。targetReduct = ihash(key) % nReduce。
    总的中间文件数为 nMap * nReduce

    reduce负担对map输出的公文实行拍卖,各样reduce处理自个儿担当的这么些中间文件。负担管理的高中级文件名能够由map个数和reduce number来确认。

    TestSequentialSingle()

    各样map worker管理叁个文书,所以map worker的多少就相当文件的多寡。
    测量检验单个map worker 和 reduce worker。

    func TestSequentialSingle(t *testing.T) {
        mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
        mr.Wait()
        check(t, mr.files)
        checkWorker(t, mr.stats)
        cleanup(mr)
    }
    

    预备:熟识代码

    mapreduce 包提供了叁个简易的 MapReduce 顺序试行落到实处。应用只要调用 Distributed() 方法就足以运转一个任务,可是要调治的时候大概须要调用 Sequential().

    mapreduce 的运作流程如下:

    1. 应用层须要提供输入文件,叁个 map 函数,三个 reduce 函数,要运转reduce 职分的数目。

    2. 用这一个参数创制叁个 master。它会运营二个 RPC 服务器(master_rpc.go),然后等待 worker 注册(Register())。当有待变成的义务时,schedule() 就能够将任务分配给 worker,同一时候也博览会开 worker 的错误管理。

    3. master 感觉每种输入文件应当交付一个 map 职责管理,然后调用 doMap(),无论直接调用 Sequential() 照旧经过 RPC 给 worker 发送 DoTask 新闻都会触发那一个操作。每当调用 doMap() 时,它都会去读取相应的文书,以文件内容调用 map 函数并且为每种输入文件发出 nReduce 个文本。由此,各种 map 任务最终会发出 #files x nReduce 个文件。

    4. master 接下来会对各种 reduce 职务起码调用一回 doReduce()doReduce() 首先会收罗 nReduce 个 map 职分发生的文件,然后在每一个文件上施行 reduce 函数,最后发生一个结实文件。

    5. master 会调用 mr.merge() 方法将上一步发生负有结果文件聚合到贰个文本中。

    故此这一次实验就是到填空题,空是:doMap, doReduce,schedule 和 reduce。

    别的的格局基本无需改造,有时光的钻研切磋推动明白全体架构。

    func mapF(filename string, contents string) []mapreduce.KeyValue {
        // Your code here (Part II).
        f := func(c rune) bool{
            return !unicode.IsLetter(c)
        }
        s := strings.FieldsFunc(contents, f)
        kv := make([]mapreduce.KeyValue, 0)
        for _, k := range s{
            kv = append(kv, mapreduce.KeyValue{k,"1"})
        }
        return kv
    }
    
    func reduceF(key string, values []string) string {
        // Your code here (Part II).
        count := 0
        for _, v := range values {
            vv, _ := strconv.Atoi(v)
            count = count   vv
        }
        return strconv.Itoa(count)
    }
    

    Part I: Map/Reduce input and output


    在大家得以完结 Map/Reduce 在此之前,首先要修复三个串行实现。给出的源码缺少五个基本点部分:

    1. 分割 Map 输出的函数 doMap()
    2. 聚集 Reduce 输入的函数 doReduce()

    率先建议阅读 common.go,这里定义了会用到的数据类型,文件命名方式等。

    2 代码实现

    原始代码有多少个目录:main和mapreduce。在那之中main目录下只必要关爱wc.go,ii.go以及一系列的txt文件。 mapreduce目录则需求改良schedule.gocommon_map.gocommon_reduce.go文件。

    TestSequentialMany()

    此测量检验函数测验多个 map worker 和多少个 reduce worker。
    其运行逻辑和TestSequentialSingle类似。

    func TestSequentialMany(t *testing.T) {
        mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
        mr.Wait()
        check(t, mr.files)
        checkWorker(t, mr.stats)
        cleanup(mr)
    }
    

    Part I: Map/Reduce 输入和输出

    率先个空 doMap() 函数的作用是读取钦命文件的剧情,实施 mapF 函数,将结果保存在新的文书中;而 doReuce() 读取 doMap 的输出文件,施行 reduceF 函数,将结果存在磁盘中。

    写完了就测验测量检验,测量检验文件(test_test.go)已经写好了。串行格局测量检验可进行:

    $ cd 6.824
    $ export "GOPATH=$PWD"  
    $ cd "$GOPATH/src/mapreduce"
    $ setup ggo_v1.5
    $ go test -run Sequential mapreduce/...
    ok      mapreduce   2.694s
    

    比如您看看的不是 ok,表达还或许有 bug 哦。在 common.go 将 debugEnbale 设置成 true,然后运营 go test -run Sequential mapreduce/... -v,能够观察更详实的出口:

    $ env "GOPATH=$PWD/../../" go test -v -run Sequential mapreduce/...
    === RUN   TestSequentialSingle
    master: Starting Map/Reduce task test
    Merge: read mrtmp.test-res-0
    master: Map/Reduce task completed
    --- PASS: TestSequentialSingle (1.34s)
    === RUN   TestSequentialMany
    master: Starting Map/Reduce task test
    Merge: read mrtmp.test-res-0
    Merge: read mrtmp.test-res-1
    Merge: read mrtmp.test-res-2
    master: Map/Reduce task completed
    --- PASS: TestSequentialMany (1.33s)
    PASS
    ok      mapreduce   2.672s
    

    下一场有三种测量试验的点子,风姿洒脱种是Sequential,此外种是Distributed,首先去落到实处Sequential的章程吗。那样能够测量检验你有个别效应函数达成对不对。

    doMap() 函数

    率先总括一下 Map 的长河,它对于每种 Map Task,都会实行以下操作:

    1. 从某些数据文件 A.txt 中读取数据。
    2. 自定义函数 mapF 对 A.txt 中的文件举行解读,变成风姿浪漫组 {Key, Val} 对。
    3. 生成 nReduce 个子文件 A_1, A_2, ..., A_nReduce。
    4. 动用 {Key, Val} 中的 Key 值做哈希,将收获的值对 nReduce 取模,以此为凭仗将其分配到子文件之中。
    func doMap(
        jobName string, // the name of the MapReduce job
        mapTaskNumber int, // which map task this is
        inFile string,
        nReduce int, // the number of reduce task that will be run ("R" in the paper)
        mapF func(file string, contents string) []KeyValue,
    ) {
        // 查看参数
        fmt.Printf("Map: job name = %s, input file = %s, map task id = %d, nReduce = %dn",
             jobName, inFile, mapTaskNumber, nReduce);
    
        // 读取输入文件
        bytes, err := ioutil.ReadFile(inFile)
        if (err != nil) {
            // log.Fatal() 打印输出并调用 exit(1)
            log.Fatal("Unable to read file: ", inFile)
        }
    
        // 解析输入文件为 {key,val} 数组
        kv_pairs := mapF(inFile, string(bytes))
    
        // 生成一组 encoder 用来将 {key,val} 保存至对应文件
        encoders := make([]*json.Encoder, nReduce);
        for reduceTaskNumber := 0; reduceTaskNumber < nReduce; reduceTaskNumber   {
            filename := reduceName(jobName, mapTaskNumber, reduceTaskNumber)
            file_ptr, err := os.Create(filename)
            if (err != nil) {
                log.Fatal("Unable to create file: ", filename)
            }
            // defer 后不能用括号
            defer file_ptr.Close()
            encoders[reduceTaskNumber] = json.NewEncoder(file_ptr);
        }
    
        // 利用 encoder 将 {key,val} 写入对应的文件
        for _, key_val := range kv_pairs {
            key := key_val.Key
            reduce_idx := ihash(key) % nReduce
            err := encoders[reduce_idx].Encode(key_val)
            if (err != nil) {
                log.Fatal("Unable to write to file")
            }
        }
    }
    

    Part 1, Part 2

    那多少个实验都比较容易,单机的。Part 1完结common_map.go和common_reduce.go中的通用函数,而Part 2也只是总计单词数目。Part 2这里要在意的是,要动用实验文档中说的撤并方法unicode.IsLetter()去分割单词,不然测量试验会不可能通过。

    Sequential()

    测量试验函数将工作名称,测试文件,reduce 的数量,用户定义的 map 函数,reduce 函数三个实参传递给Sequential()

    // Sequential runs map and reduce tasks sequentially, waiting for each task to
    // complete before running the next.
    func Sequential(jobName string, files []string, nreduce int,
        mapF func(string, string) []KeyValue,
        reduceF func(string, []string) string,
    ) (mr *Master) {
        mr = newMaster("master")
        go mr.run(jobName, files, nreduce, func(phase jobPhase) {
            switch phase {
            case mapPhase:
                for i, f := range mr.files {
                    doMap(mr.jobName, i, f, mr.nReduce, mapF)
                }
            case reducePhase:
                for i := 0; i < mr.nReduce; i   {
                    doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
                }
            }
        }, func() {
            mr.stats = []int{len(files)   nreduce}
        })
        return
    }
    

    Sequential()率先得到三个Master 对象的指针,然后选用函数闭包运维Master.run()

    Part II: 单机词频计算

    造成了第后生可畏都部队分,我们得以初阶构建筑协会调率先个 MapReduce 系统:词频总结器。对的依旧填空题:mapF 和 reduceF,让 wc.go 能够总计出每个单词现身的次数。大家的测验文件之中只有朝鲜语,所以叁个单词正是三回九转现身字母,推断八个字母参考规范库 unicode.IsLetter

    测量试验文件是 6.824/src/main/pg-*.txt,不要紧先编写翻译试试:

    $ cd 6.824
    $ export "GOPATH=$PWD"
    $ cd "$GOPATH/src/main"
    $ go run wc.go master sequential pg-*.txt
    # command-line-arguments
    ./wc.go:14: missing return at end of function
    ./wc.go:21: missing return at end of function
    

    自然通过持续,终究空还未有填呢。mapF 的参数是测验文件名和其情节,分割成单词,重临 []mapreduce.KeyValue,KeyValue:单词-频次。轮到 reduceF 函数了,它会指向每一个 key(单词) 调用三次,参数是某些单词以至那些单词在具备测量检验文件中的 mapF 结果。

    写好了,便可测量检验:

    $ cd "$GOPATH/src/main"
    $ time go run wc.go master sequential pg-*.txt
    master: Starting Map/Reduce task wcseq
    Merge: read mrtmp.wcseq-res-0
    Merge: read mrtmp.wcseq-res-1
    Merge: read mrtmp.wcseq-res-2
    master: Map/Reduce task completed
    14.59user 3.78system 0:14.81elapsed
    

    最后的结果保存在 mrtmp.wcseq 文件中。运维 $ rm mrtmp.* 删除全数的中等数据文件。

    运行 sort -n -k2 mrtmp.wcseq | tail -10,如若看到的和下部的生龙活虎致,表明你写对了。

    $ 
    he: 34077
    was: 37044
    that: 37495
    I: 44502
    in: 46092
    a: 60558
    to: 74357
    of: 79727
    and: 93990
    the: 154024
    

    能够间接运转 $sh ./test-wc.sh

    小提示: strings.FieldFunc 能够将叁个 string 分割成四个部分,strconv 包中有函数可将 string 转换到 int。


    doReduce() 函数
    func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTaskNumber int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
    ) {
        // 查看参数
        fmt.Printf("Reduce: job name = %s, output file = %s, reduce task id = %d, nMap = %dn",
             jobName, outFile, reduceTaskNumber, nMap);
    
        // 建立哈希表,以 slice 形式存储同一 key 的所有 value
        kv_map := make(map[string]([]string))
    
        // 读取同一个 reduce task 下的所有文件,保存至哈希表
        for mapTaskNumber := 0; mapTaskNumber < nMap; mapTaskNumber   {
            filename := reduceName(jobName, mapTaskNumber, reduceTaskNumber)
            f, err := os.Open(filename)
            if (err != nil) {
                log.Fatal("Unable to read from: ", filename)
            }
            defer f.Close()
    
            decoder := json.NewDecoder(f)
            var kv KeyValue
            for ; decoder.More(); {
                err := decoder.Decode(&kv)
                if (err != nil) {
                    log.Fatal("Json decode failed, ", err)
                }
                kv_map[kv.Key] = append(kv_map[kv.Key], kv.Value)
            }
        }
    
        // 对哈希表所有 key 进行升序排序
        keys := make([]string, 0,len(kv_map))
        for k,_ := range kv_map {
            keys = append(keys, k)
        }
        sort.Strings(keys)
    
        // 利用自定义的 reduceF 函数处理同一 key 下的所有 val
        // 按照 key 的顺序将结果以 {key, new_val} 形式输出
        outf, err := os.Create(outFile)
        if (err != nil) {
            log.Fatal("Unable to create file: ", outFile)
        }
        defer outf.Close()
        encoder := json.NewEncoder(outf)
        for _,k := range keys {
            encoder.Encode(KeyValue{k, reduceF(k, kv_map[k])})
        }
    }
    

    最后 go test -run Sequential 通过。

    Part 3

    选拔的布满式实行任务。先是master运转rpc服务,并调用schedule函数推行Task。注意的是,Worker运行后供给调用master的RPC方法Register注册到master,而master发掘成了新的worker,会通报等待channel的协程实行职分操作,同三个worker须求处理三个职务,不过无法同期给一个worker分配四个职责。

    小心在run中,先做map义务,做完map再做reduce。所以在多少个worker做map职责的时候,须要等有着的map任务完毕再持续reduce职责。这里用了 WaitGroup

    除此以外,分配worker的算法要静心,是历次有新的也许会分配到,而老的worker若是执行完了叁次职务,则也要放回channel中以重复使用。

    Master.run()

    // run executes a mapreduce job on the given number of mappers and reducers.
    //
    // First, it divides up the input file among the given number of mappers, and
    // schedules each task on workers as they become available. Each map task bins
    // its output in a number of bins equal to the given number of reduce tasks.
    // Once all the mappers have finished, workers are assigned reduce tasks.
    //
    // When all tasks have been completed, the reducer outputs are merged,
    // statistics are collected, and the master is shut down.
    //
    // Note that this implementation assumes a shared file system.
    func (mr *Master) run(jobName string, files []string, nreduce int,
        schedule func(phase jobPhase),
        finish func(),
    ) {
        mr.jobName = jobName
        mr.files = files
        mr.nReduce = nreduce
    
        fmt.Printf("%s: Starting Map/Reduce task %sn", mr.address, mr.jobName)
    
        schedule(mapPhase)
        schedule(reducePhase)
        finish()
        mr.merge()
    
        fmt.Printf("%s: Map/Reduce task completedn", mr.address)
    
        mr.doneChannel <- true
    }
    

    Part III: 分布式 MapReduce

    MapReduce 让开辟者最爽的地点是不要求关注代码是在多台机械并行实践的。但我们以往的达成是 master 把 map 和 reduce 职务叁个一个实行。即使这种完结格局概念上十分轻便,可是质量并非超高。接下来大家来促成二个涌出的 MapReduce,它会调用七个 worker 线程去试行职分,这样可以更加好地动用多核CPU。当然我们的实行不是真署在多台机器上而是用 channel 去模拟布满式总结。

    由于是出新,所以供给调整者 master 线程,它担负给 worker 分发职责,并且一向等候直到全体 worker 完毕职务。为了让大家的尝试越发一步一个脚踏过的痕迹,master 只好通过 RPC 的不二秘技与 worker 通信。worker 代码(mapreduce/worker.go)已经筹划好了,它用于运行 worker。

    下多个空是 schedule.go 中的 schedule(),那么些办法担当给 worker 分发 map 和 reduce 任务,当有着义务到位后回来。

    master.go 中的 run() 方法会先调用 schedule(),然后调用 merge() 把各样 reduce 职务的出口文件整合到七个文书之中。schedule 只要求报告 worker 输入文件的名字 (mr.files[task]) 和任务 task,worker 本身领悟从何地读取也晓得把结果写到哪个文件之中。master 通过 RPC 调用 Worker.DoTask 通告 worker 开首新职分,同有时候还有恐怕会在 RPC 参数中包蕴八个 DoTaskArgs 对象。

    当一个 worker 酌量完成能够干活时,它会向 master 发送贰个 Register RPC,注册的还要还恐怕会把那些 worker 的连带音讯归入 mr.registerChannel。所以 schedule 应该经过读取那几个 channel 管理新 worker 的登记。

    一时正在运作的 job 音信都在 Master 中定义。注意,master 无需理解 Map 或 Reduce 具体施行的是怎样代码;当三个 worker 被 wc.go 成立时就曾经指引了 Map 和 Reduce 函数的消息。

    运行 $ go test -run TestBasic mapreduce/... 可进展底工测验。

    小提醒: master 应该相互的殡葬 RPC 给 worker,那样 worker 能够并发实施职分。可参照 Go RPC 文书档案。

    小指示: master 应该等一个 worker 完成当前职分后马上为它分配二个新职分。等待 master 响应的线程能够用 channel 作为联合工具。Concurrency in Go 有详尽的 channel 用法。

    小提醒: 追踪 bug 最简便易行的议程正是在代码加入 debug(),然后实践 go test -run TestBasic mapreduce/... > out,out 就能够含有调节和测验音讯。最关键的思维你原认为的输出和实在的输出为啥不相似。

    注:当前的代码试运作在一个 Unix 过程中,何况它亦可采纳后生可畏台机器的多核。假使是要安插在多台机器上,则要校订代码让 worker 通过 TCP 并不是 Unix-domain sockets 通信。其它还亟需三个网络文件系统分享存款和储蓄。

    Sequential

    func Sequential(jobName string, files []string, nreduce int,
        mapF func(string, string) []KeyValue,
        reduceF func(string, []string) string,
    ) (mr *Master) {
        mr = newMaster("master")
        go mr.run(jobName, files, nreduce, func(phase jobPhase) {
            switch phase {
            case mapPhase:
                for i, f := range mr.files {
                    doMap(mr.jobName, i, f, mr.nReduce, mapF)
                }
            case reducePhase:
                for i := 0; i < mr.nReduce; i   {
                    doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
                }
            }
        }, func() {
            mr.stats = []int{len(files)   nreduce}
        })
        return
    }
    

    main函数调用Sequential落成,在src/mapreduce/master.go:60,传的参数值有jobName,files是输入文件,nreduce是reduce的输入文件个数,map和reduce的函数。

    首先创制叁个master,然后开贰个线程运营run函数,run的第1个参数是二个函数,定义为当phase是mapPhase时调用doMap,是reducePhase时调用doReduce,第四个参数是finish函数。

    func (mr *Master) run(jobName string, files []string, nreduce int,
        schedule func(phase jobPhase),
        finish func(),
    ) {
        mr.jobName = jobName
        mr.files = files
        mr.nReduce = nreduce
    
        fmt.Printf("%s: Starting Map/Reduce task %sn", mr.address, mr.jobName)
    
        schedule(mapPhase)
        schedule(reducePhase)
        finish()
        mr.merge()
    
        fmt.Printf("%s: Map/Reduce task completedn", mr.address)
    
        mr.doneChannel <- true
    }
    

    可以看来run函数定义了西子行mapPhase,再实践reducePhase,schedule函数为Sequential里定义的串行试行。最后merge reduce task的出口文件。mr.doneChannel中输入true值(main函数里调用的mr.Wait()供给mr.doneChannel中有值,不然会窒碍等待卡塔尔

    下一场大家去看一下doMap和doReduce的得以达成。

    func doMap(
        jobName string, // the name of the MapReduce job
        mapTask int, // which map task this is
        inFile string,
        nReduce int, // the number of reduce task that will be run ("R" in the paper)
        mapF func(filename string, contents string) []KeyValue,
    ) {
        data, _ := ioutil.ReadFile(inFile)
        mapkv := mapF(inFile, string(data))
        f := make([]*os.File, nReduce)
        for i := 0; i < nReduce; i  {
            filename := reduceName(jobName, mapTask, i)
            f[i], _ = os.OpenFile(filename, os.O_RDONLY|os.O_WRONLY|os.O_CREATE, 0666)
            defer f[i].Close()
        }
        for _, kv := range mapkv{
            r := ihash(kv.Key) % nReduce
            enc := json.NewEncoder(f[r])
            enc.Encode(&kv)
        }
    }
    

    doMap的输入是二个文书,输出应该是nReduce此中等文件,先试行mapF,得到一个kv结构的数组。之后根本是内需调用读写文件的api,创制nReduce此中等文件,何况打开它们的描述符,用defer最后关闭。然后把kv数组的各类key做hash之后用json编码后存入对应文件呈报符。

    type ByKey []KeyValue
    func(a ByKey) Len() int {return len(a)}
    func(a ByKey) Swap(i, j int) {a[i], a[j] = a[j], a[i]}
    func(a ByKey) Less(i, j int) bool {return a[i].Key < a[j].Key}
    func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTask int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
    ) {
        kvslice := make([]KeyValue, 0)
        for i := 0; i < nMap; i  {
            filename := reduceName(jobName, i, reduceTask)
            f, _ := os.OpenFile(filename, os.O_RDONLY, 0666)
            defer f.Close()
            dec := json.NewDecoder(f)
            var kv KeyValue
            for{ 
                err := dec.Decode(&kv)
                if err != nil {
                    break
                }else{
                    kvslice = append(kvslice, KeyValue(kv))
                }
            }
        }
        sort.Sort(ByKey(kvslice))
        lenkv := len(kvslice)
        value := make([]string, 0)
        ff, _ := os.OpenFile(outFile, os.O_CREATE|os.O_RDONLY|os.O_WRONLY,0666)
        defer ff.Close()
        enc := json.NewEncoder(ff)
        for i := 0; i < lenkv; i  {
            if i != 0 && kvslice[i].Key != kvslice[i-1].Key{
                s := reduceF(kvslice[i-1].Key, value)
                enc.Encode(&KeyValue{kvslice[i-1].Key, s})
                value = make([]string, 0)
            }
            value = append(value, kvslice[i].Value)
        }
        if len(value) != 0{
            s := reduceF(kvslice[lenkv-1].Key, value)
            enc.Encode(KeyValue{kvslice[lenkv-1].Key, s})
        }
    }
    

    doReduce的输入相应是对应每一种reduce职责,分别从nMap个map任务中读取对应的中间文件来实行reduceF。所以对应各样reduce task,首先须要得到nMap此中间文件的name,然后读收取来,用json格式解码,把各样kv结构解码出来存入kvslice中,然后调用结构体排序,结构体排序要求定义多少个准则,如上Len,Swap,Less,就能够,然后将排好序的kvslice,对于每多少个key值的list,输入到reduceF中总计拿到三个值,然后写入尾声输出文件。


    Part II: Single-worker word count


    今昔大家将促成二个简练的 Map/Reduce 案例:词频总括。在 main/wc.go 中有亟待落到实处的 mapF() 以及 reduceF() 方法。我们供给做到总结输入文件中各类单词现身频率的功用。
    我们要求将文件名和文书的剧情作为参数字传送递给 mapF(),它会把内容分为叁个个单词,并回到 {Key, Value} 的 slice。当中,Key 便是单词。reduceF() 会管理一个 Key 下的持有 Value,然后回来单词现身总的数量。

    func mapF(filename string, contents string) []mapreduce.KeyValue {
        // TODO: you have to write this function
        // words := strings.Fields(contents)
        f := func(c rune) bool {
            return !unicode.IsLetter(c)
        }
        words := strings.FieldsFunc(contents, f)
        kv_slice := make([]mapreduce.KeyValue, 0, len(words))
        for _,word := range words {
            kv_slice = append(kv_slice, mapreduce.KeyValue{word, "1"})
        }
        return kv_slice
    }
    
    func reduceF(key string, values []string) string {
        // TODO: you also have to write this function
        var sum int
        for _,str := range values {
            i, err := strconv.Atoi(str)
            if (err != nil) {
                log.Fatal("Unable to convert ", str, " to int")
            }
            sum  = i
        }
        return strconv.Itoa(sum)
    }
    

    贯彻较为简单,可是作者犯了三个最少错误,即用strings.Fields() 来分割字符串。实际上,借使一个句子是:

    "Where is he? Anyone knows?"

    生机勃勃旦只用空格分开,本来是 "he" 就改为了 "he?",引致总结出错。

    Part 4

    worker战败的情形下的试验,也比较轻松,在各种职责推行时加个for循环,即使成功则脱离,否则重新取worker实践职务。

    doMap()

    doMap()doReduce()是索要大家去贯彻的函数。
    doMap()的贯彻入眼是将客商定义的MapFunc()切割的文本,通过 hash 分到 'nReduce'个切条中去。

    func doMap(
        jobName string, // the name of the MapReduce job
        mapTaskNumber int, // which map task this is
        inFile string,
        nReduce int, // the number of reduce task that will be run ("R" in the paper)
        mapF func(file string, contents string) []KeyValue,
    ) {
        // read contents from 'infile'
        dat,err := ioutil.ReadFile(inFile)
        if err != nil {
            log.Fatal("doMap: readFile ", err)
        }
    
        //transfer data into ‘kvSlice’ according to the mapF()
        kvSlice := mapF(inFile, string(dat))
    
        //divide the ‘kvSlice’ into 'reduceKv' according to the ihash()
        var reduceKv [][]KeyValue // temporary variable which will be written into reduce files
        for i:=0;i<nReduce;i   {
            s1 := make([]KeyValue,0)
            reduceKv = append(reduceKv, s1)
        }
        for _,kv := range kvSlice{
            hash := ihash(kv.Key) % nReduce
            reduceKv[hash] = append(reduceKv[hash],kv)
        }
    
        //write 'reduceKv' into ‘nReduce’ JSON files
        for i := 0;i<nReduce;i   {
            file,err := os.Create(reduceName(jobName,mapTaskNumber,i))
            if err != nil {
                log.Fatal("doMap: create ", err)
            }
    
            enc := json.NewEncoder(file)
            for _, kv := range reduceKv[i]{
                err := enc.Encode(&kv)
                if err != nil {
                    log.Fatal("doMap: json encodem ", err)
                }
            }
    
            file.Close()
    
        }
    }
    

    Part IV: 管理 worker 实施错误

    本小节要让您的 master 能够处理职务施行倒闭的 worker。由于 MapReduce 中 worker 并未悠久状态,所以拍卖起来相对轻易。倘若三个 worker 试行破产了,master 向 worker 发送的其余四个 RPC 都或然停业,比如超时。因而,要是战败,master 应该把那个任务支使给另为三个worker。

    二个 RPC 退步并不一定代表 worker 战败,有望是有些 worker 不荒谬运转但 master 不或然得到到它的音讯。所以可能会出四个 worker 同时实施同叁个任务。可是因为各样职分都以幂等的,三个职责被试行一遍是没啥影响。

    大家要是它不会倒闭,所以没有必要管理 master 失利的状态。让 master 能够容错是相持费力的,因为它保持着持有始有终的情景,当它退步后大家需求还原它之处以管教它能够三番三遍专门的学问。

    test_test.go 还剩最后七个测量试验。测有叁个 worker 失利的情况和有那多少个worker 败北的意况。运营可测量检验:$ go test -run Failure mapreduce/...

    Distributed

    Sequential实施非常轻松,上面就是贯彻Distributed了,布满式的略为复杂性一些,首假如贯彻schedule那一个函数,让我们来看一下流水生产线吧。

    func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
        mr = newMaster(master)
        mr.startRPCServer()
        go mr.run(jobName, files, nreduce,
            func(phase jobPhase) {
                ch := make(chan string)
                go mr.forwardRegistrations(ch)
                schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
            },
            func() {
                mr.stats = mr.killWorkers()
                mr.stopRPCServer()
            })
        return
    }
    

    先是调用startRPCServer()开启rpc服务器

    func (mr *Master) startRPCServer() {
        rpcs := rpc.NewServer()
        rpcs.Register(mr)
        os.Remove(mr.address) // only needed for "unix"
        l, e := net.Listen("unix", mr.address)
        if e != nil {
            log.Fatal("RegstrationServer", mr.address, " error: ", e)
        }
        mr.l = l
    
        // now that we are listening on the master address, can fork off
        // accepting connections to another thread.
        go func() {
        loop:
            for {
                select {
                case <-mr.shutdown:
                    break loop
                default:
                }
                conn, err := mr.l.Accept()
                if err == nil {
                    go func() {
                        rpcs.ServeConn(conn)
                        conn.Close()
                    }()
                } else {
                    debug("RegistrationServer: accept error", err)
                    break
                }
            }
            debug("RegistrationServer: donen")
        }()
    }
    

    挂号二个rpc服务器,然后监听三个地方,因为这么些是在单机上落到实处的竞相,所以类型是unix,地址是本地地址。然后开一个线程不停的巡回监听,收到必要就ServeConn(conn)创立连接。那样地点就有个线程一向在监听伏乞。

    进而Distributed也开八个线程运转run函数,此次的第两个参数schedule创制了三个channel,然后打开了多少个线程去执forwardRegistrations

    func (mr *Master) forwardRegistrations(ch chan string) {
        i := 0
        for {
            mr.Lock()
            if len(mr.workers) > i {
                // there's a worker that we haven't told schedule() about.
                w := mr.workers[i]
                go func() { ch <- w }() // send without holding the lock.
                i = i   1
            } else {
                // wait for Register() to add an entry to workers[]
                // in response to an RPC from a new worker.
                mr.newCond.Wait()
            }
            mr.Unlock()
        }
    }
    

    本条的主要功能是当有worker新参预列表,就把它加到刚才创建的channel里,能够看做是worker队列。
    在意那行go func() { ch <- w }(),因为ch是无缓冲channel,里面只可以放进去就抽出来,不能够放三个值,不然会死锁,所以要额外开二个线程,避防父线程被打断。

    schedule分别在mapPhase和reducePhase的时候调用大家温馨完毕的schedule函数。

    func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
        var ntasks int
        var n_other int // number of inputs (for reduce) or outputs (for map)
        switch phase {
        case mapPhase:
            ntasks = len(mapFiles)
            n_other = nReduce
        case reducePhase:
            ntasks = nReduce
            n_other = len(mapFiles)
        }
    
        fmt.Printf("Schedule: %v %v tasks (%d I/Os)n", ntasks, phase, n_other)
    
        // All ntasks tasks have to be scheduled on workers. Once all tasks
        // have completed successfully, schedule() should return.
        //
        // Your code here (Part III, Part IV).
        //
        // var done sync.WaitGroup
        // for count := 0; count < ntasks; count  {
        //  var w, filename string
        //  w = <- registerChan
        //  if phase == mapPhase{
        //      filename = mapFiles[count]
        //  }else{
        //      filename = ""
        //  }
        //  done.Add(1)
        //  go func(count int){
        //      defer done.Done()
        //      call(w, "Worker.DoTask", &DoTaskArgs{jobName, filename, phase, count, n_other}, new(struct{}))
        //      println("Start ...")
        //      registerChan <- w
        //      println("Stop ...")
        //  }(count)
        // }    
        // done.Wait()
    
        var done sync.WaitGroup
        done.Add(ntasks)
        ch_task := make(chan int, ntasks)
        for i := 0; i < ntasks; i  {
            ch_task <- i
        }
        go func(){
            for {
                w := <- registerChan
                go func(w string){
                    for{
                        task_id := <- ch_task
                        var filename string
                        if phase == mapPhase{
                            filename = mapFiles[task_id]
                        }else{
                            filename = ""
                        }
                        ok := call(w, "Worker.DoTask", &DoTaskArgs{jobName, filename, phase, task_id, n_other}, new(struct{}))
                        if ok == false{
                            ch_task <- task_id
                        }else{
                            done.Done()
                        }
                    }
                }(w)
            }
        }()
        done.Wait()
        fmt.Printf("Schedule: %v donen", phase)
    }
    

    有三种达成形式,上边包车型地铁可比好,注释掉的有错误。
    下边这种达成格局用WaitGroup,首先把WaitGroup的值设为ntasks,然后创制多个有缓冲的职责channel,把装有职务往channel里塞进去,然后开一个线程,Infiniti循环,当worker channel里有空暇的worker,就开二个线程给它,让它最佳循环,只要任务channel里有没施行的职分,就收取来实施,master通过rpc调用worker w的DoTask函数,因为此地是在地面运转的,所在此早前面mr已经张开过rpc服务器了,所以worker没有供给敞开rpc服务器了。借使有个别职分失利了,就把职务一连塞进职责channel里,达成化解worker failure

    上边说注释掉的写法不佳是因为,循环职责,然后把可用的worker给抽取来进行操作,registerChan <- w要是有多个w放进去,就能拥塞,诱致Wait()相当小概做到,一贯不通。尽管能够go func(){registerChan <- w},可是总感到不太好,M昂Cora算法应该是各样worker开个线程平素专门的职业,有职分来了就做,比较相符遍及式的MKuga算法。

    下一场和Sequential同样进行完map和reduce任务之后,调用finish,分布式的finish需求先killWorkers

    func (mr *Master) killWorkers() []int {
        mr.Lock()
        defer mr.Unlock()
        ntasks := make([]int, 0, len(mr.workers))
        for _, w := range mr.workers {
            debug("Master: shutdown worker %sn", w)
            var reply ShutdownReply
            ok := call(w, "Worker.Shutdown", new(struct{}), &reply)
            if ok == false {
                fmt.Printf("Master: RPC %s shutdown errorn", w)
            } else {
                ntasks = append(ntasks, reply.Ntasks)
            }
        }
        return ntasks
    }
    

    用rpc调用worker.shutdown,然后mr关闭RPC服务器。


    首先次用go,恐怕写的相比挫以至标准词汇使用不当,仍需努力。

    Part III: Distributing MapReduce tasks


    最近截至大家都以串行地实践任务,Map/Reduce 最大的优势正是可以活动地并行实施普通的代码,不用开荒者举办额外专门的学业。在 Part III 大家会把任务分配给一组 worker thread,在多核上并行进行。即使大家不在多机上实行,不过会用 RPC 来效仿遍布式计算。

    大家要求完成 mapreduce/schedule.goschedule(),在二遍职务中,Master 会调用五遍 schedule(),一遍用于 Map,贰遍用于 Reduce。schedule()将会把职责分配给 Workers,平时职务会比 Workers 数量多,因此 schedule() 会给各个 worker 贰个 Task 种类,然后等待全数 Task 实现再回到。

    schedule() 通过 registerChan 参数获取 Workers 消息,它会调换三个满含Worker 的 RPC 地址的 string,有个别 Worker 在调用 schedule() 在此之前就存在了,有的在调用的时候发出,他们都会产出在 registerChan 中。

    schedule() 通过发送 Worker.DoTask RPC 调节 Worker 实施职责,能够用 mapreduce/common_rpc.go 中的 call() 函数发送。call() 的首先个参数是 Worker 的地点,能够从 registerChan 获取,首个参数是 "Worker.DoTask" 字符串,第八个参数是 DoTaskArgs 结构体的指针,最终多个参数为 nil

    那是贰个相比有含金量的演习。
    schedule() 函数的调用发生在 master.go 中。首先,在Distributed() 中对 schedule() 做了一个再封装,使得其只供给一个参数剖断是 Map 依旧Reduce。

    func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
        mr = newMaster(master)
        mr.startRPCServer()
        go mr.run(jobName, files, nreduce,
            func(phase jobPhase) {
                ch := make(chan string)
                go mr.forwardRegistrations(ch)
                schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
            },
            func() {
                mr.stats = mr.killWorkers()
                mr.stopRPCServer()
            })
        return
    }
    

    此地就是实际上使用 schedule() 的地点。二回用于 Map,二次用于 Reduce。

    func (mr *Master) run(jobName string, files []string, nreduce int,
        schedule func(phase jobPhase),
        finish func(),
    ) {
        mr.jobName = jobName
        mr.files = files
        mr.nReduce = nreduce
    
        fmt.Printf("%s: Starting Map/Reduce task %sn", mr.address, mr.jobName)
    
        schedule(mapPhase)
        schedule(reducePhase)
        finish()
        mr.merge()
    
        fmt.Printf("%s: Map/Reduce task completedn", mr.address)
    
        mr.doneChannel <- true
    }
    

    打听完调用后,还索要重视精晓的是RPC 机制。搞懂 call() 的参数格局。
    先上三个错误的写法,该写法招致文件生成不全,程序不可能顺风实现:

    func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
        var ntasks int
        var n_other int // number of inputs (for reduce) or outputs (for map)
        switch phase {
        case mapPhase:
            ntasks = len(mapFiles)
            n_other = nReduce
        case reducePhase:
            ntasks = nReduce
            n_other = len(mapFiles)
        }
    
        fmt.Printf("Schedule: %v %v tasks (%d I/Os)n", ntasks, phase, n_other)
    
        // Part III code
        // 错误,多个 gotoutine 共用一个 DoTaskArgs,在多线程中有问题
        var taskArgs DoTaskArgs
        taskArgs.JobName = jobName
        taskArgs.Phase = phase
        taskArgs.NumOtherPhase = n_other
        var wait_group sync.WaitGroup;
        for i := 0; i < ntasks; i   {
            wait_group.Add(1)
            // 错误,i   在主线程执行,goroutine 里的 i 值受到影响
            go func() {
                fmt.Printf("Now: %dth taskn", i)
                defer wait_group.Done()
                taskArgs.TaskNumber = i
                if (phase == mapPhase) {
                    taskArgs.File = mapFiles[i]
                }
                worker := <-registerChan
                if (call(worker, "Worker.DoTask", &taskArgs, nil) != true) {
                    log.Fatal("RPC call error, exit")
                }
                // 错误,导致死锁
                registerChan <- worker
            }()
        }
        wait_group.Wait()
        fmt.Printf("Schedule: %v phase donen", phase)
    }
    

    综上说述,那是对十二线程编制程序精晓相当不够浓重招致的。脑子里必要随即有根弦,凡是要改过的数据,都不要分享,否则就记得加锁。别的,channel 操作时,须求小心死锁。

    以下是金科玉律的写法。

    func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
        var ntasks int
        var n_other int // number of inputs (for reduce) or outputs (for map)
        switch phase {
        case mapPhase:
            ntasks = len(mapFiles)
            n_other = nReduce
        case reducePhase:
            ntasks = nReduce
            n_other = len(mapFiles)
        }
    
        fmt.Printf("Schedule: %v %v tasks (%d I/Os)n", ntasks, phase, n_other)
    
        // All ntasks tasks have to be scheduled on workers, and only once all of
        // them have been completed successfully should the function return.
        // Remember that workers may fail, and that any given worker may finish
        // multiple tasks.
        //
        // TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
        //
    
        // Part III code
        var wait_group sync.WaitGroup;
    
        for i := 0; i < ntasks; i   {
            wait_group.Add(1)
            // 每个 task 独自拥有一个 DoTaskArgs
            var taskArgs DoTaskArgs
            taskArgs.JobName = jobName
            taskArgs.Phase = phase
            taskArgs.NumOtherPhase = n_other
            taskArgs.TaskNumber = i
            if (phase == mapPhase) {
                taskArgs.File = mapFiles[i]
            }
            go func() {
                // fmt.Printf("Now: %dth taskn", task_id)
                defer wait_group.Done()
                worker := <-registerChan
                if (call(worker, "Worker.DoTask", &taskArgs, nil) != true) {
                    log.Fatal("RPC call error, exit")
                }
                // 非常关键,完成后再将 worker 放回
                go func() {registerChan <- worker}()
            }()
        }
        wait_group.Wait()
        fmt.Printf("Schedule: %v phase donen", phase)
    }
    

    最后 go test -run TestBasic 通过。

    Part 5

    Part 5须求产生三个倒排索引,实现ii.go就可以,跟wc.go肖似,主倘使要去掉重复以致对倒排列表排序就能够通过测量试验了。

    doReduce()

    doReduce()重大是将 key 值相仿的 value 打包发送给顾客定义的 ReduceFunc(),得到三个新的 kv对,key 值不改变,而value值则是ReduceFunc()的再次回到值,排序,最后将新的 kv对 切成块写入文件。

    type ByKey []KeyValue
    func (a ByKey) Len() int { return len(a) }
    func (a ByKey) Swap(i, j int) { a[i],a[j] = a[j],a[i] }
    func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
    
    func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTaskNumber int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
    ) {
        //read kv slice from the json file
        var kvSlice []KeyValue
        for i := 0;i<nMap;i  {
            //file, _ := os.OpenFile(reduceName(jobName,i,reduceTaskNumber), os.O_RDONLY, 0666)
            file,err := os.Open(reduceName(jobName,i,reduceTaskNumber))
            if err != nil {
                log.Fatal("doReduce: open ", err)
            }
            var kv KeyValue
            dec := json.NewDecoder(file)
            for{
                err := dec.Decode(&kv)
                kvSlice = append(kvSlice,kv)
                if err == io.EOF {
                    break
                }
            }
            file.Close()
            /********/
            //此处如果用 defer,可能会造成文件开启过多,造成程序崩溃
            /********/
        }
    
        //sort the intermediate kv slices by key
        sort.Sort(ByKey(kvSlice))
    
        //process kv slices in the reduceF()
        var reduceFValue []string
        var outputKv []KeyValue
        var preKey string = kvSlice[0].Key
        for i,kv := range kvSlice{
            if i == (len(kvSlice) - 1) {
                reduceFValue = append(reduceFValue, kv.Value)
                outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
            } else {
                    if kv.Key != preKey {
                        outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
                        reduceFValue = make([]string, 0)
                    }
                    reduceFValue = append(reduceFValue, kv.Value)
            }
    
            preKey = kv.Key
        }
    
        //write the reduce output as JSON encoded kv objects to the file named outFile
        file,err := os.Create(outFile)
        if err != nil {
            log.Fatal("doRuduce: create ", err)
        }
        defer file.Close()
    
        enc := json.NewEncoder(file)
        for _, kv := range outputKv{
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatal("doRuduce: json encode ", err)
            }
        }
    }
    

    分布式系统,分布式系统学习1。Part V: 反向索引(可选卡塔尔

    挑战性:

    词频计算纵然是 MapReduce 最精粹的三个运用,不过在科学普及数据应用不平日用。试试写个反向索引应用。

    反向索引在微管理机科学中接受大范围,极度在文档寻找世界中特别实用。平日的话,二个反向索引就是四个从数额到多少特征的映射。比方,在文书档案找寻中,这些映射可能就是第风流倜傥词与文档名称的照射。

    main/ii.go 的完整布局跟 wc.go 雷同。改正 mapF 和 reduceF 让它们创设反向索引。运营 ii.go 应该出口三个元组列表,每豆蔻年华行的格式如下:

    $ go run ii.go master sequential pg-*.txt
    $ head -n5 mrtmp.iiseq
    A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
    ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
    ABRAHAM: 1 pg-dracula.txt
    ABSOLUTE: 1 pg-les_miserables.txt
    

    你的代码应该经过 test-ii.sh 的测量检验:

    $ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
    women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
    

    Part IV: Handling worker failures


    在该部分中我们须求让 Master 能够管理 failed Worker。MapReduce 使那么些管理相对简便易行,因为即使八个 Worker fails,Master 交给它的其余职责都会战败。这时 Master 需求把职分交给另二个 Worker。

    三个 RPC 出错并不一定表示 Worker 未有实行义务,有望只是 reply 错过了,或是 Master 的 RPC 超时了。由此,有望多少个 Worker 都成功了同二个职分。相仿的职务会转换同样的结果,所以这么并不会引发什么难点。并且,在该 lab 中,每种 Task 都以系列推行的,那就有限扶植了结果的全体性。

    贯彻较简单,将 Part III 中的 goroutine 做大幅度矫正就可以。参预有线循环使得在 call 再次来到 false 的时候另选三个worker 重试,重回 true 的时候将 worker 放回 ch,跳出循环。

    ...
            go func() {
                // fmt.Printf("Now: %dth taskn", task_id)
                defer wait_group.Done()
                // 加入无限循环,只要任务没完成,就换个 worker 执行
                for {
                    worker := <-registerChan
                    if (call(worker, "Worker.DoTask", &taskArgs, nil) == true) {
                        // 非常关键,完成后再将 worker 放回
                        go func() {registerChan <- worker}()
                        break
                    }
                }
            }()
    ...
    

    修改后 go test -run Failure 成功。

    Part II: Single-worker word count

    其次片段是促成mapF()reduceF()函数,来兑现通过逐生机勃勃M奇骏总结词频的职能。
    较易,就直接放代码了。

    func mapF(filename string, contents string) []mapreduce.KeyValue {
        f := func(c rune) bool {
            return !unicode.IsLetter(c)
        }
        var strSlice []string = strings.FieldsFunc(contents,f)
        var kvSlice []mapreduce.KeyValue
        for _,str := range strSlice {
            kvSlice = append(kvSlice, mapreduce.KeyValue{str, "1"})
        }
    
        return kvSlice
    }
    
    func reduceF(key string, values []string) string {
        var cnt int64
        for _,str := range values{
            temp,err := strconv.ParseInt(str,10,64)
            if(err != nil){
                fmt.Println("wc :parseint ",err)
            }
            cnt  = temp
        }
        return strconv.FormatInt(cnt,10)
    }
    

    经过全方位测验

    运转 src/main/test-mr.sh 可测量试验此次试验的有所内容。借使全部经过,能够看来:

    $ sh ./test-mr.sh
    ==> Part I
    ok      mapreduce   3.053s
    
    ==> Part II
    Passed test
    
    ==> Part III
    ok      mapreduce   1.851s
    
    ==> Part IV
    ok      mapreduce   10.650s
    
    ==> Part V (challenge)
    Passed test
    

    Part V: Inverted index generation (optional)


    该部分的渴求是,总括出具备饱含某些词的文本。并以下列情势出口:

    <单词>: <文件个数> <排序后的文件名列表>
    

    比较之下 Part II 中的词频计算,观念是相近的。
    率先在 mapF 里对文本的剧情分词,维护八个哈希表,Key 是单词,Value 是文本名。这样能够去掉重复的词。最终再把哈希表转为输出需要的格式。

    func mapF(document string, value string) (res []mapreduce.KeyValue) {
        // TODO: you should complete this to do the inverted index challenge
        f := func(c rune) bool {
            return !unicode.IsLetter(c)
        }
        words := strings.FieldsFunc(value, f)
        // 区别1. 采取 HashMap 去掉重复单词
        map_doc := make(map[string]string, 0)
        for _,word := range words {
            map_doc[word] = document
        }
        res = make([]mapreduce.KeyValue, 0, len(map_doc))
        for k,v := range map_doc {
            res = append(res, mapreduce.KeyValue{k, v})
        }
        return
    }
    

    分布式系统,分布式系统学习1。mapF 再次来到的数组会经过哈希处理分配到 nReduce 个公文中。doReduce 则会将配归于同生龙活虎的 reduce_task_id 的文书的 {key, val} 整合为 {key, []val}。在本例中就一定于 {单词,[]文件名}。
    在 reduceF 里主假设对各类 key 下的具有 value 实行拍卖。注意,它并不肩负输出,只担负再次回到三个 newVal,最终由 doReduce 统一以 {key, newVal} 格局出口。因而大家只需求把 []文件名 实行排序后,按供给格式转为一个 string 就能够。

    func reduceF(key string, values []string) string {
        // TODO: you should complete this to do the inverted index challenge
        nDoc := len(values)
        sort.Strings(values)
        var buf bytes.Buffer;
        buf.WriteString(strconv.Itoa(nDoc))
        buf.WriteRune(' ')
        for i,doc := range values {
            buf.WriteString(doc)
            if (i != nDoc-1) {
                buf.WriteRune(',')
            }
        }
        return buf.String()
    }
    

    最后 ./test-ii.sh 测验通过。

    Part III: Distributing MapReduce tasks && Part IV: Handling worker failures

    其三某个和第四某些能够协同来做,首借使变成schedule(),完毕二个透过线程并发执行map worker 和 reduce worker 的 MRubicon 框架。框架通过 RPC 来效仿布满式总结,并要带有 worker 的容灾成效。

    TestBasic()

    测量检验函数运行多个线程运转RUnWoker()

    func TestBasic(t *testing.T) {
        mr := setup()
        for i := 0; i < 2; i   {
            go RunWorker(mr.address, port("worker" strconv.Itoa(i)),
                MapFunc, ReduceFunc, -1)
        }
        mr.Wait()
        check(t, mr.files)
        checkWorker(t, mr.stats)
        cleanup(mr)
    }
    

    setup() && Distributed()

    func setup() *Master {
        files := makeInputs(nMap)
        master := port("master")
        mr := Distributed("test", files, nReduce, master)
        return mr
    }
    

    通过mr.startRPCServer() 运营 master 的 RPC 服务器,然后通过 mr.run()进行 worker 的调度。

    // Distributed schedules map and reduce tasks on workers that register with the
    // master over RPC.
    func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
        mr = newMaster(master)
        mr.startRPCServer()
        go mr.run(jobName, files, nreduce,
            func(phase jobPhase) {
                ch := make(chan string)
                go mr.forwardRegistrations(ch)
                schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
            },
            func() {
                mr.stats = mr.killWorkers()
                mr.stopRPCServer()
            })
        return
    }
    

    Master.forwardRegistrations()

    该函数通过worker 的数码来判别是还是不是有新 worker 运转,生龙活虎旦开采成新的 worker 运行,则运用管道(ch卡塔 尔(阿拉伯语:قطر‎文告schedule()
    掌握该函数对促成前边的schedule()驷不比舌。

    // helper function that sends information about all existing
    // and newly registered workers to channel ch. schedule()
    // reads ch to learn about workers.
    func (mr *Master) forwardRegistrations(ch chan string) {
        i := 0
        for {
            mr.Lock()
            if len(mr.workers) > i {
                // there's a worker that we haven't told schedule() about.
                w := mr.workers[i]
                go func() { ch <- w }() // send without holding the lock.
                i = i   1
            } else {
                // wait for Register() to add an entry to workers[]
                // in response to an RPC from a new worker.
                mr.newCond.Wait()
            }
            mr.Unlock()
        }
    }
    

    schedule()

    shedule()即使如此相当短,但得以达成起来照旧有一点点难度的。
    waitGroup用来判断职责是还是不是实现。
    registerChan来监听是还是不是有新的 worker 运转,即便有的话,就开动叁个线程来运营该 worker。通过新开线程来运作新 worker的逻辑相比较切合分布式 MPRADO 的特点。
    对于 宕掉的worker执行call()操作时,会返回false
    每最西子行一个职分,就让waitGroup减意气风发,而实践停业(call()返回 false)则将waitGroup加风度翩翩,代表会将该任务布置给任何 worker。

    waitGroup.Wait()则会等到任务完全试行完重返。

    func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
        var ntasks int
        var n_other int // number of inputs (for reduce) or outputs (for map)
        switch phase {
        case mapPhase:
            ntasks = len(mapFiles)
            n_other = nReduce
        case reducePhase:
            ntasks = nReduce
            n_other = len(mapFiles)
        }
    
        fmt.Printf("Schedule: %v %v tasks (%d I/Os)n", ntasks, phase, n_other)
    
        // All ntasks tasks have to be scheduled on workers, and only once all of
        // them have been completed successfully should the function return.
        // Remember that workers may fail, and that any given worker may finish
        // multiple tasks.
    
        waitGroup := sync.WaitGroup{}
        waitGroup.Add(ntasks)
    
        taskChan := make(chan int, ntasks)
        for i:=0;i<ntasks;i    {
            taskChan <- i
        }
    
        go func() {
            for {
                ch := <- registerChan
                go func(c string) {
                    for {
                        i := <- taskChan
                        if call(c,"Worker.DoTask", &DoTaskArgs{jobName,
                            mapFiles[i],phase,i,n_other},new(struct{})){
                            waitGroup.Done()
                        } else{
                            taskChan <- i
                        }
                    }
                }(ch)
            }
        }()
    
        waitGroup.Wait()
    
        fmt.Printf("Schedule: %v phase donen", phase)
    }
    

    RunWorker()

    通过RunWorker() 来增加 worker。
    nRPC来支配 worker 的寿命,每选取一遍 rpc 央浼就 -1s。固然最初值为 -1,则意味着改 worker 是永生的。

    // RunWorker sets up a connection with the master, registers its address, and
    // waits for tasks to be scheduled.
    func RunWorker(MasterAddress string, me string,
        MapFunc func(string, string) []KeyValue,
        ReduceFunc func(string, []string) string,
        nRPC int,
    ) {
        debug("RunWorker %sn", me)
        wk := new(Worker)
        wk.name = me
        wk.Map = MapFunc
        wk.Reduce = ReduceFunc
        wk.nRPC = nRPC
        rpcs := rpc.NewServer()
        rpcs.Register(wk)
        os.Remove(me) // only needed for "unix"
        l, e := net.Listen("unix", me)
        if e != nil {
            log.Fatal("RunWorker: worker ", me, " error: ", e)
        }
        wk.l = l
        wk.register(MasterAddress)
    
        // DON'T MODIFY CODE BELOW
        for {
            wk.Lock()
            if wk.nRPC == 0 {
                wk.Unlock()
                break
            }
            wk.Unlock()
            conn, err := wk.l.Accept()
            if err == nil {
                wk.Lock()
                wk.nRPC--
                wk.Unlock()
                go rpcs.ServeConn(conn)
            } else {
                break
            }
        }
        wk.l.Close()
        debug("RunWorker %s exitn", me)
    }
    

    Part V: Inverted index generation

    第五局地是贯彻倒排索引。此处供给的倒排索引,正是在输出结果时,供给将现出过 key 值文件的文书名在 key 值后边输出。
    效果与利益是通过产生 mapF()reduceF() 来达成的。

    mapF()

    将key 值所在文件的文书名赋给 kv对 的value。

    func mapF(document string, value string) (res []mapreduce.KeyValue) {
        f := func(c rune) bool {
            return !unicode.IsLetter(c)
        }
        var strSlice []string = strings.FieldsFunc(value,f)
        var kvSlice []mapreduce.KeyValue
        for _,str := range strSlice {
            kvSlice = append(kvSlice, mapreduce.KeyValue{str, document})
        }
    
        return kvSlice
    }
    

    reduceF()

    将相近 key 值的享有 value 打包并计算数据重临。

    func reduceF(key string, values []string) string {
        var cnt int64
        var documents string
        set := make(map[string]bool)
        for _,str := range values{
            set[str] = true
        }
        var keys []string
        for key := range set{
            if set[key] == false{
                continue
            }
            keys = append(keys,key)
        }
        sort.Strings(keys)
        for _,key := range keys{
            cnt  
            if cnt >= 2{
                documents  = ","
            }
            documents  = key
        }
        //return strconv.FormatInt(cnt,10)
        return strconv.FormatInt(cnt,10)   " "   documents
    }
    

    后记

    从刚最早的不能够入手,到近期经过Lab1方方面面测量试验,M福睿斯实验算是完全做完了,照旧很有成就感的。
    除了这些之外对 MQashqai有三个越来越深的知道之外,也深深体会到了非凡系统的魔力——功用强盛,结构轻易。
    而且又打听了一门新语言——GoLang,一门特地为高并发系统而规划的言语,用起来依然很舒畅的。
    但那归根结蒂是遍布式系统的首先个实验,欠缺的知识还广大,继续大力。

    本文由新葡亰496net发布于电脑系统,转载请注明出处:分布式系统,分布式系统学习1

    关键词:

上一篇:完美运行

下一篇:没有了