Getting Started with MapReduce the TDD Way
Writing and running the first lines of MapReduce code for Hadoop is an involved process that often turns beginners away. I will use the test-driven development approach to introduce writing MapReduce jobs together with unit and integration tests, without even installing a Hadoop server.
Requirements
I will use Scala and sbt for the code examples below because I like the conciseness of the language, but the approach should be easy to adapt to Java or any other JVM language.
To run the code examples, the only thing you will have to install is sbt. I used version 1.0.2. Use your favorite terminal application to launch sbt commands.
You will also need a text editor or an IDE with built-in sbt support (I recommend IntelliJ IDEA with the Scala plugin). The examples will assume you are using sbt from the command line; feel free to use your IDE’s equivalent of the sbt commands.
Although they do not have to be installed separately, this tutorial uses Scala 2.12.3 and Hadoop 2.8.1.
Introduction
Big data is a hot topic nowadays, and Apache Hadoop is one of the most popular pieces of software for storing and processing large data sets. There are many tutorials on how to get started with MapReduce - Hadoop’s underlying programming model - but not many of them serve the needs of the impatient developer. Moreover, almost no time is spent on testing a MapReduce job, and Google search results often suggest using MRUnit, a now defunct and unsupported tool.
The official MapReduce Tutorial and many other learning materials use the “word count problem” as an example - this tutorial will follow the tradition. The goal is to count the occurrences of the words in a bunch of text files.
While I was an engineer at SoundCloud, I had to learn Hadoop the hard way. Working with seasoned big data engineers meant I had access to a well-maintained cluster and there was always someone to help out solving complicated problems, but material or good advice on making your first steps did not exist. This experience inspired me to write this post.
The basics of MapReduce
To give you a basic understanding of what Hadoop and MapReduce are, here are the basics:
- Hadoop manages and stores data as sets of key-value pairs, where both the key and the value can be treated as specific types or simply as binary data.
- A mapper is a function that takes a key-value pair and returns zero or more key-value pairs.
- A reducer is a function that takes one key and many values (outputs from the mapper) and returns zero or more key-value pairs.
- Input and output key and value types might be different for both the mapper and the reducer, but there are a few important rules:
  - The input types of the mapper must match the key-value types of the input data.
  - The input types of the reducer must match the output types of the mapper.
Keep in mind there is a lot more beyond this explanation. This post will not discuss how Hadoop stores the data in a distributed manner, nor how the mapper and reducer functions are executed. If you want to know more, I recommend starting with the official Hadoop documentation and the MapReduce Tutorial.
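To make the data flow concrete before touching any Hadoop API, here is a minimal, plain-Scala sketch that simulates the map, shuffle and reduce steps of the word count on in-memory collections. The object name is made up for this illustration and this is not how Hadoop actually executes a job:
// Illustrative only: no Hadoop involved, everything runs on plain Scala collections.
object WordCountSketch extends App {
  val lines = Seq("Hello World Bye World", "Hello Hadoop Goodbye Hadoop")

  // "map" phase: every line produces zero or more (word, 1) pairs
  val mapped: Seq[(String, Int)] =
    lines.flatMap(_.split("\\s+").map(word => (word, 1)))

  // shuffle: the framework groups all pairs by key before calling the reducer
  val grouped: Map[String, Seq[Int]] =
    mapped.groupBy(_._1).map { case (word, pairs) => (word, pairs.map(_._2)) }

  // "reduce" phase: each key and its values collapse into a single output pair
  val counts: Map[String, Int] =
    grouped.map { case (word, ones) => (word, ones.sum) }

  counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word\t$n") }
}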
Setting up a project
To start a new sbt project, create an empty folder containing a file named build.sbt with the following contents:
name := "hadoop-mapreduce-tutorial"
version := "1.0"
scalaVersion := "2.12.3"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.8.1",
"org.scalatest" %% "scalatest" % "3.0.1" % Test,
"org.mockito" % "mockito-core" % "2.8.47" % Test
)
This will add the Hadoop Java library and the test frameworks as sbt-managed dependencies.
As you might have noticed, I use the ScalaTest test framework and the Mockito library for managing test doubles. Any other framework should do; feel free to adapt the code to your favorite one.
Running sbt test from the terminal should be successful and print something like “No tests were executed”.
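In case you are unsure where the files mentioned below should live, the standard sbt layout used throughout this post looks roughly like this (the folders are created as we go):
hadoop-mapreduce-tutorial/
  build.sbt
  input/                  (sample text files, added later)
  src/
    main/
      resources/          (log4j.properties, added later)
      scala/              (mapper, reducer and job code)
    test/
      scala/              (unit and integration tests)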
Writing a mapper
To repeat the basics: a mapper is a function that takes a key-value pair and returns zero or more key-value pairs.
In our case the input values are the lines of the text files; the mapper will split these into words (tokenize). The output key will be the word itself and the value will always be 1, meaning “this word occurred once”. The input keys are ignored in this mapper.
Let’s create an incomplete implementation in src/main/scala/TokenizerMapper.scala:
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
class TokenizerMapper
extends Mapper[LongWritable, Text, Text, IntWritable] {
override def map(key: LongWritable,
value: Text,
context: Mapper[LongWritable, Text, Text, IntWritable]#Context) {
// TODO: implement the mapper
}
}
Add a unit test for the mapper
A simple unit test would look like this in src/test/scala/TokenizerMapperSpec.scala:
import org.apache.hadoop.io._
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mockito.MockitoSugar
class TokenizerMapperSpec extends FlatSpec with MockitoSugar {
"map" should "output the words split on spaces" in {
val mapper = new TokenizerMapper
val context = mock[mapper.Context]
mapper.map(
key = null,
value = new Text("foo bar foo"),
context
)
verify(context, times(2)).write(new Text("foo"), new IntWritable(1))
verify(context).write(new Text("bar"), new IntWritable(1))
}
}
This is what happens here:
- The test subject will be an instance of the mapper.
- The MapReduce framework provides a Context instance in production. In unit tests we will pass in a test double instead.
- Invoke the mapper by calling map() with some test input and the context.
- Ensure there were calls to context.write() with the given arguments (the words and 1) using verify().
Run sbt test to see the test failing.
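While iterating on a single class you do not have to run the whole suite every time; sbt’s testOnly task (also used at the end of this post) accepts a spec name:
sbt 'testOnly TokenizerMapperSpec'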
Implement the mapper
And now we are ready to implement a real mapper. Let’s get back to TokenizerMapper and change the contents to the following:
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
class TokenizerMapper
extends Mapper[LongWritable, Text, Text, IntWritable] {
override def map(key: LongWritable,
value: Text,
context: Mapper[LongWritable, Text, Text, IntWritable]#Context) {
val words = value.toString.split("\\s+")
words.foreach { word =>
val key = new Text(word)
val value = new IntWritable(1)
context.write(key, value)
}
}
}
This is a rather naive implementation but it should be fine for our exercise.
Verify the tests are passing by running sbt test.
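If you want something less naive than splitting on whitespace, a possible variant of the tokenization inside map() could lower-case the line and drop punctuation. This is only a sketch of an alternative, not part of the exercise, and note that lower-casing would change the job’s output and therefore the expected contents in the integration test later in this post:
// Alternative tokenization (sketch): lower-case and split on non-alphanumeric characters.
val words = value.toString
  .toLowerCase
  .split("[^\\p{Alnum}]+")   // split on anything that is not a letter or a digit
  .filter(_.nonEmpty)        // drop the empty token a leading separator would produce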
Writing a reducer
A reducer is a function that takes one key and many values (outputs from the mapper) and returns zero or more key-value pairs.
The word count reducer will take a word as its input key, and the value set will contain as many ones (value 1) as the number of times the word occurred. The reducer has to sum these ones to output the total occurrences of the given word.
A minimal reducer looks like this in src/main/scala/IntSumReducer.scala:
import java.lang.Iterable

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._

class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  private val result = new IntWritable

  override def reduce(key: Text,
                      values: Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, IntWritable]#Context) {
    // TODO: implement the reducer
  }
}
Add a unit test for the reducer
Writing a unit test for the reducer is very similar to how we did it for the mapper. Add these lines to src/test/scala/IntSumReducerSpec.scala:
import org.apache.hadoop.io.{IntWritable, Text}
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
class IntSumReducerSpec extends FlatSpec with Matchers with MockitoSugar {
"reduce" should "sum the values" in {
val reducer = new IntSumReducer
val context = mock[reducer.Context]
reducer.reduce(
key = new Text("one"),
values = Seq(new IntWritable(1), new IntWritable(1)).asJava,
context
)
verify(context).write(new Text("one"), new IntWritable(2))
}
}
Explanation:
- The test subject will be an instance of the reducer.
- The MapReduce framework provides a Context instance in production. In unit tests we will pass in a test double instead.
- Invoke the reducer by calling reduce() with some test input and the context.
- Ensure there was a call to context.write() with the expected arguments (the word and the summed count) using verify().
Run sbt test to make sure everything compiles and see the newly added test failing.
Implement the reducer
At this point we are ready to implement an IntSumReducer that passes the tests:
import java.lang.Iterable
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConverters._
class IntSumReducer
extends Reducer[Text, IntWritable, Text, IntWritable] {
override def reduce(key: Text,
values: Iterable[IntWritable],
context: Reducer[Text, IntWritable, Text, IntWritable]#Context) {
val sum = values
.asScala
.map(_.get())
.sum
val value = new IntWritable(sum)
context.write(key, value)
}
}
Run sbt test to see all tests passing.
Running the MapReduce job
It is not a widely advertised feature, but the Hadoop Java library allows executing MapReduce jobs locally without any Hadoop server running. This is a useful step towards learning how to come up with something that can later be deployed to a production environment, and, more importantly for our topic, it allows easy integration testing.
Running MapReduce jobs with or without a Hadoop cluster requires some glue code which acts both as an entry point (the main method of the runnable application) and the wiring that connects inputs with the mapper and reducer and dumps the output somewhere.
Let’s use Scala’s built-in App trait to create a simple executable in src/main/scala/WordCount.scala:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
object WordCount extends App {
val conf = new Configuration
val job = Job.getInstance(conf, "Word Count")
job.setJarByClass(classOf[TokenizerMapper])
FileInputFormat.addInputPath(job, new Path("input"))
job.setMapperClass(classOf[TokenizerMapper])
job.setReducerClass(classOf[IntSumReducer])
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
FileOutputFormat.setOutputPath(job, new Path("output"))
job.waitForCompletion(true)
}
What happens here?
- A Job instance is created.
- The job is configured with input as the input path (relative to the current working directory).
- The mapper and the reducer are added to the job.
- Output is set up to a folder named output, with key and value types specified.
- The job is started and the application will not exit until the job completes or fails.
There is a lot more to learn about job setup; one important thing to know is the lack of compile-time “compatibility checks”. If the input/output types do not match somewhere, the job will fail with an exception at some stage of running.
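One optimization worth knowing about, although not needed for this exercise, is a combiner: a reducer-like step that pre-aggregates mapper output on each node before the shuffle. Since IntSumReducer only sums values, and summing is associative and commutative, it could double as the combiner; a sketch of the extra line in the job setup:
// Optional: pre-aggregate (word, 1) pairs on the map side before the shuffle.
job.setCombinerClass(classOf[IntSumReducer])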
Running the job locally
Some test input and a logger configuration are missing before we can run the job locally. First, create a few text files in a folder named input under the project root.
Add input/file1 with the following contents:
Hello World Bye World
Add input/file2 with the following contents:
Hello Hadoop Goodbye Hadoop
To enable verbose logging, create src/main/resources/log4j.properties with the contents below. This helps tracking job progress and allows debugging issues.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
At this point you are ready to run the application with sbt 'runMain WordCount'. After a few seconds the job should exit with 0 status and write a text file named output/part-r-00000. If you check the contents of the file, it should look like this:
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Note: the entire output folder has to be deleted before running the job again, otherwise it will fail.
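If you prefer, the job can clean up after itself: a sketch of deleting a stale output folder at the top of WordCount, using the same FileSystem API the integration test below relies on (assuming conf is the Configuration already defined there):
import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the output folder from a previous run, if any.
val fs = FileSystem.get(conf)
fs.delete(new Path("output"), true)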
Adding an integration test
The above setup is almost good enough for a basic integration test, with the exception that it is not automated. Automating it is fairly easy: just create src/test/scala/WordCountIntegrationSpec.scala with the contents below.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest._
import scala.io.Source
class WordCountIntegrationSpec
extends FlatSpec
with Matchers
with BeforeAndAfterEach {
"WordCount" should "write out word counts to output folder" in {
WordCount.main(Array())
val output = Source.fromFile("output/part-r-00000").mkString
output should equal(
"""|Bye 1
|Goodbye 1
|Hadoop 2
|Hello 2
|World 2
|""".stripMargin)
}
override def afterEach() = {
val fs = FileSystem.get(new Configuration())
fs.delete(new Path("output"), true)
}
}
Explanation:
- The main method is invoked “manually” in the same process as the test runner.
- Once the job has finished, the output file is read into a string and compared against the expected contents.
- The test deletes the outputs after each run.
You should see the integration test passing when running sbt 'testOnly WordCountIntegrationSpec'.
Summary
From here on you should be able to extend these ideas to build and test more complicated, real-life MapReduce jobs. As a next step I recommend going in-depth with the official MapReduce Tutorial and continuing from there to more detailed sections of the Hadoop documentation.
Packaging a MapReduce job into a jar file for running it on a real Hadoop cluster is beyond the scope of this post. If you are using Scala and sbt I recommend checking out sbt-assembly.
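As a rough sketch of what that could look like - the plugin version below is an assumption, check the sbt-assembly documentation for the current release - the plugin is added in project/plugins.sbt:
// project/plugins.sbt - the version is an assumption, use the latest release
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
Running sbt assembly should then produce a fat jar under target/scala-2.12/ (the exact file name depends on your settings), which can be submitted to a cluster with hadoop jar <your-jar> WordCount.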
You can also find heavily commented code examples from above in this GitHub repo.