After a day or two of the new toyform process, Mrs. Claus reported dismaying news. Even though productivity was much improved over the Big-Tree system, it wasn’t going to be enough to hit the Christmas deadline.
The problem was plain to see. Repeatedly throughout the day, workbenches would run out of parts for the toys they were making. The dramatically improved efficiency of order handling, and the large built-up backlog of orders, far outstripped what the toy parts warehouse could supply. Various workbenches were clogged with Jack-in-the-boxes awaiting springs, number blocks awaiting paint, and the like. Tempers were running high, and the hallways became clogged again with overloaded parts carts careening off each other. JT and Nanette filled several whiteboards with proposed schemes, but none of them felt right.
To clear his mind, JT wandered over to the reindeer ready room, eager to join in the cutthroat games of poker Rudolph and his pals regularly ran. During a break in the action, JT found himself idly sorting the deck of cards by number, to check that none of his Reindeer friends had slipped an extra ace or three into the deck. As he did so, something in his mind flashed back to the unfinished toys on the assembly floor: mounds of number blocks, stacks of Jack-in-the-boxes, rows of dolls. Sorting the cards by number had naturally organized them into groups by kind as well: he saw all the number cards in a run, followed by all the jacks, then the queens and the kings and the aces.
"Sorting is equivalent to grouping!" he exclaimed to the reindeers' puzzlement. "Sorry, fellas, you’ll have to deal me out," he said, as he ran off to find Nanette.
The next day, they made several changes to the toy-making workflow. First, they set up a delegation of elvish parts clerks at desks behind the letter-writing chimpanzees, directing the chimps to hand a carbon copy of each toy form to a parts clerk as well. On receipt of a toy form, each parts clerk would write out a set of tickets, one for each part in that toy, and note on the ticket the ID of its toyform. These tickets were then dispatched by pygmy elephant to the corresponding section of the parts warehouse, where the parts would be retrieved from the shelves.
Now, here is the truly ingenious part that JT struck upon that night. Before, the chimpanzees had placed their toy forms into baskets on the back of each pygmy elephant in no particular order. JT replaced these baskets with standing file folders — the kind you might see on an organized person’s desk. He directed the chimpanzees to insert each toy form into the file folder according to the alphabetical order of its ID. (Chimpanzees are exceedingly dextrous, so this did not appreciably impact their speed.) Meanwhile, at the parts warehouse Nanette directed a crew of elvish carpenters to add a clever set of movable frames to each of the parts carts. Similarly, our pachydermous proprietor prompted the parts pickers to put each part-cart’s picked parts in the place that properly preserved the procession of their toyform IDs.
After a double shift that night by the parts department and the chimpanzees, the toymakers arrived in the morning to find, next to each workbench, the pygmy elephants with their toy forms and a set of carts from each warehouse section holding the parts they’d need. As work proceeded, a sense of joy and relief soon spread across the shop.
The elves were now producing a steady stream of toys as fast as their hammers could fly, with an economy of motion they’d never experienced. Since both the parts and the toy forms were in the same order by toyform ID, as the toymakers pulled the next toy form from the file they would always find the parts for it first at hand. Get the toy form for a wooden toy train and you would find a train chassis next in the chassis cart, small wooden wheels next in the wheel cart, and magnetic bumpers next in the small parts cart. Get the toy form for a rolling duck on a string, and you would instead find a duck chassis, large wooden wheels and a length of string at the head of their respective carts.
Not only did work now proceed with an unbroken swing, but the previously cluttered workbenches were now clear — their only contents were the parts immediately required to assemble the next toy. This space efficiency let Santa pull in extra temporary workers from the elves' Rivendale branch, who were bored with fighting orcs and excited to help out.
Toys were soon coming off the line at a tremendous pace, far exceeding what the elves had ever been able to achieve. By the second day of the new system, Mrs. Claus excitedly reported the news everyone was hoping to hear: they were fully on track to hit the Christmas Eve deadline!
And that’s the story of how Elephant and Chimpanzee saved Christmas.
In the last problem we solved for our Reindeer friends, we only cared that the data came to the reducer in groups. We had no concerns about which reducers handled which groups, and we had no concerns about how the data was organized within the group. The next example will draw on the full scope of the framework, equipping you to understand the complete contract that Hadoop provides the end user.
Since our reindeer friends want to spend their summer months visiting the locations of various UFO sightings, they would like more information to help plan their trip. The Geonames dataset (REF) provides more than seven million well-described points of interest, so we can extend each UFO sighting whose location matches a populated place name with its longitude, latitude, population and more.
Your authors have additionally run the free-text locations — "Merrimac, WI" or "Newark, NJ (South of Garden State Pkwy)" — through a geolocation service to (where possible) add structured geographic information: longitude, latitude and so forth.
When you are writing a Map/Reduce job, the first critical question is how to group the records in context for the Reducer to synthesize. In this case, we want to match every UFO sighting against the corresponding Geonames record with the same city, state and country, so the Mapper labels each record with those three fields. This ensures records with the same location name are all received by a single Reducer in a single group, just as we saw with toys sent to the same workbench or visits "sent" to the same time bucket. The Reducer will also need to know which records are sightings and which records are places, so we have extended the label with an "A" for places and a "B" for sightings. (You will see in a moment why we chose those letters.) While we are at it, we will also eliminate Geonames records that are not populated places.
class UfoSighting
  include Wu::Model
  field :sighted_at,   Time
  field :reported_at,  Time
  field :shape,        Symbol
  field :city,         String
  field :state,        String
  field :country,      String
  field :duration_str, String
  field :location_str, String
  #
  field :longitude,    Float
  field :latitude,     Float
  field :city,         String
  field :region,       String
  field :country,      String
  field :population,   Integer
  field :quadkey,      String
  #
  field :description,  String
end
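To make the labeling concrete, here is a rough streaming-style sketch of such a Mapper. It is not the book’s actual Wukong code: the record layout (one JSON line per record with a "type" field and already-normalized city/state/country names) is an assumption made purely for illustration, and we lean on the Geonames feature class "P" to mean "populated place."

require 'json'

# Mapper sketch: label each record with (city, state, country) plus an
# "A" (place) or "B" (sighting) tag, so each place sorts ahead of its sightings.
# Assumes one JSON record per line with a 'type' field -- illustration only.
ARGF.each_line do |line|
  rec = JSON.parse(line)
  if rec['type'] == 'place'
    next unless rec['feature_class'] == 'P'   # drop Geonames records that are not populated places
    tag = 'A'
  else
    tag = 'B'
  end
  puts [rec['city'], rec['state'], rec['country'], tag, line.chomp].join("\t")
end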
An elf building a toy first selected the toy form, then selected each of the appropriate parts. To facilitate this, the elephants carrying toy forms stood at the head of the workbench, next to all the parts carts. While the first part of the label (the partition key) defines how records are grouped, the remainder of the label (the sort key) describes how they are ordered within the group. Denoting places with an "A" and sightings with a "B" ensures our Reducer always receives the place for a given location name first, followed by its sightings. In the happy case where a group holds both a place and sightings, the Reducer holds the place record in a temporary variable and appends the place’s fields to those of each sighting that follows. There are many places that match no UFO sightings; these are discarded. There are some UFO sightings without reconcilable location data; we will hold onto those but leave the place fields blank. Even if these groups were extremely large, this matching would require no more memory overhead than the size of a single place record.
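A matching Reducer sketch, under the same assumed record layout as the Mapper sketch above, shows how little state this takes: one held place record, nothing more.

# Reducer sketch: records arrive grouped by (city, state, country) and, within
# each group, sorted so the "A" (place) record precedes the "B" (sighting)
# records. Illustration only; assumes the mapper output sketched above.
place      = nil
prev_group = nil
ARGF.each_line do |line|
  city, state, country, tag, payload = line.chomp.split("\t", 5)
  group = [city, state, country]
  if group != prev_group             # a new location: forget the previous place
    place      = nil
    prev_group = group
  end
  if tag == 'A'
    place = payload                  # hold the place record; emit nothing yet
  else
    puts [payload, place].join("\t") # a sighting: append the place (blank if none matched)
  end
end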
Now that you’ve seen the partition, sort and secondary sort in action, it’s time to attach more formal and technical detail to how it works.
As we mentioned in the opening, the fundamental challenge of Big Data is how to put records into relevant context, even when they are distributed in a highly non-local fashion. Traditional databases and high-performance computing approaches use a diverse set of methods and high-cost hardware to brute-force the problem, but at some point the joint laws of physics and economics win out. Hadoop, instead, gives you exactly and only one "locality" primitive — only one way to express which records should be grouped in context — namely, partition-group-sort'ing the records by their label. The sidebar (REF) about the Hadoop contract describes the precise properties of this operation, but here is a less formal explanation of its essential behavior.
The partition key portion of the label governs how records are assigned to Reducers; it is analogous to the tear-sheet that mapped which toy types went to which workbench. Just as there was only one workbench for dolls and one workbench for ponies, each partition maps to exactly one Reducer. Since there are generally a small number of Reducers and an arbitrary number of partitions, each Reducer will typically see many partitions.
The default partitioner (HashPartitioner) assigns partitions to Reducers arbitrarily, in order to give a reasonably uniform distribution of records to Reducers. It does not know anything specific about your data, though, so you could get unlucky and find that you have sent all the tweets by Justin Bieber and Lady Gaga to the same Reducer, or all the census forms for New York, L.A. and Chicago to the same Reducer, leaving it with an unfairly large portion of the midstream data. If the partitions themselves are of manageable size and you are simply unlucky about which ones became neighbors, just try using one fewer Reduce slot — this will break up the mapping into a different set of neighbors.
For a given cluster with a given number of Reduce slots, the assignment of partitions to Reducers by the hash partitioner will be stable from run to run, but you should not count on it any more than that.
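The real HashPartitioner lives inside Hadoop’s Java machinery, but its behavior boils down to hashing the partition key and taking the result modulo the number of reduce tasks, which is why the assignment is arbitrary yet stable, and why changing the reducer count reshuffles which partitions become neighbors. Here is that idea rendered as a toy Ruby sketch (an illustration, not Hadoop’s code):

# Illustration only -- the idea behind hash partitioning.
def assign_partition(partition_key, num_reducers)
  partition_key.hash.abs % num_reducers
end

assign_partition("Merrimac\tWI\tUS", 20)   # the same reducer for this key every time, within one run
assign_partition("Merrimac\tWI\tUS", 19)   # a different reducer count reshuffles the neighbors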
The naive HashPartitioner would not work for the elves, we assume — you don’t want the toyforms for ponies to be handled by the same workbench processing toyforms for pocketwatches. For us too, some operations require a specific partitioning scheme (as you will see when we describe the total sort operation (REF)), and so Hadoop allows you to specify your own partitioner. But this is rarely necessary, and in fact your authors have gone their whole careers without ever writing one. If you find yourself considering writing a custom partitioner, stop to consider whether you are going against the grain of Hadoop’s framework. Hadoop knows what to do with your data and, typically, the fewer constraints you place on its operation, the better it can serve you.
The group key governs, well, the actual groups your program sees. All the records within a group arrive together — once you see a record from one group, you will see all of them in a row and you will never again see a record from a preceding group.
Within the group, the records are sent in the order given by the sort key. When you are using the Hadoop streaming interface (the basis for Wukong, mrjob and the like), the only datatype is text, and so records are sorted lexicographically by their UTF-8 characters. (TECHREVIEW: is it UTF-8 or binary strings?)
This means that:
- Zoo comes after Apple, because A comes before Z.
- Zoo comes before apple, because upper-case characters precede lower-case characters.
- 12345 comes before 42, and both of them come before Apple, Zoo or apple.
- 12345 comes after "   42", because we used spaces to pad the number 42 out to five characters.
- apple and zoo come before шимпанзе, because the basic ASCII characters (like the ones on a US keyboard) precede extended Unicode characters (like the Russian characters in the word for "chimpanzee").
- # (hash marks) come before Apple and zoo, and |||| (pipes) come after all of them. Remember these characters — they are useful for forcing a set of records to the top or bottom of your input, a trick we’ll use in the geodata chapter (REF). The dot (.), hyphen (-), plus (+) and hash (#) come near the start of the 7-bit ASCII table, just before the alphanumeric characters; the tilde (~) and pipe (|) come at the end. All of them precede extended-character words like шимпанзе.
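You can reproduce this ordering for yourself: Ruby compares strings byte by byte, which for UTF-8 text matches the ordering Hadoop Streaming applies to these examples.

# Byte-wise comparison reproduces the ordering described above.
["Zoo", "apple", "zoo", "Apple", "12345", "42", "   42", "#", "||||", "шимпанзе"].sort
# => ["   42", "#", "12345", "42", "Apple", "Zoo", "apple", "zoo", "||||", "шимпанзе"]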
Note: Beware the Derp-Sort
It’s very important to recognize that numbers are not sorted by their numeric value unless you have control over their Java type. The simplest way to get numeric sorting of positive numbers is to pad numeric outputs to a constant width by prepending spaces. In Ruby, the expression "%10d" % val produces a ten-character-wide string (wide enough for all positive thirty-two-bit numbers). There’s no good way in basic Hadoop Streaming to get negative numbers to sort properly — yes, this is very annoying. (TECHREVIEW: is there a good way?)
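To make the note concrete, here is the padding trick at work in plain Ruby; a mapper would emit these padded strings as part of its sort key.

"%10d" % 42       # => "        42"
"%10d" % 12345    # => "     12345"
# Once padded, lexicographic order agrees with numeric order for positive numbers:
["%10d" % 12345, "%10d" % 42].sort   # => ["        42", "     12345"]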
In the common case, the partition key, group key and sort key are the same, because all you care about is that records are grouped. But of course it’s also common for the three keys not to be the same. The prior example (REF), a JOIN of two tables, demonstrated a common pattern for use of the secondary sort; and the roll-up aggregation example that follows illustrates both a secondary sort and a partition key that is larger than the group key.
The set of records defined by the partition key must be identical to, or a superset of, the sets defined by the group key, or your groups will be meaningless. Hadoop doesn’t impose that constraint on you, so be sure to think it through at least once. The easiest way to get it right (and the way we almost always do it) is to have the group key be the same as or an extension of the partition key, and the sort key be the same as or an extension of the group key.
It is very important to get a good grasp of how the partition and group keys relate, so let’s step through an exercise illustrating their influence on the distribution of records.
Here’s another version of the script to total Wikipedia pageviews. We’ve modified the mapper to emit separate fields for the century, year, month, day and hour (you wouldn’t normally do this; we’re trying to prove a point). The reducer intends to aggregate the total pageviews across all pages by year and month: a count for December 2010, for January 2011, and so forth. We’ve also directed it to use twenty reducers, enough to illustrate a balanced distribution of reducer data.
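For orientation, here is a minimal streaming-style sketch of that shape. It is not the book’s actual Wukong script (the real record layout, the handling of --partition_keys/--sort_keys, and the get_key method all differ); the two-field input layout of timestamp then count is assumed purely for illustration.

# mapper.rb (sketch): split each pageview timestamp ("YYYYMMDDHHMMSS") into
# century / year / month / day / hour fields, followed by the count.
ARGF.each_line do |line|
  timestamp, count = line.chomp.split("\t")
  century, year, month, day, hour =
    timestamp[0, 2], timestamp[2, 2], timestamp[4, 2], timestamp[6, 2], timestamp[8, 2]
  puts [century, year, month, day, hour, count].join("\t")
end

# reducer.rb (sketch): aggregate pageview counts by century/year/month; the
# day and hour fields ride along as the (ignored) tail of the sort key.
current, total = nil, 0
ARGF.each_line do |line|
  century, year, month, _day, _hour, count = line.chomp.split("\t")
  key = [century, year, month]
  if current && key != current
    puts [*current, total].join("\t")
    total = 0
  end
  current = key
  total  += count.to_i
end
puts [*current, total].join("\t") if current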
Run the script on the subuniverse pageview data with --partition_keys=3 --sort_keys=3
(CODE check params), and you’ll see it use the first three keys (century/year/month) as both partition keys and sort keys. Each reducer’s output will tend to have months spread across all the years in the sample, and the data will be fairly evenly distributed across all the reducers. In our runs, the -00000
file held the months of (CODE insert observed months), while the -00001
file held the months of (CODE insert observed months); all the files were close to (CODE size) MB large. (CODE consider updating to "1,2,3" syntax, perhaps with a gratuitous randomizing field as well. If not, make sure wukong errors on a partition_keys larger than the sort_keys). Running with --partition_keys=3 --sort_keys=4
doesn’t change anything: the get_key
method in this particular reducer only pays attention to the century/year/month, so the ordering within the month is irrelevant.
Running it instead with --partition_keys=2 --sort_keys=3
tells Hadoop to partition on the century/year, but do a secondary sort on the month as well. All records that share a century and year now go to the same reducer, while the reducers still see months as continuous chunks. Now there are only six (or fewer) reducers that receive data — all of 2008 goes to one reducer, similarly 2009, 2010, and the rest of the years in the dataset. In our runs, we saw years X and Y (CODE adjust reducer count to let us prove the point, insert numbers) land on the same reducer. This uneven distribution of data across the reducers should cause the job to take slightly longer than the first run. To push that point even farther, running with --partition_keys=1 --sort_keys=3
now partitions on the century — which all the records share. You’ll now see 19 reducers finish promptly following the last mapper, and the job should take nearly twenty times as long as with --partition_keys=3
.
Finally, try running it with --partition_keys=4 --sort_keys=4
, causing records to be partitioned by century/year/month/day. Now the days in a month will be spread across all the reducers: for December 2010, we saw -00000
receive X, Y and -00001
receive X, Y, Z; out of 20 reducers, X of them received records from that month (CODE insert numbers). Since our reducer class is coded to aggregate by century/year/month, each of those reducers prepared its own meaningless total pageview count for December 2010, each of them a fraction of the true value. You must always ensure that all the data you’ll combine in an aggregate lands on the same reducer.