Getting Started with MapReduce the TDD Way

Writing and running the first lines of MapReduce code for Hadoop is an involved process that often turns beginners away. I will use the test driven development approach to give an introduction to writing MapReduce jobs and unit and integration tests without even installing a Hadoop server.

Requirements

I will use Scala and sbt for the code examples below because I like conciseness of the language but the approach should be easy to adopt to Java or any other JVM language.

To run the code examples the only thing you will have to install is sbt. I used version 1.0.2. Use your favorite terminal application to launch sbt commands.

You will also need a text editor or an IDE with built-in sbt support (I recommend IntelliJ IDEA with Scala plugin). The examples will assume you are using sbt from the command line, feel free to use your IDE’s equivalent of the sbt commands.

Although they do not have to be installed separately, this tutorial uses Scala 2.12.3 and Hadoop 2.8.1.

Introduction

Big data is a hot topic nowadays and Apache Hadoop is one of the popular software in use for storing and processing large data sets. There are many tutorials on how to get started with MapReduce - Hadoop’s underlying programming model - but not many of them serve the needs of the impatient developer. Moreover there is almost no time spent on testing a MapReduce job and Google search results often suggest using MRUnit, a now defunct and unsupported tool.

The official MapReduce Tutorial and many other learning material use the “word count problem” as an example - this tutorial will follow the tradition. The goal is to count the occurrence of the words in a bunch of text files.

While I was an engineer SoundCloud I had to learn Hadoop the hard way. Working with seasoned big data engineers meant I had access to a well maintained cluster and there was always someone to help out solving complicated problems but material or good advice on making your first steps did not exist. This experience inspired me to write this post.

The basics of MapReduce

To give you a basic understanding of what Hadoop and MapReduce are, these are the basics:

Keep in mind there is a lot more beyond this explanation. This post will not discuss how Hadoop stores the data in a distributed manner neither how the mapper and reducer functions are executed. If you want to know more I recommend starting with the official Hadoop documentation and the MapReduce Tutorial.

Setting up a project

To start a new sbt project create an empty folder with a file named build.sbt in it having the following contents:

name := "hadoop-mapreduce-tutorial"

version := "1.0"

scalaVersion := "2.12.3"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.8.1",
  "org.scalatest" %% "scalatest" % "3.0.1" % Test,
  "org.mockito" % "mockito-core" % "2.8.47" % Test
)

This will add the Hadoop Java library and the test frameworks as sbt-managed dependencies.

As you might have noticed I use ScalaTest test framework and the Mockito library for managing test doubles. Any other framework should do it, feel free to adopt the code to your favorite one.

Running sbt test from the terminal should be successful and print something like “No tests were executed”.

Writing a mapper

To repeat the basics, mapper is a function that takes a key - value pair and returns zero or more key - value pairs.

In our case the input values are all the lines in the text files, the mapper will split these into words (tokenize). The output key will be the word itself and the value always 1 meaning “this word occurred once”. The input keys are ignored in this mapper.

Let’s create an incomplete implementation in src/main/scala/TokenizerMapper.scala:

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._

class TokenizerMapper
  extends Mapper[LongWritable, Text, Text, IntWritable] {

  override def map(key: LongWritable,
                   value: Text,
                   context: Mapper[LongWritable, Text, Text, IntWritable]#Context) {
    // TODO: implement the mapper
  }
}

Add a unit test for the mapper

A simple unit test would look like this in src/test/TokenizerMapperSpec.scala:

import org.apache.hadoop.io._
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mockito.MockitoSugar

class TokenizerMapperSpec extends FlatSpec with MockitoSugar {

  "map" should "output the words split on spaces" in {
    val mapper = new TokenizerMapper
    val context = mock[mapper.Context]

    mapper.map(
      key = null,
      value = new Text("foo bar foo"),
      context
    )

    verify(context, times(2)).write(new Text("foo"), new IntWritable(1))
    verify(context).write(new Text("bar"), new IntWritable(1))
  }

}

This is what happens here:

Run sbt test to see the test failing.

Implement the mapper

And now we are ready to implement a real mapper, let’s get back to TokenizerMapper. Change the contents to the following:

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._

class TokenizerMapper
  extends Mapper[LongWritable, Text, Text, IntWritable] {

  override def map(key: LongWritable,
                   value: Text,
                   context: Mapper[LongWritable, Text, Text, IntWritable]#Context) {

    val words = value.toString.split("\\s+")

    words.foreach { word =>
      val key = new Text(word)
      val value = new IntWritable(1)
      context.write(key, value)
    }

  }
}

This is a rather naive implementation but it should be fine for out exercise. Verify the tests are passing by running sbt test.

Writing a reducer

Reducer is a function that takes one key and many values (outputs from the mapper) and returns zero or more key - value pairs.

The word count reducer will take a word as an input key and the value set will be as many ones (value 1) as many times the word occurred. The reducer has to count the ones to output the total occurrences for the given word.

