Just completed MIT 6.5840 MapReduce Lab
There’s been a huge update on this blog, but you can find the old version of it at 9dce7f5.
Additionally, I won’t copy-paste MapReduce’s concepts here, so see MapReduce (2004) for those. In fact, I learned MapReduce just for fun, because its creator, Google, already marked it as deprecated in 2014.
Implementation
Actually, I found this lab not that challenging thanks to a bunch of useful hints provided at the bottom of the lab requirements. I stuck to all of the hints except the one about os.Rename, which is detailed in a later section.
Overall, we have one Coordinator that distributes tasks to multiple Workers. Plus, all Map tasks must be completed before any Reduce task can start.
Workers
Besides the Mapping/Reducing functions assigned on bootstrap, a Worker must generate its own ID to distinguish itself from others. And my Workers do this in the silliest way:
workerID := rand.Int()
After starting, every Worker asks the Coordinator for a task periodically. If there’s no task, the Worker will time.Sleep(3 * time.Second) before retrying. Otherwise, the Worker executes its assigned task and then reports the result back to the Coordinator.
Map tasks handling
Each Map task asks a Worker to process one single input file and write the result to the correct intermediate files. Getting this mapping right matters a lot, because each Reduce task later reads only the files that belong to its own partition.
- Obtain the list of KeyValue by running mapf on the input file’s content.
- Store each pair into the appropriate temporary mr-${TaskID}-${i}(.txt) file, where i is calculated by ihash(kv.Key) % nReduce.
- Persist those temporary files to disk.
- Submit the result.
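The partitioning step in the list above can be sketched as follows. ihash mirrors the helper that ships with the lab skeleton as far as I remember it; partition and intermediateName are hypothetical helpers invented for this example, and the actual file writing is left out.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type KeyValue struct {
	Key   string
	Value string
}

// ihash picks a non-negative hash for a key (same idea as the lab's helper).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition splits the Map output into nReduce buckets, so that every pair
// with the same key lands in the same bucket.
func partition(kvs []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		i := ihash(kv.Key) % nReduce
		buckets[i] = append(buckets[i], kv)
	}
	return buckets
}

// intermediateName builds the mr-${TaskID}-${i} file name (hypothetical helper).
func intermediateName(taskID, i int) string {
	return fmt.Sprintf("mr-%d-%d", taskID, i)
}

func main() {
	kvs := []KeyValue{{"apple", "1"}, {"banana", "1"}, {"apple", "1"}}
	for i, bucket := range partition(kvs, 3) {
		fmt.Println(intermediateName(2, i), bucket)
	}
}
```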
Reduce tasks handling
During the Reduce phase, each task asks a Worker to process multiple intermediate files produced by the Map phase.
It’s worth noting that we have to gather all values of the same key before passing them to the reducef function. One effective way (without allocating an extra map) is to sort the intermediate key/value pairs by key, so that equal keys end up next to each other.
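slices.SortFunc(kvs, func(a, b KeyValue) int {
	return strings.Compare(a.Key, b.Key)
})
for i := 0; i < len(kvs); {
	// Collect the values of every pair that shares kvs[i].Key.
	var vals []string
	j := i
	for ; j < len(kvs) && kvs[j].Key == kvs[i].Key; j++ {
		vals = append(vals, kvs[j].Value)
	}
	// Pass kvs[i].Key and vals to reducef here, then jump to the next key.
	i = j
}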
The results of all keys are finally concatenated and written to the mr-out-${TaskID}(.txt) output file.
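For reference, here is a self-contained version of the sort-then-group idea that you can run on its own. reduceAll and countValues are hypothetical names for this sketch, the "%v %v\n" line format is the one I recall the lab asking for, and the result is returned as a string instead of being written to a file.

```go
package main

import (
	"fmt"
	"slices"
	"strconv"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// countValues is a word-count style reducef: it returns the number of values.
func countValues(key string, vals []string) string {
	return strconv.Itoa(len(vals))
}

// reduceAll sorts kvs by key, feeds each run of equal keys to reducef, and
// returns one "key result" line per distinct key.
func reduceAll(kvs []KeyValue, reducef func(string, []string) string) string {
	slices.SortFunc(kvs, func(a, b KeyValue) int {
		return strings.Compare(a.Key, b.Key)
	})
	var out strings.Builder
	for i := 0; i < len(kvs); {
		var vals []string
		j := i
		for ; j < len(kvs) && kvs[j].Key == kvs[i].Key; j++ {
			vals = append(vals, kvs[j].Value)
		}
		fmt.Fprintf(&out, "%v %v\n", kvs[i].Key, reducef(kvs[i].Key, vals))
		i = j // skip past the run we just reduced
	}
	return out.String()
}

func main() {
	kvs := []KeyValue{{"banana", "1"}, {"apple", "1"}, {"banana", "1"}}
	fmt.Print(reduceAll(kvs, countValues))
}
```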
Coordinator
The Coordinator does no more than assign Map/Reduce tasks to Workers and monitor the status of these tasks, which is straightforward to implement.
However, with only that in place, the Coordinator can’t notice Workers that crash in the middle of a task. The solution is to launch a goroutine that periodically checks for timed-out tasks and marks their status back as Idle.
// t is assumed to hold the time of the current check (e.g. t := time.Now()).
for _, s := range c.maps {
	if s != nil && s.Status == InProgress && t.Sub(s.AssignedAt).Seconds() > 5 {
		s.Status = Idle
		fmt.Printf("task map %d timeout\n", s.Task.ID)
	}
}
Go’s os.Rename and Linux’s VFS
You will probably see this hint when working on this lab:
To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile (or os.CreateTemp if you are running Go 1.17 or later) to create a temporary file and os.Rename to atomically rename it.
However, if you go with os.Rename on Linux (I have no idea about macOS and stupid Windows) and your temporary directory sits on a different filesystem than the output directory, you will end up with this error:
rename /tmp/2829667613 mr-2-0: invalid cross-device link
Meanwhile, the mv command works perfectly fine. So what’s the difference between them? To answer that, we need to look at Linux’s Virtual File System (VFS).
Linux’s Virtual File System
By definition, VFS is a kernel software layer that handles all syscalls related to a standard filesystem. Besides supporting almost every filesystem out there, VFS enables application programs to access and work with different kinds of filesystems in a uniform way. On top of that, VFS allows developers to plug their own filesystems into it by registering a filesystem type.
There are 3 main filesystem classes supported by VFS:
- Disk-based: ext4, btrfs, NTFS, etc.
- Network-based
- Special: procfs, sysfs, and most importantly tmpfs, which is a RAM-based filesystem commonly mounted at /tmp to store your temporary files. Lemme show you:
$ df -T /tmp /home
Filesystem Type 1K-blocks Used Available Use% Mounted on
tmpfs tmpfs 7854140 93908 7760232 2% /tmp
/dev/nvme0n1p2 ext4 481013404 167426420 289079336 37% /
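So why does mv work fine? It goes through VFS and the rename syscall just like everything else, but when the kernel rejects the rename with EXDEV (the "invalid cross-device link" error above), mv falls back to copying the file to the destination and then deleting the source.
On the other hand, os.Rename is a thin wrapper around the rename syscall, which can be atomic precisely because it only relinks a directory entry within a single mounted filesystem and never moves any data. So os.Rename has no capability to handle files across filesystems, and Go simply hands the error back to you.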
Solution
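It’ll disappoint you, but the way to solve this is fairly straightforward: just copy the file (then remove the original one). Keep in mind that a copy is not atomic the way a rename is, so a crash in the middle of it can leave a partially written destination file behind.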
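// moveFile copies src to dest and then removes src.
// Unlike rename, this is not atomic.
func moveFile(src, dest string) error {
	s, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("open source file %s: %w", src, err)
	}
	defer s.Close()
	d, err := os.Create(dest)
	if err != nil {
		return fmt.Errorf("create dest file %s: %w", dest, err)
	}
	defer d.Close()
	if _, err = io.Copy(d, s); err != nil {
		return fmt.Errorf("copy content: %w", err)
	}
	// Close explicitly so a failed flush is reported before src is removed.
	if err = d.Close(); err != nil {
		return fmt.Errorf("close dest file %s: %w", dest, err)
	}
	if err = os.Remove(src); err != nil {
		return fmt.Errorf("remove source file %s: %w", src, err)
	}
	return nil
}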