In this chapter we build on what we learned in the last chapter about HDFS and the map-only portion of map/reduce, and introduce a full map/reduce job along with its mechanics. This time we’ll include both the shuffle/sort phase and the reduce phase. Once again, we begin with a physical metaphor in the form of a story. After that we’ll walk you through building your first full-blown map/reduce job in Python. By the end of this chapter, you should have an intuitive understanding of how map/reduce works, including its map, shuffle/sort and reduce phases.
First, we begin with a metaphoric story… about how Chimpanzee and Elephant saved Christmas.
It was holiday time at the North Pole, and letters from little boys and little girls all over the world flooded in as they always do. But one year several years ago, the world had grown just a bit too much. The elves just could not keep up with the scale of requests — Christmas was in danger! Luckily, their friends at the Elephant & Chimpanzee Corporation were available to help. Packing their typewriters and good winter coats, JT, Nanette and the crew headed to the Santaplex at the North Pole. Here’s what they found.
As you know, each year children from every corner of the earth write to Santa to request toys, and Santa — knowing who’s been naughty and who’s been nice — strives to meet the wishes of every good little boy and girl who writes him. He employs a regular army of toymaker elves, each of whom specializes in certain kinds of toy: some elves make Action Figures and Dolls, others make Xylophones and Yo-Yos.
Under the elves' old system, as bags of mail arrived they were examined by an elven postal clerk and then hung from the branches of the Big Tree at the center of the Santaplex. Letters were organized on the tree according to the child’s town, as the shipping department had a critical need to organize toys by their final delivery schedule. But the toymaker elves needed to know what toys to make as well, and so for each letter a postal clerk recorded its Big Tree coordinates in a ledger that was organized by type of toy.
So to retrieve a letter, a doll-making elf would look under "Doll" in the ledger to find the next letter’s coordinates, then wait as teamster elves swung a big claw arm to retrieve it from the Big Tree. As JT readily observed, the mail couldn’t be organized both by toy type and also by delivery location, and so this ledger system was a necessary evil. "The next request for Lego is as likely to be from Cucamonga as from Novosibirsk, and letters can’t be pulled from the tree any faster than the crane arm can move!"
What’s worse, the size of Santa’s operation meant that the workbenches were very far from where letters came in. The hallways were clogged with frazzled elves running from Big Tree to workbench and back, spending as much effort requesting and retrieving letters as they did making toys. This complex transactional system was a bottleneck in toy making, and mechanic elves were constantly scheming ways to make the claw arm cope with increased load. "Throughput, not Latency!" trumpeted Nanette. "For hauling heavy loads, you need a stately elephant parade, not a swarm of frazzled elves!"
In marched Chimpanzee and Elephant, Inc, and set up a finite number of chimpanzees at a finite number of typewriters, each with an elephant desk-mate. Strangely, the C&E solution to the too-many-letters problem involved producing more paper. The problem wasn’t in the amount of paper, it was in all the work being done to service the paper. In the new world, all the rules for handling documents are simple, uniform and local.
Postal clerks still stored each letter on the Big Tree (allowing the legacy shipping system to continue unchanged), but now also handed off bags holding copies of the mail. As she did with the translation passages, Nanette distributed these mailbags across the desks just as they arrived. The overhead of recording each letter in the much-hated ledger was no more, and the hallways were no longer clogged with elves racing to and fro.
The chimps' job was to take letters one after another from a mailbag, and fill out a toy-form for each request. A toy-form has a prominent label showing the type of toy, and a body with all the information you’d expect: Name, Nice/Naughty Status, Location, and so forth. You can see some examples here:
Deer SANTA

I wood like a doll for me and and an optimus prime robot for my brother joe

I have been good this year love julia

# Good kids, generates a toy for Julia and a toy for her brother
# Toy Forms:
# doll  | type="green hair"    recipient="Joe's sister Julia"
# robot | type="optimus prime" recipient="Joe"

Greetings to you Mr Claus, I came to know of you in my search for a reliable and reputable person to handle a very confidential business transaction, which involves the transfer of a large sum of money...

# Spam
# (no toy forms)

HEY SANTA I WANT A YANKEES HAT AND NOT ANY DUMB BOOKS THIS YEAR FRANK

# Frank is a jerk. He will get a lump of coal.
# Toy Forms:
# coal | type="anthracite" recipient="Frank" reason="doesn't like to read"
The first note, from a very good girl who is thoughtful for her brother, creates two toyforms: one for Joe’s robot and one for Julia’s doll. The second note is spam, so it creates no toyforms. The third one yields a toyform directing Santa to put coal in Frank’s stocking.
Processing letters in this way represents the map phase of a map/reduce job. The work performed in a map phase could be anything: translation, letter processing, or any other operation. For each record read in the map phase, a mapper can produce zero, one or more records. In this case each letter produces one or more toy forms. This elf-driven letter operation turns unstructured data (a letter) into a structured record (toy form).
Next we move on to the shuffle/sort phase, which takes the toy forms produced in the map phase as its input.
Here’s the new wrinkle on top of the system used in the translation project. Next to every desk now stood a line of pygmy elephants, each dressed in a cape listing the types of toy it would deliver. Each desk had a pygmy elephant for Archery Kits and Dolls, another one for Xylophones and Yo-Yos, and so forth — matching the different specialties of toymaker elves.
As the chimpanzees would work through a mail bag, they’d place each toy-form into the basket on the back of the pygmy elephant that matched its type. At the completion of a bag (a map phase), the current line of elephants would 'shuffle' off to the workbenches, and behind them a new line of elephants would trundle into place. What fun!
Finally, the pygmy elephants would march through the now-quiet hallways to the toy shop floor, each reporting to the workbench that matched its toy types. So the Archery Kit/Doll workbench had a line of pygmy elephants, one for every Chimpanzee & Elephant desk; similarly the Xylophone/Yo-Yo workbench, and all the rest.
This activity by the Pygmy Elephants represents the shuffle/sort phase of a map/reduce job, in which records produced by mappers are grouped by their key (the toy type) and delivered to a reducer for that key (an elf!).
Now the reduce phase begins: Toymaker elves begin producing a steady stream of toys, no longer constrained by the overhead of walking the hallway and waiting for Big-Tree retrieval on every toy.
Our map/reduce job is complete, and toy-making is back on track!
Having introduced 'map-only Hadoop' in our first story, in this story we introduced the shuffle/sort and reduce operations of Hadoop MapReduce. The toymaker elves are the reducers: each receives all the mapped records (toy forms) corresponding to one or more group keys, the type of toy. The act of toy-making is the reduce operation. The pygmy elephants are the shuffle/sort: the movement of data from mappers to reducers. That is how the MapReduce paradigm works! This simple abstraction powers Hadoop MapReduce programs. It is the simplicity of the scheme that makes it so powerful.
In the previous chapter, you worked with a simple-as-possible Python script, which let you learn the mechanics of running Hadoop jobs and understand the essentials of HDFS. Document translation is an example of an "embarrassingly parallel" problem: each record could be processed individually, just as they were organized in the source files. This was a 'map-only' job, an operation we’ll discuss more in section two.
Hadoop’s real power comes from the ability to process data in context, using what’s known as the Map/Reduce paradigm. Every map/reduce job is a program with the same three phases: map, shuffle/sort, and reduce. In the map phase, your program processes its input in any way you see fit, emitting labelled output records. Between map and reduce comes the shuffle/sort phase, in which Hadoop groups and sorts the mapped records according to their labels. Finally, in the reduce phase, your program processes each sorted, labelled group, and Hadoop stores its output on HDFS. That shuffle, or 'grouping-by-label', step is where the magic lies: it ensures that no matter where the relevant records started, they all arrive at the same reducer, in a predictable manner, ready to be synthesized.
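To make that contract concrete before we build the real thing, here is a minimal sketch, entirely of our own invention, that runs the whole pattern in plain Python on a single machine. It is not how Hadoop is implemented (Hadoop does its grouping by sorting, spread across many machines), but the shape of the data flow is the same: the mapper emits labelled records, a simulated shuffle collects them into groups by label, and the reducer processes each group.

# A paper model of the map/reduce contract, run entirely in memory.
# This is our illustrative sketch, not Hadoop's API.
from collections import defaultdict

def mapper(letter):
    # Map phase: emit zero, one or more (label, value) pairs per input record.
    for toy in letter["toys"]:
        yield (toy, 1)

def reducer(label, values):
    # Reduce phase: process one group of values sharing a label.
    return (label, sum(values))

letters = [
    {"child": "Julia", "toys": ["doll", "robot"]},
    {"child": "Frank", "toys": ["coal"]},
    {"child": "Ivan",  "toys": ["doll"]},
]

# Simulated shuffle/sort: collect the mapped records into groups by label.
groups = defaultdict(list)
for letter in letters:
    for label, value in mapper(letter):
        groups[label].append(value)

# Process each group in sorted label order, as Hadoop's reducers would.
for label in sorted(groups):
    toy, total = reducer(label, groups[label])
    print "{}\t{}".format(toy, total)
# Prints:
#   coal   1
#   doll   2
#   robot  1

Everything that follows in this chapter is this same label-group-process pattern, run by Hadoop at scale.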
If Map/Reduce is the core of Hadoop’s operation, then learning to think in Map/Reduce terms is the key to using Hadoop effectively. In turn, thinking in Map/Reduce requires that you develop an innate, physical sense of how Hadoop moves data around. You can’t understand the fundamental patterns of data analysis in Hadoop — grouping, filtering, joining records, and so forth — without knowing the basics. Having read the Christmas story and explanation, you should now have an intuitive understanding of how Hadoop and map/reduce work. If you’re still confused, re-read the beginning of this chapter until you master the material on an intuitive level.
Santa Claus and his elves are busy year-round, but outside the holiday season Santa’s flying reindeer do not have many responsibilities. As flying objects themselves, they spend a good part of their multi-month break pursuing their favorite hobby: UFOlogy (the study of Unidentified Flying Objects and the search for extraterrestrial civilization). So you can imagine how excited they were to learn about the data set of more than 60,000 documented UFO sightings we worked with in the first chapter.
Sixty thousand sightings is much higher than a reindeer can count (only four hooves!), so JT and Nanette occasionally earn a little good favor from Santa Claus by helping the reindeer answer questions about the UFO data. We can do our part by helping our reindeer friends understand how long people wait to report UFOs.
The UFO data is located on the Docker HDFS we set up last chapter. Let’s begin by checking our input data. SSH into the gateway node and run this command to see the first five lines of the UFO sightings sample:
cat /data/gold/geo/ufo_sightings/ufo_sightings-sample.tsv | head -5
Note that 'gold' in this path stands for 'gold standard data' - or in other words, 'data that has been checked and validated to be correct.'
The UFO data is in 'TSV' format - Tab Separated Values.
1995-10-09T05:00:00Z  1995-10-09T05:00:00Z  Iowa City, IA   Man repts. witnessing "flash, ...
1995-10-10T05:00:00Z  1995-10-11T05:00:00Z  Milwaukee, WI   2 min. Man on Hwy 43 SW of Mil...
1995-01-01T06:00:00Z  1995-01-03T06:00:00Z  Shelton, WA     Telephoned Report:CA woman visit...
1995-05-10T05:00:00Z  1995-05-10T05:00:00Z  Columbia, MO    2 min. Man repts. son's bizarre...
1995-06-11T05:00:00Z  1995-06-14T05:00:00Z  Seattle, WA     Anonymous caller repts. sighting...
In the Chimpanzee & Elephant world, a chimp had the following role:
- Read and understand each letter
- Create a new intermediate item having a label (the type of toy, a key) and information about the toy (the work order, a value)
- Hand it to the elephant which delivers to that toy’s workbench
We’re going to write a Hadoop mapper which serves a similar purpose:
- Reads the raw data and parses it into a structured record
- Creates a new intermediate item having a label (the number of days delay before reporting a UFO, a key) and a count (one sighting for each input record, a value)
- Hands it to Hadoop for delivery to that label/group’s reducer
To calculate the delay in reporting a UFO, we subtract the time the UFO was sighted from the time it was reported. As above, this occurs in the map phase of our map/reduce job. The mapper emits the time delay in days, along with a counter that is always one.
You may need to install the iso8601 library, via:
pip install iso8601
The mapper code in Python looks like this:
#!/usr/bin/python
# Example MapReduce job: emit the delay, in days, between a UFO sighting and its report.

import sys, iso8601  # You can get iso8601 from https://pypi.python.org/pypi/iso8601

# Loop through each line from standard input
for line in sys.stdin:
    # Remove the newline, and split on tabs - keeping at most 3 fields
    fields = line.rstrip("\n").split("\t", 2)
    try:
        # Parse the two dates, then find the time between them
        sighted_at, reported_at, rest = fields
        sighted_dt = iso8601.parse_date(sighted_at)
        reported_dt = iso8601.parse_date(reported_at)
        diff = reported_dt - sighted_dt
    except:
        sys.stderr.write("Bad line: {}".format(line))
        continue
    # Emit the number of days and a count of one
    print "\t".join((str(diff.days), "1"))
You can test the mapper like this:
cat /data/gold/geo/ufo_sightings/ufo_sightings-sample.tsv | python examples/ch_02/ufo_mapper.py
The intermediate output looks like this:
0      1
889    1
346    1
1294   1
12     1
14689  1
12     1
...
These are the records our reducer will receive as input. Just as the pygmy elephants transported work orders to elves' workbenches, Hadoop delivers each mapped record to the reducer, the second stage of our job.
In our previous example, the elf at each workbench saw a series of work orders, with the guarantee that a) work orders for each toy type are delivered together and in order; and b) this was the only workbench to receive work orders for that toy type. Similarly, in this job the reducer receives a series of records (UFO reports, values), grouped by label (the number of days delay, a key), with a guarantee that it is the unique processor for such records.
Our reducer is tasked with creating a histogram, so it is concerned with grouping like time delays together. The reduce key in this case is the number of days delay - for instance 0, 1, 10 or 35 days. In the reducer, we keep a running count: as each record is processed, its count (1) is added to the total for the current reduce key. Because Hadoop guarantees that all records with a given reduce key go to a single reducer, and arrive sorted by key, we know that when the reduce key changes we are done with the previous group. Being done with the previous group, it is time to emit our record about that group: the reduce key itself and the sum of the counts of its values. And so our histogram is populated with 'reduced' values.
Note that in this example, to sort is to group. Take a moment and re-read the last paragraph, if necessary. This is the magic of map/reduce: when you perform a sort on a set of values, you are implicitly grouping like records together. MapReduce algorithms take advantage of this implicit grouping, making it explicit via APIs.
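To see this concretely, here is a small sketch of ours using Python’s standard itertools.groupby on a few of the intermediate records shown above. Once the records are sorted, identical keys sit next to each other and the grouping becomes explicit; our reducer below accomplishes the same thing by hand, watching for the key to change.

# Sorting the mapped records puts identical keys next to each other;
# itertools.groupby then makes the implicit grouping explicit.
import itertools

mapped = [("12", 1), ("0", 1), ("12", 1), ("889", 1)]

mapped.sort()  # after sorting: ("0",1), ("12",1), ("12",1), ("889",1)
for days, group in itertools.groupby(mapped, key=lambda pair: pair[0]):
    print "{}\t{}".format(days, sum(count for _, count in group))
# Prints:
#   0    1
#   12   2
#   889  1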
Moving on, our reducer looks like this:
#!/usr/bin/python """Example MapReduce job: count ufo sightings by hour. Based on example at http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ """ import sys, re current_days = None curreent_count = 0 days = None # Loop through each line from standard input for line in sys.stdin: # split the line into two values, using the tab character days, count = line.rstrip("\n").split("\t", 1) # Streaming always reads strings, so must convert to integer try: count = int(count) except: sys.stderr.write("Can't convert '{}' to integer\n".format(count)) continue # If sorted input key is the same, increment counter if current_days == days: current_count += count # If the key has changed... else: # This is a new reduce key, so emit the total of the last key if current_days: print "{}\t{}".format(current_days, current_count) # And set the new key and count to the new reduce key/reset total current_count = count current_days = days # Emit the last reduce key if current_days == days: print "{}\t{}".format(current_days, current_count)
Always test locally on a sample of data, if at all possible:
cat /data/gold/geo/ufo_sightings/ufo_sightings-sample.tsv | python examples/ch_02/ufo_mapper.py | \
  sort | python examples/ch_02/ufo_reducer.py | sort -n
Note that we’ve added a `sort -n` to the end of the command, to show the lowest values first. On Hadoop, this final sort would take another map/reduce job.
The output looks like this:
-1    3
0     51
1     17
2     9
3     4
4     4
5     2
6     1
10    1
15    1
30    2
57    1
74    1
115   1
179   1
203   1
This command demonstrates an execution pattern for testing map/reduce code, and it goes like this:
cat /path/to/data/file | mapper | sort | reducer
Being able to test map/reduce code locally is important because Hadoop is a batch system. In other words, Hadoop is 'slow.' That’s a relative term, because a large Hadoop cluster is blazingly fast at processing terabytes and even petabytes of data. However, the shortest Hadoop job on a loaded cluster can take a few minutes, which can make debugging a slow and cumbersome process. The ability to bypass this several-minute wait by running locally on a sample of data is essential to being productive as a Hadoop developer or analyst.
Now that we’ve tested locally, we’re ready to execute our map/reduce job on Hadoop using Hadoop Streaming. Hadoop Streaming is a utility which lets users run jobs with any executable program as the mapper and/or the reducer. You can use Python scripts, or even simple shell commands like `wc`. If you’re writing a Python, Ruby, Perl or other dynamic-language script as a mapper or reducer, be sure to make the script executable, or the Hadoop job will fail.
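For example, you can mark both scripts executable with `chmod`:

chmod +x examples/ch_02/ufo_mapper.py examples/ch_02/ufo_reducer.py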
The streaming command to run our Python mapper and reducer looks like this:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -Dmapreduce.cluster.local.dir=/home/chimpy/code \
  -fs local -jt local \
  -files examples/ch_02/ufo_mapper.py,examples/ch_02/ufo_reducer.py \
  -mapper ufo_mapper.py -reducer ufo_reducer.py \
  -input /data/gold/geo/ufo_sightings/ufo_sightings-sample.tsv \
  -output ./ufo.out
You’ll see output similar to what you saw in the last chapter. When the job is complete, view the results:
cat ./ufo.out/* | sort -n
The results should be identical to the output of the local execution:
-1    3
0     51
1     17
2     9
3     4
4     4
5     2
6     1
10    1
15    1
30    2
57    1
74    1
115   1
179   1
203   1
While the results are identical, the potential is not: the same Hadoop execution, on a large enough cluster, could scale to petabytes of UFO sightings! Note that there are some negative values: imperfections in our data that we may need to filter out before visualizing our results. 'Big Data' often contains such surprises.
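If we wanted to exclude those negative delays before charting, one simple option (our suggestion, not a step from the original workflow) is to filter them out of the collected output, for example:

cat ./ufo.out/* | sort -n | awk -F'\t' '$1 >= 0'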
When people (or Reindeer) work with data, their end goal is to uncover some answer or pattern. They most often employ Hadoop to turn Big Data into small data, then use traditional analytics techniques to turn small data into answers and insight. One such technique is to plot the information. If a picture is worth a thousand words, then even a basic data plot is worth reams of statistical analysis.
That’s because the human eye often gets a rough idea of a pattern faster than people can write code to divine the proper mathematical result. A few lines of Python can create a histogram to present to our Reindeer pals, to give a gestalt sense of UFO reporting delays.
To create a histogram chart, we’ll run a Python script on our Docker gateway:
#!/usr/bin/python
# Example histogram: UFO reporting delay, in days

import matplotlib
matplotlib.use("Agg")  # render to a file; the gateway has no display attached
import matplotlib.pyplot as plt

day_labels = []
counts = []

# Read the reduced output: one tab-separated "days  count" record per line
file = open("ufo_hist.tsv")
for line in file:
    fields = line.rstrip("\n").split("\t", 1)
    days, count = fields
    day_labels.append(int(days))
    counts.append(int(count))
file.close()

plt.title("UFO Reporting Delays")
plt.bar(day_labels, counts)
plt.savefig("UFO_Reporting_Delays.png")
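Note that the script reads its input from a local file named ufo_hist.tsv. Producing that file isn’t shown above; one way to do it (our assumption) is to save the sorted job output, optionally applying the negative-delay filter from earlier:

cat ./ufo.out/* | sort -n > ufo_hist.tsv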
To view the chart, we need to copy the image back to your local machine and then open it:
scp -i insecure_key.pem -P 9022 chimpy@$DOCKER_IP:UFO_Reporting_Delays.png .
# Enter password 'chimpy'
open UFO_Reporting_Delays.png
The chart looks like this:
While we’ve covered the basic operation of Hadoop MapReduce jobs on a Hadoop cluster, it is worth taking a moment to reflect on how operating Hadoop differs from operating a traditional relational database. Hadoop is not a database.
Fundamentally, the storage engine at the heart of a traditional relational database does two things: it holds all the records, and it maintains a set of indexes for lookups and other operations (the crane arm in Santa’s legacy system). To retrieve a record, it must consult the appropriate index to find the location of the record, then load it from the disk. This is very fast for record-by-record retrieval, but becomes cripplingly inefficient for general high-throughput access. If the records are stored by location and arrival time (as the mailbags were on the Big Tree), then there is no "locality of access" for records retrieved by, say, type of toy — records for Lego will be spread all across the disk. With traditional drives, the disk’s read head has to physically swing back and forth in a frenzy across the drive platter, and though the newer flash drives have smaller retrieval latency it’s still far too high for bulk operations.
What’s more, traditional database applications lend themselves very well to low-latency operations (such as rendering a webpage showing the toys you requested), but very poorly to high-throughput operations (such as requesting every single doll order in sequence). Unless you invest specific expertise and effort, you have little ability to organize requests for efficient retrieval. You either suffer a variety of non-locality and congestion based inefficiencies, or wind up with an application that caters to the database more than to its users. You can to a certain extent use the laws of economics to bend the laws of physics — as the commercial success of Oracle and Netezza show — but the finiteness of time, space and memory present an insoluble scaling problem for traditional databases.
Hadoop solves the scaling problem by not solving the data organization problem. Rather than insist that the data be organized and indexed as it’s written to disk, catering to every context that could be requested, Hadoop instead focuses purely on the throughput case.
As you recall, the bargain that Map/Reduce proposes is that you agree to only write programs fitting this Haiku:
data flutters by
elephants make sturdy piles
context yields insight
More prosaically, we might explain map/reduce in three phases:
description | phase | explanation |
---|---|---|
process and label | map | turn each input record into any number of labelled records |
sorted context groups | group-sort | Hadoop groups those records uniquely under each label, in a sorted order. (You’ll see this also called the shuffle/sort phase.) |
synthesize (process context groups) | reduce | for each group, process its records in order; emit anything you want. |
The trick lies in the shuffle or 'group-sort' phase: assigning the same label to two records in the map phase ensures that they will become local in the reduce phase.
The records in stage 1 ('label') are out of context. The mappers see each record exactly once, but with no promises as to order, and no promises as to which mapper sees which record. We’ve 'moved the compute to the data', allowing each process to work quietly on the data in its work space. Over at C&E Corp, letters and translation passages aren’t pre-organized and they don’t have to be; J.T. and Nanette care about keeping all the chimps working steadily and keeping the hallways clear of inter-office document requests.
Once the map attempt finishes, each 'partition' (the collection of records destined for a common reducer, with a common label, or key) is dispatched to the corresponding machine, and the mapper is free to start a new task. Notice that the only time data moves from one machine to another is when the intermediate piles of data get shipped. Instead of an exhausted crane arm, we now have a dignified elephant parade, conducted in concert with the efforts of our diligent workers.
Digging a little deeper into the mechanics of it all, a mapper receives one record at a time. By default, Hadoop works on text files, and a record is one line of text. Caveat - Hadoop actually supports other file formats and other types of storage besides files, but for the most part the examples in this book will focus on processing files on disk in a readable text format. The whole point of the mapper is to "label" the record so that the group-sort phase can track records with the same label.
Hadoop feeds the mapper that one record, and in turn, the mapper spits out one or more labelled records. Usually the values in each record’s fields are some combination of the values in the input record and a simple transformation of those values. But the output is allowed to be anything — the entire record, some subset of fields, the phase of the moon, the contents of a web page, nothing, … — and at times we’ll solve important problems by pushing that point. The mapper can output those records in any order, at any time in its lifecycle, each with any label.
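As an illustration, here is a tiny hypothetical streaming mapper of our own devising (not part of the UFO job) that emits one labelled record per word of each input line; a single input record can thus produce zero, one or many output records, in whatever order the mapper encounters them:

#!/usr/bin/python
# Hypothetical streaming mapper: emit one labelled record per word.
# A single input line may yield zero, one, or many output records.
import sys

for line in sys.stdin:
    for word in line.strip().split():
        # The label (key) is the word; the value is a count of one.
        print "{}\t{}".format(word.lower(), 1)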
In the shuffle/group-sort phase, Hadoop transfers all the map output records in a partition to the corresponding reducer. That reducer merges the records it receives from all mappers, so that each group contains all records for its label regardless of what machine it came from. What’s nice about the group-sort phase is that you don’t have to do anything for it. Hadoop takes care of moving the data around for you. What’s less nice about the group-sort phase is that it is typically the performance bottleneck. Later, we’ll learn how to take care of Hadoop so that it can move the data around smartly.
Whereas the mapper sees single records in isolation, a reducer receives one key (the label) and all records that match that key. In other words, a reducer operates on a group of related records. Just as with the mapper, as long as it keeps eating records and doesn’t fail the reducer can do anything with those records it pleases and emit anything it wants. It can emit nothing, it can contact a remote database, it can emit nothing until the very end and then emit one or a zillion records. The output can be text, it can be video files, it can be angry letters to the President. They don’t have to be labelled, and they don’t have to make sense. Having said all that, usually what a reducer emits are nice well-formed records resulting from sensible transformations of its input, like the count of records, the largest or smallest value from a field, or full records paired with other records. And though there’s no explicit notion of a label attached to a reducer output record, it’s pretty common that within the record’s fields are values that future mappers will use to form labels.
Once you understand the label-group-process data flow we’ve just introduced, you understand enough about map/reduce to reason about the large-scale motion of data and thus your job’s performance.
You’ve just seen how records move through a map/reduce workflow, both in theory and in practice. This can be challenging material to grasp, so don’t feel bad if you don’t get all of it right away. While we did our best to simplify complex phenomena, we hope we’ve still communicated the essentials. It is normal to have to re-read this chapter until you get it straight. You may also try re-visiting this chapter once you’ve read a bit further in the book. Once you’ve performed a few Pig `GROUP BY`s, this material may feel more natural.
You should now have an intuitive sense of the mechanics behind map/reduce. Remember and come back to this chapter as you read the rest of the book. This will aid you in acquiring a deep understanding of the operations that make up the strategies and tactics of the analytic toolkit. By the end of the book, you’ll be converting Pig syntax into map/reduce jobs in your head! You’ll be able to reason about the cost of different operations and optimize your Pig scripts accordingly.
That covers Map/Reduce for now (don’t worry, we’ll revisit map/reduce in the chapter on Joins). Next, we’ll introduce you to the dataset we’ll be working on: baseball! Then we’ll introduce Apache Pig, a high level language and tool which will generate MapReduce jobs for you. Having covered all that, we can move on to learning analytic patterns in Pig in Part 2 of the book.