A minimal reducer looks like this in src/main/scala/IntSumReducer.scala:

class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  private val result = new IntWritable

  override def reduce(key: Text,
                      values: Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, IntWritable]#Context) {
    // TODO: implement the reducer
  }
}

Add a unit test for the reducer

Writing a unit test for the reducer is very similar to how we did it for the mapper. Add these lines to src/test/IntSumReducerSpec.scala:

import org.apache.hadoop.io.{IntWritable, Text}
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mockito.MockitoSugar

import scala.collection.JavaConverters._

class IntSumReducerSpec extends FlatSpec with Matchers with MockitoSugar {
  "reduce" should "sum the values" in {
    val reducer = new IntSumReducer
    val context = mock[reducer.Context]

    reducer.reduce(
      key = new Text("one"),
      values = Seq(new IntWritable(1), new IntWritable(1)).asJava,
      context
    )

    verify(context).write(new Text("one"), new IntWritable(2))
  }
}

Explanation:

Run sbt test to make sure everything compiles and see the newly added test failing.

Implement the reducer

At this point we are ready to implement an IntSumReducer that passes the tests:

import java.lang.Iterable
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConverters._

class IntSumReducer
  extends Reducer[Text, IntWritable, Text, IntWritable] {

  override def reduce(key: Text,
                      values: Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, IntWritable]#Context) {

    val sum = values
      .asScala
      .map(_.get())
      .sum

    val value = new IntWritable(sum)
    context.write(key, value)
  }
}

Run sbt test to see all tests passing.

Running the MapReduce job

It is not a widely advertised feature but the Hadoop Java library allows executing MapReduce jobs locally without any Hadoop server running. This is a useful step towards learning how to come up with something that can later be deployed to a production environment and more importantly for our topic: it allows easy integration testing.

Running MapReduce jobs with or without a Hadoop cluster requires some glue code which acts both as an entry point (the main method of the runnable application) and the wiring that connects inputs with the mapper and reducer and dumps the output somewhere.

Let’s use Scala’s built-in App class to create a simple executable in src/main/scala/WordCount.scala:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

object WordCount extends App {
  val conf = new Configuration
  val job = Job.getInstance(conf, "Word Count")
  job.setJarByClass(classOf[TokenizerMapper])

  FileInputFormat.addInputPath(job, new Path("input"))

  job.setMapperClass(classOf[TokenizerMapper])
  job.setReducerClass(classOf[IntSumReducer])

  job.setOutputKeyClass(classOf[Text])
  job.setOutputValueClass(classOf[IntWritable])
  FileOutputFormat.setOutputPath(job, new Path("output"))

  job.waitForCompletion(true)
}

What happens here?

There is a lot more to learn about job setup, one important thing to know is the lack of compile time “compatibility checks”. If the input/output types do not match somewhere, the job will fail with an exception at some stage of running.

Running the job locally

Some test input and a logger configuration is missing before we can run the job locally. First create a few text files in a folder named input under the project root.

Add input/file1 with the following contents:

Hello World Bye World

Add input/file2 with the following contents:

Hello Hadoop Goodbye Hadoop

To enable verbose logging create src/main/resources/log4j.properties with the contents below. This helps tracking job progress and allows debugging issues.

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%nV

At this point you are ready to run the application with sbt 'runMain WordCount'. After a few seconds the job should exit with 0 status and write a text file named output/part-r-00000. If you check the contents of the file, it should look like this:

Bye     1
Goodbye 1
Hadoop  2
Hello   2
World   2

Note: the entire output folder has to be deleted before running the job again, otherwise it will fail.

Adding an integration test

The above setup is almost good enough for a basic integration test with the exception that it is not automated. Automating is fairly easy, just create src/test/scala/WordCountIntegrationSpec.scala with the contents below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest._

import scala.io.Source

class WordCountIntegrationSpec
  extends FlatSpec
    with Matchers
    with BeforeAndAfterEach {

  "WordCount" should "write out word counts to output folder" in {
    WordCount.main(Array())

    val output = Source.fromFile("output/part-r-00000").mkString
    output should equal(
      """|Bye	1
         |Goodbye	1
         |Hadoop	2
         |Hello	2
         |World	2
         |""".stripMargin)
  }

  override def afterEach() = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path("output"), true)
  }

}

Explanation:

You should see the integration test passing when running sbt 'testOnly WordCountIntegrationSpec'.

Summary

From here on you should be able to extend these ideas to build and test more complicated real life MapReduce jobs. As a next step I recommend going in-depth with the official MapReduce Tutorial and continue from there to more detailed sections of the Hadoop documentation.

Packaging a MapReduce job into a jar file for running it on a real Hadoop cluster is beyond the scope of this post. If you are using Scala and sbt I recommend checking out sbt-assembly.

You can also find heavily commented code examples from above in this GitHub repo.