Along with Operating Systems, I have also been following MIT's Distributed Systems class. Again, I had a pretty bad professor when I took distributed systems in college; listening to lectures given by an actual expert is so much better.
The first lab for the course is implementing MapReduce in Go. The lab provides a very simple RPC framework (since Go itself already has pretty good support for RPC), and a Go plugin interface for writing MapReduce apps.
A MapReduce App
A MapReduce application runs in 2 phases: Map and Reduce. To define a MapReduce application, we just need 2 functions:
- A mapf function that accepts a file (name and content), and outputs intermediate key-value pairs.
- A reducef function that takes all key-value pairs with a certain key, and outputs an aggregate value for that key.
In Go code, the function signatures might look like:
func Mapf(filename string, contents string) []mr.KeyValue
func Reducef(key string, values []string) string
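For example, word count (the canonical MapReduce app) could be written roughly like this, assuming mr.KeyValue is a struct with Key and Value string fields as in the lab, and using strings, unicode, and strconv from the standard library:
func Mapf(filename string, contents string) []mr.KeyValue {
	// Emit a ("word", "1") pair for every word occurrence in the file.
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := []mr.KeyValue{}
	for _, w := range words {
		kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
	}
	return kva
}
func Reducef(key string, values []string) string {
	// values holds every "1" emitted for this word, so the count is just its length.
	return strconv.Itoa(len(values))
}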
We have a Coordinator that accepts a list of input files and an integer nReduce. nReduce is how many reduce tasks to output. (You might wonder: what about nMap? It's just the count of the input files!)
The Coordinator's job is to assign tasks to Workers. MapReduce works in 2 phases: Map and Reduce. In the Map phase, the Coordinator assigns Map tasks; when all Map tasks are complete, the Coordinator starts assigning Reduce tasks; when all Reduce tasks are complete, the app is finished and can shut down.
It might be worth mentioning that while each Map task only runs one mapf function, a Reduce task might run reducef multiple times. This is because mapf can output arbitrarily many keys. The MapReduce framework sorts the intermediate key-value pairs into buckets by hashing each key to an integer between 0 and nReduce - 1, so each intermediate key-value pair is mapped to a specific Reduce task. A Reduce task then processes the entire bucket, and needs to run reducef once for every distinct key in that bucket.
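A minimal sketch of that bucketing, using an FNV hash from the standard hash/fnv package (the lab's starter code suggests a helper along these lines):
func ihash(key string) int {
	// FNV-1a hash of the key, masked so the result is a non-negative int.
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
Each intermediate pair then goes to bucket ihash(key) % nReduce.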
RPC
I modeled all communication between the Worker and the Coordinator as a single RPC, with a resource Task.
type TaskType string
const (
	TaskTypeUnspecified TaskType = "Unspecified"
	MapTask             TaskType = "Map"
	ReduceTask          TaskType = "Reduce"
	WaitTask            TaskType = "Wait"
	ShutdownTask        TaskType = "Shutdown"
)
type Task struct {
TaskType TaskType
TaskNumber int
NMap int
NReduce int
// Map-specific data
MapInputFileName string
}
type RequestTaskArgs struct {
CompletedTask Task
}
type RequestTaskReply struct {
Task Task
}
I combined the "reporting task done" action with the "request next task" action to simplify the RPCs Workers have to make. If the Worker would like to report a task as completed, they can just set the CompletedTask arg.
I also introduced two other task types: Wait and Shutdown. When the app has finished, the server can just tell the Worker to shut down since there are no tasks left; when the server has assigned all tasks but is still waiting to hear back, it can ask the Worker to wait and come back later.
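To make the protocol concrete, here is a rough sketch of the request loop a Worker might run (the actual Worker is the topic of the next part; call is the RPC helper from the lab's starter code, and doMap/doReduce are hypothetical placeholders):
func runWorker() {
	completed := Task{} // nothing to report on the first request
	for {
		args := RequestTaskArgs{CompletedTask: completed}
		reply := RequestTaskReply{}
		if !call("Coordinator.RequestTask", &args, &reply) {
			return // coordinator is gone; assume the job has finished
		}
		switch reply.Task.TaskType {
		case MapTask:
			doMap(reply.Task) // hypothetical: run mapf and write intermediate files
			completed = reply.Task
		case ReduceTask:
			doReduce(reply.Task) // hypothetical: run reducef over one bucket
			completed = reply.Task
		case WaitTask:
			time.Sleep(time.Second)
			completed = Task{} // nothing new to report
		case ShutdownTask:
			return
		}
	}
}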
The Coordinator
The Coordinator's job is to assign and track tasks, sort of like a manager. Obviously, it needs to track some state: mainly the current phase of the MapReduce app (Map vs Reduce), and the status of each task.
type MapReducePhase string
const (
	PhaseUnspecified MapReducePhase = "Unspecified"
	Map              MapReducePhase = "Map"
	Reduce           MapReducePhase = "Reduce"
	Done             MapReducePhase = "Done"
)
type TaskState string
const (
	StateUnspecified TaskState = "Unspecified"
	Unstarted        TaskState = "Unstarted"
	Assigned         TaskState = "Assigned"
	Completed        TaskState = "Completed"
)
type Coordinator struct {
// Your definitions here.
mu sync.Mutex
// Properties
nMap int
nReduce int
inputFiles []string
// Execution state
taskStates []TaskState
phase MapReducePhase
}
Surprisingly, that's it. The Coordinator also needs to reassign tasks when it thinks a Worker has crashed or stalled, so I initially thought we would need to track which Worker each task is assigned to. It turns out it's unnecessary: the Coordinator can simply unassign the task and wait for the next Worker to pick it up.
The taskStates field tracks the state of each task. It's shared across the Map and Reduce phases, since the app cleanly moves to the Reduce phase when Map completes, and never needs to go back. When the Map phase finishes, taskStates is re-initialized to store the states of the Reduce tasks.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
c.nMap = len(files)
c.nReduce = nReduce
c.inputFiles = files
c.initMapPhase()
c.server()
return &c
}
func (c *Coordinator) initMapPhase() {
c.phase = Map
c.taskStates = make([]TaskState, c.nMap)
for i := range c.taskStates {
c.taskStates[i] = Unstarted
}
}
func (c *Coordinator) initReducePhase() {
c.phase = Reduce
c.taskStates = make([]TaskState, c.nReduce)
for i := range c.taskStates {
c.taskStates[i] = Unstarted
}
}
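As an aside, the c.server() call in MakeCoordinator is the lab's stock RPC plumbing: it registers the Coordinator with Go's net/rpc package and serves requests over a Unix socket, roughly along these lines (coordinatorSock() is the lab helper that names the socket):
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}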
When a Worker asks for a task, we can just loop over the taskStates array and find any Unstarted task i to assign to the Worker. Then we can set taskStates[i] to Assigned. When a Worker comes back and says they have finished task i, we can just set taskStates[i] to Completed.
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if args.CompletedTask.TaskType == c.getTaskType() {
c.taskStates[args.CompletedTask.TaskNumber] = Completed
}
reply.Task = c.getUnstartedTask()
return nil
}
func (c *Coordinator) getUnstartedTask() Task {
task := Task{}
for i, state := range c.taskStates {
if state == Unstarted {
task.TaskNumber = i
task.TaskType = c.getTaskType()
task.NMap = c.nMap
task.NReduce = c.nReduce
if task.TaskType == MapTask {
task.MapInputFileName = c.inputFiles[i]
}
c.taskStates[i] = Assigned
return task
}
}
// ...
}
Whenever there are no Unstarted tasks left, two things could have happened: either all tasks have been completed, or other workers have taken all the remaining tasks but haven't finished them yet.
If all tasks have been completed, then the app can move to the next phase. If we were on Map, we move to Reduce, and the Worker can be assigned any Reduce task immediately; if we were on Reduce, then we are done, and we can ask the Worker to Shutdown.
func (c *Coordinator) getUnstartedTask() Task {
task := Task{}
for i, state := range c.taskStates {
if state == Unstarted {
task.TaskNumber = i
task.TaskType = c.getTaskType()
task.NMap = c.nMap
task.NReduce = c.nReduce
if task.TaskType == MapTask {
task.MapInputFileName = c.inputFiles[i]
}
c.taskStates[i] = Assigned
return task
}
}
// no unstarted tasks
if c.isPhaseComplete() {
if c.phase == Map {
c.initReducePhase()
// try getting a reduce task
return c.getUnstartedTask()
}
if c.phase == Reduce {
c.phase = Done
task.TaskType = ShutdownTask
return task
}
}
// no unstarted tasks, but not done yet
// wait and come back later
task.TaskType = WaitTask
return task
}
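For completeness, the two helpers referenced above are small: getTaskType maps the current phase to the corresponding task type, and isPhaseComplete checks that every task in the current phase has been completed. A sketch of both:
func (c *Coordinator) getTaskType() TaskType {
	// The current phase determines which kind of task we hand out.
	switch c.phase {
	case Map:
		return MapTask
	case Reduce:
		return ReduceTask
	default:
		return TaskTypeUnspecified
	}
}
func (c *Coordinator) isPhaseComplete() bool {
	// A phase is complete once every one of its tasks has been reported Completed.
	for _, state := range c.taskStates {
		if state != Completed {
			return false
		}
	}
	return true
}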
That's everything on the server! Well, almost. The server also needs to set a timeout for each task: the lab spec requires waiting 10 seconds before re-assigning a task. Thanks to Go, it's extremely easy to set a timeout. We can simply spawn a goroutine when we assign a task; the goroutine sleeps for 10 seconds, then sets taskStates[i] back to Unstarted if the task hasn't been completed yet.
// ...
for i, state := range c.taskStates {
if state == Unstarted {
task.TaskNumber = i
task.TaskType = c.getTaskType()
task.NMap = c.nMap
task.NReduce = c.nReduce
if task.TaskType == MapTask {
task.MapInputFileName = c.inputFiles[i]
}
c.taskStates[i] = Assigned
c.setTaskTimeout(task)
return task
}
}
// ...
func (c *Coordinator) setTaskTimeout(task Task) {
go func() {
time.Sleep(10 * time.Second)
c.mu.Lock()
defer c.mu.Unlock()
if c.taskStates[task.TaskNumber] == Assigned {
c.taskStates[task.TaskNumber] = Unstarted
}
}()
}
This post is already getting kind of long. We will discuss the Worker in the next part.