最近空余时间比较多,打算学一下 6.824 这门课程。
实验结果 代码coordinator.go
type Coordinator struct { // Your definitions here. lock sync.Mutex nMap int nReduce int // max reduce task bucket mapCh chan string // unsigned map task reduceCh chan int // unsigned reduce task state int // 1 2 3 reduceTask map[int][]string // reduce task mapFinished map[string]bool reduceFinishMap map[int]bool reduceFinished int}// Your code here -- RPC handlers for the worker to call.//// an example RPC handler.//// the RPC argument and reply types are defined in rpc.go.//func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { reply.Y = args.X + 1 return nil}//// start a thread that listens for RPCs from worker.go//func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil)}func (c *Coordinator) FetchMapTask(ignore int, reply *MapTask) error { fname, ok := <-c.mapCh if !ok { reply.Finish = true return nil } reply.Fname = fname reply.NReduce = c.nReduce go c.checkTimeout(fname) return nil}func (c *Coordinator) FetchReduceTask(ignore int, reply *ReduceTask) error { rid, ok := <-c.reduceCh // 这里如果持有锁可能会造成死锁,因为 ReduceDone也要申请锁 if !ok { reply.Finish=true return nil //return &NoTaskAvailableError{"all tasks had been assigned."} } ita := c.reduceTask[rid] // intimate result Files of ith bucket reply.Files = ita reply.ID = rid // rid go c.checkTimeoutReduceTask(rid) return nil}func (c *Coordinator) checkTimeout(s string) { time.Sleep(time.Second * 10) lock := &c.lock lock.Lock() defer lock.Unlock() m := c.mapFinished ch := c.mapCh if m[s] { return } log.Printf("map task#%s time out.", s) ch <- s}func (c *Coordinator) checkTimeoutReduceTask(rid int) { time.Sleep(time.Second * 10) lock := &c.lock lock.Lock() defer lock.Unlock() m := c.reduceFinishMap ch := c.reduceCh if m[rid] { return } log.Printf("reduce task#%d time out.", rid) ch <- rid}// MapDone 
reply must is a pointerfunc (c *Coordinator) MapDone(arg *MapTaskDone, reply *bool) error { lock := &c.lock lock.Lock() defer lock.Unlock() finished := c.mapFinished if finished[arg.Fname] { // duplicate commit return nil } finished[arg.Fname] = true for k, v := range arg.Items { tasks := c.reduceTask[k] tasks = append(tasks, v) c.reduceTask[k] = tasks } if len(finished) == c.nMap { c.state = 2 close(c.mapCh) // next stage go func() { c.lock.Lock() defer c.lock.Unlock() for k := range c.reduceTask { c.reduceCh <- k c.reduceFinishMap[k] = false } }() } return nil}// ReduceDone 需考虑如果timeout再被调用func (c *Coordinator) ReduceDone(arg int, ignore *int) error { lock := &c.lock lock.Lock() defer lock.Unlock() if !c.reduceFinishMap[arg] { c.reduceFinishMap[arg] = true c.reduceFinished++ } if len(c.reduceFinishMap) == c.reduceFinished { c.state = 3 close(c.reduceCh) } return nil}//// main/mrcoordinator.go calls Done() periodically to find out// if the entire job has finished.//func (c *Coordinator) Done() bool { lock := &c.lock lock.Lock() defer lock.Unlock() // Your code here. return c.state == 3}//// create a Coordinator.// main/mrcoordinator.go calls this function.// NReduce is the number of reduce tasks to use.//func MakeCoordinator(files []string, nReduce int) *Coordinator { // Your code here. c := Coordinator{ nMap: len(files), nReduce: nReduce, mapCh: make(chan string, 10), reduceCh: make(chan int, nReduce), state: 1, reduceTask: map[int][]string{}, mapFinished: map[string]bool{}, reduceFinishMap: map[int]bool{}, reduceFinished: 0, } for _, s := range files { c.mapCh <- s } c.server() return &c}
worker.go
//// Map functions return a slice of KeyValue.//type KeyValue struct { Key string Value string}//// use ihash(key) % NReduce to choose the reduce// task number for each KeyValue emitted by Map.//func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff)}type worker struct { stage int mapf func(string, string) []KeyValue reducef func(string, []string) string}//// main/mrworker.go calls this function.//func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. w := worker{ stage: 1, mapf: mapf, reducef: reducef, } for w.stage != 3 { w.fetch() } // uncomment to send the Example RPC to the coordinator. // CallExample()}//// example function to show how to make an RPC call to the coordinator.//// the RPC argument and reply types are defined in rpc.go.//func CallExample() { // declare an argument structure. args := ExampleArgs{} // fill in the argument(s). args.X = 99 // declare a reply structure. reply := ExampleReply{} // send the RPC request, wait for the reply. // the "Coordinator.Example" tells the // receiving server that we'd like to call // the Example() method of struct Coordinator. err := call("Coordinator.Example", &args, &reply) if err == nil { // reply.Y should be 100. 
fmt.Printf("reply.Y %vn", reply.Y) } else { fmt.Printf("call failed!n") }}//// send an RPC request to the coordinator, wait for the response.// usually returns true.// returns false if something goes wrong.//func call(rpcname string, args interface{}, reply interface{}) error { // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") sockname := coordinatorSock() c, err := rpc.DialHTTP("unix", sockname) if err != nil { log.Fatal("dialing:", err) } defer c.Close() err = c.Call(rpcname, args, reply) return err}func (w *worker) fetch() { var mapTask MapTask var reduceTask ReduceTask if w.stage == 1 { err := call("Coordinator.FetchMapTask", 1, &mapTask) if err != nil { log.Fatalln(err) } else if mapTask.Finish { w.stage = 2 return } res := w.doMap(&mapTask) ok := false err = call("Coordinator.MapDone", res, &ok) if err != nil { log.Fatalf("call rpc MapDone failed、" + err.Error()) } } else if w.stage == 2 { err := call("Coordinator.FetchReduceTask", 0, &reduceTask) if err != nil { log.Fatalln(err) } else if reduceTask.Finish { w.stage = 3 return } res := reduceTask.ID w.doReduce(reduceTask.Files, reduceTask.ID) ignore := 0 call("Coordinator.ReduceDone", res, &ignore) }}func (w *worker) doMap(t *MapTask) MapTaskDone { nReduce := t.NReduce source := t.Fname file, err := os.Open(source) if err != nil { log.Fatalf("cannot open %kva_i", source) } bytes, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %kva_i", source) } content := string(bytes) mapf := w.mapf kva := mapf(source, content) m := map[int][]KeyValue{} // reduce task bucket for i := 0; i < nReduce; i++ { m[i] = make([]KeyValue, 0, 0) } for _, kv := range kva { rid := ihash(kv.Key) % nReduce // dispatch this kv to one reduce task r := m[rid] r = append(r, kv) m[rid] = r } res := MapTaskDone{ Fname: source, Items: map[int]string{}, } for rid, kva_i := range m { if len(kva_i) == 0 { continue } // source 可能含有前缀 ../ , 导致file创建失败 split := strings.Split(source, "/") fname := strconv.Itoa(rid) + "-" + 
split[len(split)-1] file, _ := os.Create(fname) for _, kv := range kva_i { _, _ = fmt.Fprintf(file, "%v %vn", kv.Key, kv.Value) } file.Close() res.Items[rid] = fname } return res}// ByKey for sorting by key.type ByKey []KeyValue// Len for sorting by key.func (a ByKey) Len() int { return len(a) }func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }func (w *worker) doReduce(files []string, id int) int { kvaStr := "" for _, s := range files { file, err := os.Open(s) if err != nil { log.Fatalln(err) } bytes, _ := ioutil.ReadAll(file) kvaStr += string(bytes) } var it []KeyValue for _, kvStr := range strings.Split(kvaStr, "n") { split := strings.Split(kvStr, " ") if len(split) < 2 { continue } k := split[0] v := split[1] it = append(it, KeyValue{k, v}) } sort.Sort(ByKey(it)) var values []string reducef := w.reducef ofile, _ := os.Create("mr-out-" + strconv.Itoa(id)) for i := 0; i < len(it); i++ { k := it[i].Key values = append(values, it[i].Value) for i+1 < len(it) && it[i+1].Key == k { values = append(values, it[i+1].Value) i++ } output := reducef(k, values) values = []string{} _, _ = fmt.Fprintf(ofile, "%v %vn", k, output) } ofile.Close() //log.Printf("reduce task#%d done、save to %s", id, ofileName) return id}
总结:lab1 并没有特别顺利,中途遇到死锁、文件创建失败等问题,排查起来也有点费劲。