Oct 7, 2023

Distributed Systems: MapReduce, Part 2

The Workers

The worker code is relatively simple, and mostly resembles the sequential implementation of MapReduce provided in the starter code.

The Worker starts and asks for its first task. It then works on the task, reports to the Coordinator once it's done, and requests a new one. The Coordinator might also ask the Worker to wait or shut down.

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Ask the Coordinator for the first task, then keep working until
	// the Coordinator tells us to shut down.
	task := CallRequestTask(Task{}).Task
	for task.TaskType != ShutdownTask {
		switch task.TaskType {
		case WaitTask:
			// Nothing to do right now; back off before asking again.
			time.Sleep(2 * time.Second)
		case MapTask:
			workerMap(mapf, task)
		case ReduceTask:
			workerReduce(reducef, task)
		}
		// Report the finished task and request the next one.
		task = CallRequestTask(task).Task
	}

	log.Printf("Worker shutting down\n")
}
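
The Task type, the TaskType constants, and the CallRequestTask RPC wrapper aren't shown in this post. As a rough sketch reconstructed from how the Worker uses them (the Task field names come straight from the code; the args/reply struct shapes and the call() wiring are my own guesses), they might look something like this:

// A sketch of the task-side types the Worker relies on. The Task fields and
// TaskType constants match what the Worker code uses; the RPC args/reply
// structs and the wrapper below are illustrative guesses.
type TaskType int

const (
	MapTask TaskType = iota
	ReduceTask
	WaitTask
	ShutdownTask
)

type Task struct {
	TaskType         TaskType
	TaskNumber       int    // index of this Map or Reduce task
	MapInputFileName string // input split to read (Map tasks only)
	NMap             int    // total number of Map tasks (Reduce needs this)
	NReduce          int    // number of Reduce buckets (Map needs this)
}

type RequestTaskArgs struct {
	FinishedTask Task // zero value on the first request
}

type RequestTaskReply struct {
	Task Task
}

// CallRequestTask reports the task the Worker just finished (if any) and
// asks the Coordinator for the next one, using the call() RPC helper from
// the starter code.
func CallRequestTask(finished Task) RequestTaskReply {
	args := RequestTaskArgs{FinishedTask: finished}
	reply := RequestTaskReply{}
	call("Coordinator.RequestTask", &args, &reply)
	return reply
}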

When the Worker gets a Map Task, it enters the workerMap function.

func workerMap(mapf func(string, string) []KeyValue, task Task) {
	inputFile, err := os.Open(task.MapInputFileName)
	if err != nil {
		log.Fatalf("cannot open %v", task.MapInputFileName)
	}
	defer inputFile.Close()
	inputContent, err := ioutil.ReadAll(inputFile)
	if err != nil {
		log.Fatalf("cannot read %v", task.MapInputFileName)
	}

	// Run the user's map function on the whole input split.
	intermediate := mapf(task.MapInputFileName, string(inputContent))

	sort.Sort(ByKey(intermediate))

	// Group the intermediate key/value pairs into NReduce buckets, so that
	// all pairs for a given key end up in the same bucket.
	buckets := make([][]KeyValue, task.NReduce)

	i := 0
	for i < len(intermediate) {
		// [i, j) is a run of pairs that share the same key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		bucketIndex := ihash(intermediate[i].Key) % task.NReduce
		buckets[bucketIndex] = append(buckets[bucketIndex], intermediate[i:j]...)
		i = j
	}

	// Write each bucket to an intermediate file mr-X-Y, where X is the Map
	// task number and Y is the bucket (Reduce task) number.
	for i, bucket := range buckets {
		tmpFile, err := ioutil.TempFile("", "mr-tmp-*")
		if err != nil {
			log.Fatalf("cannot create tmp file")
		}
		enc := json.NewEncoder(tmpFile)
		for _, kv := range bucket {
			enc.Encode(&kv)
		}
		tmpFile.Close()
		intermediateFileName := fmt.Sprintf("mr-%d-%d", task.TaskNumber, i)
		os.Rename(tmpFile.Name(), intermediateFileName)
	}
}

One thing to note here is that I used a [][]KeyValue to first group the intermediate results by hash bucket, and only then wrote one intermediate file mr-X-Y per bucket. In my first attempt, I instead opened mr-X-Y once per key and wrote to it, which ended up overwriting what I had already written for other keys that hashed to the same Y.
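
For reference, the bucket index comes from the ihash helper, which (if I remember the starter code correctly) is just an FNV-1a hash of the key using hash/fnv:

// ihash(key) % NReduce picks the Reduce bucket for a given key, so the same
// key always lands in the same mr-*-Y file no matter which Map task emitted it.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}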

When the Worker gets a Reduce task, it enters the workerReduce function:

func workerReduce(reducef func(string, []string) string, task Task) {
	// Collect the intermediate pairs destined for this Reduce task from
	// every Map task's output file mr-X-Y, where Y is this task's number.
	intermediate := []KeyValue{}
	for i := 0; i < task.NMap; i++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", i, task.TaskNumber)
		intermediateFile, err := os.Open(intermediateFileName)
		if err != nil {
			log.Fatalf("cannot open %v", intermediateFileName)
		}
		dec := json.NewDecoder(intermediateFile)
		for {
			// Decode key/value pairs until EOF.
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		intermediateFile.Close()
	}

	sort.Sort(ByKey(intermediate))

	tmpFile, err := ioutil.TempFile("", "mr-out-*")
	if err != nil {
		log.Fatalf("cannot create tmp file")
	}

	i := 0
	for i < len(intermediate) {
		// [i, j) is a run of pairs that share the same key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	tmpFile.Close()

	outputFileName := fmt.Sprintf("mr-out-%d", task.TaskNumber)
	os.Rename(tmpFile.Name(), outputFileName)
}

The Worker working on Reduce task Y reads from all the mr-*-Y files and groups the pairs by key. It then runs reducef once for each key and writes the results to a single output file mr-out-Y.

In both workerMap and workerReduce, following the lab spec's suggestion, I write the output to a temporary file first and only then move it to its final location with os.Rename. This prevents an unexpected crash from leaving a half-finished output file in the output directory.
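
Pulled out into a standalone helper, the pattern looks roughly like this; writeAtomically is a name I made up for illustration, the real code above just inlines the same steps (imports: io, io/ioutil, os):

// writeAtomically writes via a temp file and renames it into place, so a
// crash mid-write can never leave a partial mr-X-Y or mr-out-Y file behind.
func writeAtomically(finalName string, write func(w io.Writer) error) error {
	tmpFile, err := ioutil.TempFile("", "mr-tmp-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmpFile.Name()) // harmless no-op once the rename succeeds
	if err := write(tmpFile); err != nil {
		tmpFile.Close()
		return err
	}
	if err := tmpFile.Close(); err != nil {
		return err
	}
	return os.Rename(tmpFile.Name(), finalName)
}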

What I Learned

That's pretty much it! I ran test-mr.sh and all tests passed. It is a challenging lab for a Lab 1, and it takes real effort to design the structure of the Coordinator and the way the Coordinator and the Workers communicate.

This lab only involves very basic uses of the sync.Mutex lock, and doesn't really require more complicated concurrency primitives like condition variables and channels. I expect those to be a challenge in future labs.
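
For what it's worth, "basic use" here really just means wrapping every access to the Coordinator's shared task state in Lock/Unlock, roughly like this (the struct fields are illustrative, not the actual ones from my Coordinator):

// A minimal sketch of the locking pattern: every RPC handler that touches
// shared Coordinator state takes the mutex first.
type Coordinator struct {
	mu    sync.Mutex
	tasks []Task // shared between concurrent RPC handler goroutines
}

func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// ...pick an idle task, mark it in progress, fill in reply...
	return nil
}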