The Workers
The worker code is relatively simple and mostly resembles the sequential MapReduce implementation provided in the starter code.
The Worker starts by asking for its first task. It then works on the task, reports to the Coordinator once it's done, and requests a new task. The Coordinator might also ask the Worker to wait or shut down.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Ask for the first task; the empty Task signals "nothing finished yet".
	task := CallRequestTask(Task{}).Task
	for task.TaskType != ShutdownTask {
		switch task.TaskType {
		case WaitTask:
			time.Sleep(2 * time.Second)
		case MapTask:
			workerMap(mapf, task)
		case ReduceTask:
			workerReduce(reducef, task)
		}
		// Report the finished task and request the next one.
		task = CallRequestTask(task).Task
	}
	log.Printf("Worker shutting down\n")
}
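This loop leans on a Task struct and a CallRequestTask RPC wrapper that I haven't shown. Here is a minimal sketch of what they might look like, assuming the net/rpc setup and the call helper from the starter code; the exact field and constant names are illustrative:

// Sketch of the task plumbing; field and constant names are illustrative.
const (
	MapTask = iota
	ReduceTask
	WaitTask
	ShutdownTask
)

type Task struct {
	TaskType         int    // one of the constants above
	TaskNumber       int    // index of this map or reduce task
	MapInputFileName string // input split, for map tasks
	NReduce          int    // number of reduce buckets, for map tasks
	NMap             int    // number of map tasks, for reduce tasks
}

type RequestTaskArgs struct {
	FinishedTask Task // the task just completed; zero value on the first call
}

type RequestTaskReply struct {
	Task Task // the next task to run
}

// CallRequestTask reports a finished task and asks the Coordinator for another.
// call is the RPC helper from the starter code.
func CallRequestTask(finished Task) RequestTaskReply {
	args := RequestTaskArgs{FinishedTask: finished}
	reply := RequestTaskReply{}
	if !call("Coordinator.RequestTask", &args, &reply) {
		// If the Coordinator has exited because the job is finished,
		// the failed RPC doubles as a shutdown signal.
		reply.Task.TaskType = ShutdownTask
	}
	return reply
}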
When the Worker gets a Map task, it enters the workerMap function.
func workerMap(mapf func(string, string) []KeyValue, task Task) {
	inputFile, err := os.Open(task.MapInputFileName)
	if err != nil {
		log.Fatalf("cannot open %v", task.MapInputFileName)
	}
	defer inputFile.Close()
	inputContent, err := ioutil.ReadAll(inputFile)
	if err != nil {
		log.Fatalf("cannot read %v", task.MapInputFileName)
	}
	intermediate := mapf(task.MapInputFileName, string(inputContent))
	sort.Sort(ByKey(intermediate))

	// Group the intermediate pairs into NReduce buckets by key hash.
	buckets := make([][]KeyValue, task.NReduce)
	i := 0
	for i < len(intermediate) {
		// [i, j) is a run of pairs sharing the same key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		bucketIndex := ihash(intermediate[i].Key) % task.NReduce
		buckets[bucketIndex] = append(buckets[bucketIndex], intermediate[i:j]...)
		i = j
	}

	// Write each bucket to its own intermediate file mr-X-Y, where X is this
	// map task's number and Y is the reduce bucket index.
	for i, bucket := range buckets {
		tmpFile, err := ioutil.TempFile("", "mr-tmp-*")
		if err != nil {
			log.Fatalf("cannot create tmp file")
		}
		enc := json.NewEncoder(tmpFile)
		for _, kv := range bucket {
			if err := enc.Encode(&kv); err != nil {
				log.Fatalf("cannot encode %v", kv)
			}
		}
		tmpFile.Close()
		intermediateFileName := fmt.Sprintf("mr-%d-%d", task.TaskNumber, i)
		os.Rename(tmpFile.Name(), intermediateFileName)
	}
}
One thing to note here is that I used a [][]KeyValue to first group the intermediate results by hash bucket, and then created one intermediate file mr-X-Y per bucket. In my first attempt, I opened the file mr-X-Y for each key and wrote to it, and ended up overwriting what I had written for other keys that mapped to the same Y.
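For reference, the ihash helper used above ships with the starter code; it hashes the key with FNV-1a and masks off the sign bit, so that ihash(key) % NReduce always yields a valid, non-negative bucket index:

// ihash, from the starter code (needs the hash/fnv import): FNV-1a with the
// sign bit masked off, so the modulo result is never negative.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}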
When the Worker gets a Reduce task, it enters the workerReduce function:
func workerReduce(reducef func(string, []string) string, task Task) {
	// Collect the intermediate pairs for this reduce bucket from every map task.
	intermediate := []KeyValue{}
	for i := 0; i < task.NMap; i++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", i, task.TaskNumber)
		intermediateFile, err := os.Open(intermediateFileName)
		if err != nil {
			log.Fatalf("cannot open %v", intermediateFileName)
		}
		dec := json.NewDecoder(intermediateFile)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		// Close explicitly here rather than with defer, which would keep
		// every file open until the function returns.
		intermediateFile.Close()
	}
	sort.Sort(ByKey(intermediate))

	tmpFile, err := ioutil.TempFile("", "mr-out-*")
	if err != nil {
		log.Fatalf("cannot create tmp file")
	}
	defer tmpFile.Close()
	i := 0
	for i < len(intermediate) {
		// [i, j) is a run of pairs sharing the same key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		// This is the correct format for each line of Reduce output.
		fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	outputFileName := fmt.Sprintf("mr-out-%d", task.TaskNumber)
	os.Rename(tmpFile.Name(), outputFileName)
}
The Worker working on Reduce task Y reads from all mr-*-Y files and groups their contents by key. For example, with NMap = 2, reduce task 1 reads mr-0-1 and mr-1-1. It then runs reducef for each key and writes one line per key to the output file mr-out-Y.
In both workerMap and workerReduce, following the lab spec's suggestion, I write output to a temporary file first and then move it to its final location with os.Rename. This prevents an unexpected crash from leaving a half-finished output file in the output directory.
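The pattern in isolation looks roughly like the sketch below; atomicWrite is a name I made up for illustration, not something from the lab. One subtlety: os.Rename is only guaranteed atomic within a single filesystem, which is why the sketch creates the temp file in the destination directory rather than in the system temp directory (ioutil.TempFile("", ...) as used above works as long as the temp directory and the working directory are on the same filesystem):

// atomicWrite is an illustrative helper, not from the lab: write everything
// to a temp file first, then rename it into place, so a crash mid-write can
// never leave a partial file visible at finalName.
func atomicWrite(finalName string, write func(w io.Writer) error) error {
	// Create the temp file next to the destination so the rename stays on
	// one filesystem, where os.Rename is atomic.
	tmp, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // a no-op after a successful rename
	if err := write(tmp); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), finalName)
}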
What I Learned
That's pretty much it! I ran test-mr.sh and all tests passed. It is a challenging lab for a Lab 1: it takes real effort to design the structure of the Coordinator and the protocol by which the Coordinator and the Workers communicate.
This lab involves only basic uses of the sync.Mutex lock, and doesn't really involve more complicated multithreading primitives like condition variables and channels. I expect those to be a challenge in future labs.
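To give a sense of the locking involved, here is a sketch of the pattern rather than my actual Coordinator code: net/rpc serves each request on its own goroutine, so every handler takes the mutex before touching shared task state.

// Sketch of the locking pattern, not the actual Coordinator code.
type Coordinator struct {
	mu    sync.Mutex
	tasks []Task // shared between concurrent RPC handler goroutines
}

func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// ... record args.FinishedTask as done, pick an idle task, fill in reply ...
	return nil
}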