Oct 6, 2023·7 min read

Distributed Systems: MapReduce, Part 1

Along with Operating Systems, I have also been following MIT's Distributed Systems class. Again, I had a pretty bad professor when I took distributed systems in college; listening to lectures given by an actual expert is so much better.

The first lab for the course is implementing MapReduce in Go. The lab provides a very simple RPC framework (since Go itself already has pretty good support for RPC), and a Go plugin interface for writing MapReduce apps.

A MapReduce App

A MapReduce application runs in 2 phases: Map and Reduce. To define a MapReduce application, we just need 2 functions:

  • A mapf function that accepts a file (name and content), and outputs intermediate key-value pairs.
  • A reducef function that takes all the values emitted for a given key, and outputs an aggregate value for that key.

In Go code, the function signatures might look like:

func Mapf(filename string, contents string) []mr.KeyValue
func Reducef(key string, values []string) string
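
To make these signatures concrete, here is a minimal sketch of a word-count app. The KeyValue type below is a local stand-in I am assuming for the lab's mr.KeyValue, and the splitting rule (anything that is not a letter separates words) is just one reasonable choice.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is a local stand-in for the lab's mr.KeyValue (assumed shape).
type KeyValue struct {
	Key   string
	Value string
}

// Mapf emits one ("word", "1") pair per word found in the file contents.
func Mapf(filename string, contents string) []KeyValue {
	// Treat every non-letter rune as a word separator.
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// Reducef receives every value emitted for one key and returns the count.
func Reducef(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kvs := Mapf("example.txt", "the quick fox jumps over the lazy dog")
	fmt.Println(len(kvs))                           // 8
	fmt.Println(Reducef("the", []string{"1", "1"})) // 2
}
```

Note that Mapf does no counting itself: it emits a "1" for every occurrence and leaves the aggregation to Reducef.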

We have a Coordinator that accepts a list of input files and an integer nReduce, which is the number of Reduce tasks to create. (You might wonder: what about nMap? It's just the number of input files!)

The Coordinator's job is to assign tasks to Workers. In the Map phase, the Coordinator assigns Map tasks; once all Map tasks are complete, it starts assigning Reduce tasks; once all Reduce tasks are complete, the app is finished and can shut down.

It is worth mentioning that while each Map task runs mapf exactly once, a Reduce task may run reducef many times. This is because mapf can emit arbitrarily many distinct keys. The MapReduce framework sorts the intermediate key-value pairs into nReduce buckets by hashing each key to an integer between 0 and nReduce - 1 (for example, the hash modulo nReduce), so each intermediate key-value pair maps to exactly one Reduce task. A Reduce task then processes its entire bucket, running reducef once for every distinct key in that bucket.
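
The bucketing and per-key grouping described above can be sketched roughly as follows. This is an illustration under my own assumptions, not the lab's actual code: bucketFor, partition and reduceBucket are hypothetical names, KeyValue is a local stand-in for mr.KeyValue, and FNV-1a is just one convenient hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// KeyValue is a local stand-in for the lab's mr.KeyValue (assumed shape).
type KeyValue struct{ Key, Value string }

// bucketFor maps a key to a Reduce task number in [0, nReduce).
func bucketFor(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Clear the top bit so the int conversion is never negative.
	return int(h.Sum32()&0x7fffffff) % nReduce
}

// partition splits the output of one Map task into nReduce buckets.
func partition(kvs []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		b := bucketFor(kv.Key, nReduce)
		buckets[b] = append(buckets[b], kv)
	}
	return buckets
}

// reduceBucket sorts a bucket by key, then calls reducef once per distinct key.
func reduceBucket(bucket []KeyValue, reducef func(string, []string) string) map[string]string {
	sort.Slice(bucket, func(i, j int) bool { return bucket[i].Key < bucket[j].Key })
	out := make(map[string]string)
	for i := 0; i < len(bucket); {
		// Collect the run of pairs that share bucket[i].Key.
		j := i
		var values []string
		for j < len(bucket) && bucket[j].Key == bucket[i].Key {
			values = append(values, bucket[j].Value)
			j++
		}
		out[bucket[i].Key] = reducef(bucket[i].Key, values)
		i = j
	}
	return out
}

func main() {
	kvs := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}
	count := func(key string, values []string) string { return fmt.Sprint(len(values)) }
	for n, bucket := range partition(kvs, 3) {
		fmt.Println(n, reduceBucket(bucket, count))
	}
}
```

Because the bucket depends only on the key, every pair with the same key lands in the same bucket no matter which Map task produced it, which is what lets a single Reduce task see all the values for that key.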

RPC

I modeled all communication between the Worker and the Coordinator as a single RPC, with a resource Task.

type TaskType string

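// Note: the zero value of TaskType is the empty string "",
// not TaskTypeUnspecified.
const (
	TaskTypeUnspecified TaskType = "Unspecified"
	MapTask             TaskType = "Map"
	ReduceTask          TaskType = "Reduce"
	WaitTask            TaskType = "Wait"
	ShutdownTask        TaskType = "Shutdown"
)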

type Task struct {
	TaskType   TaskType
	TaskNumber int

	NMap    int
	NReduce int

	// Map-specific data
	MapInputFileName string
}

type RequestTaskArgs struct {
	CompletedTask Task
}

type RequestTaskReply struct {
	Task Task
}

I combined the "report task done" action with the "request next task" action to reduce the number of RPCs a Worker has to make. If the Worker wants to report a task as completed, it can just set the CompletedTask arg.

I also introduced two other task types: Wait and Shutdown. When the app has finished, the server can just tell the Worker to shut down, since there are no more tasks left; when the server has assigned all tasks but is still waiting to hear back about some of them, it can ask the Worker to wait and come back later.
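
To show how the four task types fit together from the Worker's side, here is a rough sketch of a request loop. It is only an illustration: requestFunc stands in for the real RequestTask RPC, runWorker is a hypothetical name, the Task struct is trimmed to two fields, and the actual Worker is covered in the next part.

```go
package main

import (
	"fmt"
	"time"
)

type TaskType string

const (
	MapTask      TaskType = "Map"
	ReduceTask   TaskType = "Reduce"
	WaitTask     TaskType = "Wait"
	ShutdownTask TaskType = "Shutdown"
)

// Task is a trimmed-down version of the Task struct above.
type Task struct {
	TaskType   TaskType
	TaskNumber int
}

// requestFunc stands in for the RequestTask RPC: it reports the task that
// was just completed (zero value if none) and returns the next task.
type requestFunc func(completed Task) Task

// runWorker loops until told to shut down and returns how many tasks it ran.
func runWorker(request requestFunc, wait time.Duration) int {
	ran := 0
	var completed Task // zero value: nothing to report on the first call
	for {
		task := request(completed)
		completed = Task{}
		switch task.TaskType {
		case MapTask, ReduceTask:
			// The real work (calling mapf or reducef) would happen here.
			ran++
			completed = task
		case WaitTask:
			time.Sleep(wait)
		default:
			// ShutdownTask, or anything unexpected: stop asking.
			return ran
		}
	}
}

func main() {
	// A scripted coordinator: one Map task, a Wait, one Reduce task, Shutdown.
	script := []Task{{MapTask, 0}, {WaitTask, 0}, {ReduceTask, 0}, {ShutdownTask, 0}}
	i := 0
	ran := runWorker(func(completed Task) Task {
		t := script[i]
		i++
		return t
	}, time.Millisecond)
	fmt.Println(ran) // 2
}
```

The nice property of the combined RPC is visible in the loop: there is exactly one call per iteration, and the completion report simply rides along with the next request.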

The Coordinator

The Coordinator's job is to assign and track tasks, sort of like a manager. Obviously, it needs to track some state: mainly the current phase of the MapReduce app (Map vs Reduce), and the status of each task.

type MapReducePhase string
const (
	PhaseUnspecified MapReducePhase = "Unspecified"
	Map              MapReducePhase = "Map"
	Reduce           MapReducePhase = "Reduce"
	Done             MapReducePhase = "Done"
)

type TaskState string
const (
	StateUnspecified TaskState = "Unspecified"
	Unstarted        TaskState = "Unstarted"
	Assigned         TaskState = "Assigned"
	Completed        TaskState = "Completed"
)

type Coordinator struct {
	// Your definitions here.
	mu sync.Mutex

	// Properties
	nMap       int
	nReduce    int
	inputFiles []string

	// Execution state
	taskStates []TaskState
	phase      MapReducePhase
}

Surprisingly, that's it. The Coordinator also needs to reassign tasks when it thinks a Worker has crashed or stalled, so I initially thought we would need to track which Worker each task is assigned to. It turns out that's unnecessary: the Coordinator can simply unassign the task and wait for the next Worker to pick it up.

The taskStates field tracks the state of each task. It's shared across the Map and Reduce phases, since the app moves cleanly to the Reduce phase when Map completes, and never needs to go back. When the Map phase finishes, taskStates is re-initialized to hold the states of the Reduce tasks.

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	c.nMap = len(files)
	c.nReduce = nReduce
	c.inputFiles = files

	c.initMapPhase()

	c.server()
	return &c
}

func (c *Coordinator) initMapPhase() {
	c.phase = Map
	c.taskStates = make([]TaskState, c.nMap)
	for i := range c.taskStates {
		c.taskStates[i] = Unstarted
	}
}

func (c *Coordinator) initReducePhase() {
	c.phase = Reduce
	c.taskStates = make([]TaskState, c.nReduce)
	for i := range c.taskStates {
		c.taskStates[i] = Unstarted
	}
}

When a Worker asks for a task, we can just loop over the taskStates slice, find any Unstarted task i, assign it to the Worker, and set taskStates[i] to Assigned. When a Worker comes back and says it has finished task i, we can just set taskStates[i] to Completed.

func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if args.CompletedTask.TaskType == c.getTaskType() {
		c.taskStates[args.CompletedTask.TaskNumber] = Completed
	}
	reply.Task = c.getUnstartedTask()
	return nil
}

func (c *Coordinator) getUnstartedTask() Task {
	task := Task{}
	for i, state := range c.taskStates {
		if state == Unstarted {
			task.TaskNumber = i
			task.TaskType = c.getTaskType()
			task.NMap = c.nMap
			task.NReduce = c.nReduce
			if task.TaskType == MapTask {
				task.MapInputFileName = c.inputFiles[i]
			}
			c.taskStates[i] = Assigned
			return task
		}
	}

	// ...
}

Whenever there are no Unstarted tasks left, one of two things has happened: either all tasks have been completed, or other Workers have taken all the remaining tasks and have not finished them yet.

If all tasks have been completed, then the app can move to the next phase. If we were on Map, we move to Reduce, and the Worker can be assigned any Reduce task immediately; if we were on Reduce, then we are done now, and we can ask the Worker to Shutdown.

func (c *Coordinator) getUnstartedTask() Task {
	task := Task{}
	for i, state := range c.taskStates {
		if state == Unstarted {
			task.TaskNumber = i
			task.TaskType = c.getTaskType()
			task.NMap = c.nMap
			task.NReduce = c.nReduce
			if task.TaskType == MapTask {
				task.MapInputFileName = c.inputFiles[i]
			}
			c.taskStates[i] = Assigned
			return task
		}
	}

	// no unstarted tasks
	if c.isPhaseComplete() {
		if c.phase == Map {
			c.initReducePhase()
			// try getting a reduce task
			return c.getUnstartedTask()
		}

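		// Reduce has finished (or we were already Done): there is
		// nothing left to hand out, so every Worker that asks from
		// now on is told to shut down, not to wait.
		c.phase = Done
		task.TaskType = ShutdownTask
		return task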
	}

	// no unstarted tasks, but not done yet
	// wait and come back later
	task.TaskType = WaitTask
	return task
}
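
The snippets above call two helpers, isPhaseComplete and getTaskType, that are not shown in this post. One plausible version of each is sketched below; the bodies are my assumption of the obvious implementation, and the Coordinator here is cut down to the two fields the helpers need.

```go
package main

import "fmt"

type TaskType string
type TaskState string
type MapReducePhase string

const (
	MapTask    TaskType = "Map"
	ReduceTask TaskType = "Reduce"

	Unstarted TaskState = "Unstarted"
	Assigned  TaskState = "Assigned"
	Completed TaskState = "Completed"

	Map    MapReducePhase = "Map"
	Reduce MapReducePhase = "Reduce"
	Done   MapReducePhase = "Done"
)

// Coordinator is trimmed to the fields these helpers read.
type Coordinator struct {
	taskStates []TaskState
	phase      MapReducePhase
}

// isPhaseComplete reports whether every task in the current phase is
// Completed. Callers are assumed to already hold the Coordinator's mutex.
func (c *Coordinator) isPhaseComplete() bool {
	for _, state := range c.taskStates {
		if state != Completed {
			return false
		}
	}
	return true
}

// getTaskType returns the kind of task handed out in the current phase,
// or "" when the phase has no tasks to hand out (for example, Done).
func (c *Coordinator) getTaskType() TaskType {
	switch c.phase {
	case Map:
		return MapTask
	case Reduce:
		return ReduceTask
	}
	return ""
}

func main() {
	c := &Coordinator{phase: Map, taskStates: []TaskState{Completed, Assigned}}
	fmt.Println(c.isPhaseComplete(), c.getTaskType()) // false Map
	c.taskStates[1] = Completed
	fmt.Println(c.isPhaseComplete()) // true
}
```

Neither helper takes the lock itself, because both are only ever called from code that already holds it; locking again inside them would deadlock, since sync.Mutex is not reentrant.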

That's everything on the server! Well, almost. The server also needs a timeout for each task: the lab spec requires the server to wait 10 seconds before reassigning a task. Thanks to Go, this is easy. We can start a goroutine whenever we assign a task. The goroutine sleeps for 10 seconds, then sets taskStates[i] back to Unstarted if the task has not been completed yet.

	// ...
	for i, state := range c.taskStates {
		if state == Unstarted {
			task.TaskNumber = i
			task.TaskType = c.getTaskType()
			task.NMap = c.nMap
			task.NReduce = c.nReduce
			if task.TaskType == MapTask {
				task.MapInputFileName = c.inputFiles[i]
			}
			c.taskStates[i] = Assigned
			c.setTaskTimeout(task)
			return task
		}
	}
	// ...

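func (c *Coordinator) setTaskTimeout(task Task) {
	go func() {
		time.Sleep(10 * time.Second)
		c.mu.Lock()
		defer c.mu.Unlock()
		// Ignore timers left over from an earlier phase: taskStates is
		// re-initialized between phases, so the same index may now refer
		// to a different task, or be out of range.
		if task.TaskType != c.getTaskType() {
			return
		}
		if c.taskStates[task.TaskNumber] == Assigned {
			c.taskStates[task.TaskNumber] = Unstarted
		}
	}()
}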

This post is already getting kind of long. We will discuss the Worker in the next part.