In this chapter we will cover ordering operations, or operations that sort data according to some criteria. Pig has two concepts of order - entire datasets can be sorted, as can the contents of a bag. We’ll learn how to sort relations and bags, and also how to calculate the top records of a relation by combining ORDER
with LIMIT
. With these skills in hand, we’ll be one step closer to being able to solve any arbitrary data processing task using the set of patterns we’ve learned.
Ordering operations are a fundamental part of storytelling. A big part of telling stories with data is coming up with examples that prove a point. This means diving into the data to produce the most exceptional records. When data is big, this invariably means you need to sort the data to pick up the highest or lowest value(s) of some metric.
So far we’ve mostly limited ourselves to the ordering inherently provided by the map/reduce shuffle/sort operation, which sorts each output file on the reduce key. If we’re running a small job with a single reducer, that does provide a total sort. However, if we want an overall sort using multiple reducers (as we must, if we’re working with 'big data'), we must employ Pig’s ORDER
command. Let’s begin!
To demonstrate ordering records, we’re going to prepare a dataset detailing the performance of players at three phases of their career: young, prime and older. To do so, we’ll be making use of familiar patterns. We use the map-only patterns 'Selecting Records that Satisfy a Condition' and 'Selecting Records that Satisfy Multiple Conditions' with an initial FILTER, to include only seasons in the National or American leagues in our analysis. The map-only patterns 'Transform Records Individually using FOREACH' and 'A Nested FOREACH Allows Intermediate Expressions' assist in calculating the properties of player seasons. Finally, we employ 'Summarizing Multiple Subsets of a Group Simultaneously' to compute career metrics across different age categories.
mod_seasons = FILTER bat_seasons BY ((year_id >= 1900) AND (lg_id == 'NL' OR lg_id == 'AL'));
-- Create a season marked with the period of career it was in: young/prime/older
age_seasons = FOREACH mod_seasons {
young = (age <= 21 ? true : false);
prime = (age >= 22 AND age <= 29 ? true : false);
older = (age >= 30 ? true : false);
OB = H + BB + HBP;
TB = h1B + 2*h2B + 3*h3B + 4*HR;
GENERATE
player_id,
year_id,
PA AS PA_all,
AB AS AB_all,
OB AS OB_all,
TB AS TB_all,
(young ? 1 : 0) AS is_young,
(young ? PA : 0) AS PA_young, (young ? AB : 0) AS AB_young,
(young ? OB : 0) AS OB_young, (young ? TB : 0) AS TB_young,
(prime ? 1 : 0) AS is_prime,
(prime ? PA : 0) AS PA_prime, (prime ? AB : 0) AS AB_prime,
(prime ? OB : 0) AS OB_prime, (prime ? TB : 0) AS TB_prime,
(older ? 1 : 0) AS is_older,
(older ? PA : 0) AS PA_older, (older ? AB : 0) AS AB_older,
(older ? OB : 0) AS OB_older, (older ? TB : 0) AS TB_older
;
};
-- Calculate metrics by career epoch: young/prime/older
career_epochs = FOREACH (GROUP age_seasons BY player_id) {
PA_all = SUM(age_seasons.PA_all );
PA_young = SUM(age_seasons.PA_young);
PA_prime = SUM(age_seasons.PA_prime);
PA_older = SUM(age_seasons.PA_older);
-- OBP = (H + BB + HBP) / PA
OBP_all = 1.0f*SUM(age_seasons.OB_all) / PA_all ;
OBP_young = 1.0f*SUM(age_seasons.OB_young) / PA_young;
OBP_prime = 1.0f*SUM(age_seasons.OB_prime) / PA_prime;
OBP_older = 1.0f*SUM(age_seasons.OB_older) / PA_older;
-- SLG = TB / AB
SLG_all = 1.0f*SUM(age_seasons.TB_all) / SUM(age_seasons.AB_all);
SLG_prime = 1.0f*SUM(age_seasons.TB_prime) / SUM(age_seasons.AB_prime);
SLG_older = 1.0f*SUM(age_seasons.TB_older) / SUM(age_seasons.AB_older);
SLG_young = 1.0f*SUM(age_seasons.TB_young) / SUM(age_seasons.AB_young);
--
GENERATE
group AS player_id,
PA_all AS PA_all,
PA_young AS PA_young,
PA_prime AS PA_prime,
PA_older AS PA_older,
--
MIN(age_seasons.year_id) AS beg_year,
MAX(age_seasons.year_id) AS end_year,
--
OBP_all + SLG_all AS OPS_all:float,
(PA_young >= 700 ? OBP_young + SLG_young : Null) AS OPS_young:float,
(PA_prime >= 700 ? OBP_prime + SLG_prime : Null) AS OPS_prime:float,
(PA_older >= 700 ? OBP_older + SLG_older : Null) AS OPS_older:float,
--
COUNT_STAR(age_seasons) AS n_seasons,
SUM(age_seasons.is_young) AS n_young,
SUM(age_seasons.is_prime) AS n_prime,
SUM(age_seasons.is_older) AS n_older
;
};
STORE career_epochs INTO 'career_epochs';
We’ll be using this epoch data throughout the chapter to demonstrate different ordering techniques, so don’t delete the data in the career_epochs
directory!
Anyone who has performed a SQL ORDER BY
query has prepared a dataset and then sorted it for human consumption. Indeed, creating metrics and then sorting records based on them is at the heart of any data analysis. For this reason, ORDER
is a one-line command in Pig:
sorted_records = ORDER records BY field1;
For this analysis, we’re only going to look at players who were able to make solid contributions over several years. We’ll define this as playing for five or more seasons, recording 2000 or more plate appearances (enough to show statistical significance), and posting an OPS of 0.650 (an acceptable-but-not-all-star level) or better. This means we must FILTER, then ORDER, and finally LIMIT to a data size we, as humans (as opposed to Mentats), can read:
career_epochs = FILTER career_epochs BY
((PA_all >= 2000) AND (n_seasons >= 5) AND (OPS_all >= 0.650));
career_young = ORDER career_epochs BY OPS_young DESC;
top_10_young = LIMIT career_young 10;
career_prime = ORDER career_epochs BY OPS_prime DESC;
top_10_prime = LIMIT career_prime 10;
career_older = ORDER career_epochs BY OPS_older DESC;
top_10_older = LIMIT career_older 10;
You’ll spot Ted Williams (willite01) as one of the top three young players, top three prime players, and top three old players. Ted Williams was pretty awesome.
(willite01,9788,1336,3279,5173,1939,1960,1.115402,1.0398661,1.1660492,1.103679,19,2,5,12)
(foxxji01,9676,1302,5306,3068,1925,1945,1.0341599,1.0045433,1.0723403,0.98065215,20,5,8,7)
(troskha01,5749,732,4122,895,1933,1946,0.890712,0.9756794,0.919405,0.6866708,11,2,7,2)
To put all records in a table in order, it’s not sufficient to use the sorting that each reducer applies to its input. If you sorted names from a phonebook, file part-00000
will have names that start with A, then B, up to Z; part-00001
will also have names from A-Z; and so on. The collection has a partial order, but we want the 'total order' that Pig’s ORDER BY
operation provides. In a total sort, each record in part-00000
is in order and precedes every record in part-00001
; records in part-00001
are in order and precede every record in part-00002
; and so forth. For this reason, Pig’s ORDER
command is necessary whenever we have more than one reducer.
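For instance, here is a sketch of a total sort run across several reducers; the PARALLEL clause, and the choice of three reducers, are just for illustration:
-- Each output part file is sorted internally, and every record in part-00000
-- precedes every record in part-00001, which precedes every record in part-00002.
career_sorted = ORDER career_epochs BY player_id PARALLEL 3;
STORE career_sorted INTO 'career_epochs_sorted';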
Sorting by one field is great, but sometimes our data is a little more complex than that. For instance, we might want to sort by one metric but use another as a tie-breaker. In Pig, sorting on multiple fields is as easy as adding them in order with commas. For instance, to sort by number of older seasons, breaking ties by number of prime seasons:
career_older = ORDER career_epochs BY n_older DESC, n_prime DESC;
Wherever reasonable, "stabilize" your sorts by adding enough columns to make the ordering unique. This ensures the output will remain the same from run to run, a best practice for testing and maintainability that we introduced in the 'Map-Only Operations' chapter.
career_older = ORDER career_epochs BY n_older DESC, n_prime DESC, player_id ASC; -- makes sure that ties are always broken the same way.
Which players have aged the best — made the biggest leap in performance from their prime years to their older years? You might think the following would work, but you cannot use an expression in an ORDER..BY
statement:
by_diff_older = ORDER career_epochs BY (OPS_older-OPS_prime) DESC; -- fails!
Instead, generate a new field, sort on it, then project it away. Though it’s cumbersome to type, there’s no significant performance impact.
by_diff_older = FOREACH career_epochs GENERATE
  OPS_older - OPS_prime AS diff, player_id..;
by_diff_older = FOREACH (ORDER by_diff_older BY diff DESC, player_id) GENERATE player_id..;
If you browse through that table, you’ll get a sense that current-era players seem to be over-represented. This is just a simple whiff of a question, but more nuanced analyses do show an increase in longevity of peak performance. Part of that is due to better training, nutrition, and medical care — and part of that is likely due to systemic abuse of performance-enhancing drugs.
Pig’s ORDER
command will sort capitalized words and lowercase words separately. There’s no intrinsic way to sort case-insensitively; instead, generate a lowercased copy of the field and sort on that. We don’t have an interesting table with mixed-case records in the baseball dataset, but most UNIX-based computers come with a dictionary in the /usr/share
directory tree. Here’s how to sort that ignoring case:
Note: you’ll want to use Pig 'local mode' to run this next command: pig -x local
dict = LOAD '/usr/share/dict/words' AS (word:chararray);
sortable = FOREACH dict GENERATE LOWER(word) AS l_word, *;
dict_nocase = FOREACH (ORDER sortable BY l_word, word) GENERATE word;
dict_case = ORDER dict BY word DESC;
Note that we sorted on l_word
and word
: this stabilizes the sort, ensuring that even though Polish
and polish
tie when compared case-insensitively, those ties will always be resolved the same way.
Real data has nulls (missing data), and creating sane, rational and consistent dataflows in Pig requires careful thought about how to handle them. By default, when the sort field has nulls, Pig sorts them as least-most: they will appear as the first rows for ASC order and as the last rows for DESC order. If you want to alter that behavior, you can project a dummy field having the 'favoritism' or artificial sort order you want to impose. Name this column first in your ORDER..BY
clause, and you can achieve whatever 'null behavior' you desire. We call this the 'dummy field trick.'
For example, below we sort players' careers with nulls first, and then in another way with nulls last:
nulls_sort_demo = FOREACH career_epochs GENERATE
  (OPS_older IS NULL ? 0 : 1) AS has_older_epoch, player_id..;
nulls_then_vals = FOREACH (ORDER nulls_sort_demo BY has_older_epoch ASC,  OPS_all DESC, player_id) GENERATE player_id..;
vals_then_nulls = FOREACH (ORDER nulls_sort_demo BY has_older_epoch DESC, OPS_all DESC, player_id) GENERATE player_id..;
Use the 'dummy field trick' any time you want to float records to the top or bottom of the sort order based on a criterion. The example below moves all players whose careers start in 1985 or later to the top, but otherwise sorts on number of older seasons:
post1985_vs_earlier = FOREACH career_epochs GENERATE
  (beg_year >= 1985 ? 1 : 0) AS is_1985, player_id..;
post1985_vs_earlier = FOREACH (ORDER post1985_vs_earlier BY is_1985 DESC, n_older DESC, player_id) GENERATE player_id..;
Note that again we add a tie-breaker, player_id, to the sort.
- Standard Snippet — ORDER tbl BY mykey;
- Hello, SQL Users —
  - Usually this is part of a SELECT statement; in Pig it stands alone
  - You can’t put an expression in the BY clause
- Important to Know — Pound-for-pound, unless followed by a LIMIT statement this is one of the most expensive operations you can perform, requiring two to three jobs and a full reduce
- Output Count — Unchanged
- Records — Unchanged
- Data Flow — Map-only on a sample of the data; Map and Reduce to perform the sort. In some cases, if Pig isn’t confident that it will sample correctly, an extra Map-only pass to perform the map-only/pipelinable operations before the sample
Sorting an entire relation is powerful, but more often we want to sort data that has been partitioned by some key - as within a GROUP..BY
operation. This operation is straightforward enough, and so useful that we’ve been applying it all chapter, but it’s time to introduce it properly and clarify a couple of points.
We can sort records within a group using ORDER BY
within a nested FOREACH
(which we introduced in the Map-only Operations chapter). Rather than sorting all players, let’s try sorting the players on each team in a given season. Here’s a snippet to list the top four players for each team-season, in decreasing order by plate appearances:
players_PA = FOREACH bat_seasons GENERATE team_id, year_id, player_id, name_first, name_last, PA;
team_playerslist_by_PA = FOREACH (GROUP players_PA BY (team_id, year_id)) {
  players_o_1 = ORDER players_PA BY PA DESC, player_id;
  players_o   = LIMIT players_o_1 4;
  GENERATE group.team_id, group.year_id,
    players_o.(player_id, name_first, name_last, PA) AS players_o;
};
Ordering a group in the nested block immediately following a structural operation does not require extra operations, since Pig is able to simply specify those fields as secondary sort keys. Basically, as long as it happens first in the reduce operation it’s free (though if you’re nervous, look for the line "Secondary sort: true" in the output of EXPLAIN). Messing with a bag before the ORDER..BY
causes Pig to instead sort it in-memory using quicksort, but will not cause another map-reduce job. That’s good news unless some bags are so huge they challenge available RAM or CPU, which won’t be subtle.
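For instance, a quick check on the relation we just built:
-- EXPLAIN prints Pig's logical, physical, and MapReduce plans for a relation.
-- In the MapReduce plan, look for "Secondary sort: true" on the reduce stage;
-- if it appears, the nested ORDER BY rides along with the shuffle instead of
-- triggering an in-memory sort of each bag.
EXPLAIN team_playerslist_by_PA;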
If you depend on having a certain sorting, specify it explicitly, even when you notice that a GROUP..BY
or some other operation seems to leave it in that desired order. It gives a valuable signal to anyone reading your code, and a necessary defense against some future optimization deranging that order. [1]
Once sorted, the bag’s order is preserved by projections, by most functions that iterate over a bag, and by the nested pipeline operations FILTER
,
FOREACH
, and LIMIT
. The return values of nested structural operations CROSS
, ORDER BY
and DISTINCT
do not follow the same order as their input; neither do structural functions such as CountEach (in-bag histogram) or the set operations (REF) described at the end of the chapter. (Note that though their outputs are disarranged, these of course don’t mess with the order of their inputs: everything in Pig is immutable once created.)
team_playerslist_by_PA_2 = FOREACH team_playerslist_by_PA {
  -- will not have same order, even though contents will be identical
  disordered    = DISTINCT players_o;
  -- this ORDER BY does _not_ come for free, though it's not terribly costly
  alt_order     = ORDER players_o BY player_id;
  -- these are all iterative and so will share the same order of descending PA
  still_ordered = FILTER players_o BY PA > 10;
  pa_only       = players_o.PA;
  pretty        = FOREACH players_o GENERATE
    StringConcat((chararray)PA, ':', name_first, ' ', name_last);
  GENERATE team_id, year_id,
    disordered, alt_order, still_ordered, pa_only, BagToString(pretty, '|');
};
- Where You’ll Use It — Extracting top records from a group (see next). Preceding many UDFs that depend on ordering. To make your output readable. To stabilize results.
- Hello, SQL Users — This is not directly analogous to the ORDER BY part of a SELECT statement, as it is done to the inner bag. For users of Oracle and other databases, this is similar to a sort within a windowed query.
- Important to Know — If it can be applied to the records coming from the mapper, it’s free. Verify by looking for "Secondary sort: true" in the output of EXPLAIN.
- Output Count — Unchanged
- Records — Unchanged
On its own, LIMIT
will return the first records it finds. What if you want to rank the records — sort by some criteria — so you don’t just return the first ones, but the top ones?
Use the ORDER
operator before a LIMIT
to guarantee this "top K" ordering. This technique also applies a clever optimization (reservoir sampling, see TODO ref) that sharply limits the amount of data sent to the reducers.
Let’s say you wanted to select the top 20 seasons by number of hits:
sorted_seasons = ORDER (FILTER bat_seasons BY PA > 60 AND year_id > 1900) BY H DESC;
top_20_seasons = LIMIT sorted_seasons 20;
In SQL, this would be:
SELECT * FROM bat_season WHERE PA > 60 AND year_id > 1900 ORDER BY H DESC LIMIT 20;
There are two useful optimizations to make when the number of records you will keep (K) is much smaller than the number of records in the table (N). The first one, which Pig does for you, is to only retain the top K records at each Mapper; this is a great demonstration of where a Combiner is useful: After each intermediate merge/sort on the Map side and the Reduce side, the Combiner discards all but the top K records.
Pig’s built-in TOP function takes a bag and a column index and returns a bag holding the top K tuples by that column:
top_5_per_season = FOREACH (GROUP bat_seasons BY year_id) GENERATE
group AS year_id,
TOP(5,19,bat_seasons); -- 19th column is RBIs (start at 0)
You could achieve the same output with the more cumbersome:
top_5_per_season = FOREACH (GROUP bat_seasons BY year_id) {
sorted = ORDER bat_seasons BY RBI DESC;
top_5 = LIMIT sorted 5;
ascending = ORDER top_5 BY RBI;
GENERATE
group AS year_id,
ascending AS top_5;
};
The RANK
command prepends a rank field to each record in a relation. You can RANK
an entire relation as-is, or rank by one or more of its fields.
ranked_seasons     = RANK bat_seasons;
ranked_rbi_seasons = RANK bat_seasons BY RBI DESC, H DESC, player_id;
ranked_hit_dense   = RANK bat_seasons BY H DESC DENSE;
If you supply only the name of the table, RANK acts as a pipeline operation, introducing no extra map/reduce stage. Each split is numbered as a unit: the third line of chunk part-00000
gets rank 3, the third line of chunk part-00001
also gets rank 3, and so on.
It’s important to know that in current versions of Pig, the RANK operator sets the parallelism to one, forcing all data through a single reducer.
gift_id  gift           RANK  RANK gift_id  RANK gift DENSE
1        partridge         1             1                1
4a       calling birds     2             4                7
4b       calling birds     3             4                7
2a       turtle dove       4             2                2
4d       calling birds     5             4                7
5        golden rings      6             5               11
2b       turtle dove       7             2                2
3a       french hen        8             3                4
3b       french hen        9             3                4
3c       french hen       10             3                4
4c       calling birds    11             4                7
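One common follow-on is to filter on the newly added rank field. Here’s a minimal sketch, assuming the generated column is named rank_bat_seasons (Pig names it after the ranked alias; run DESCRIBE ranked_rbi_seasons to confirm the name in your version):
-- Keep the ten best RBI seasons, using the ranked relation defined above.
best_rbi_seasons = FILTER ranked_rbi_seasons BY rank_bat_seasons <= 10;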
Sometimes we want to find the record with the maximum value and preserve it. In Pig, we can do this with a nested ORDER BY
/LIMIT
inside a FOREACH
. For example, for each player-season, keep the stint (team) in which the player drove in the most runs:
-- For each season by a player, select the stint in which they drove in the most
-- runs. In SQL, this is fairly clumsy (involving a self-join and then
-- elimination of ties). In Pig, we can ORDER BY within a FOREACH and then
-- pluck the first element of the bag.
top_stint_per_player_year = FOREACH (GROUP bat_seasons BY (player_id, year_id)) {
  sorted    = ORDER bat_seasons BY RBI DESC;
  top_stint = LIMIT sorted 1;
  stints    = COUNT_STAR(bat_seasons);
  GENERATE group.player_id, group.year_id, stints,
    FLATTEN(top_stint.(team_id, RBI)) AS (team_id, RBI);
};
DUMP @;
It turns out this dataset has no multi-stint seasons; only the most significant stint is listed in the bat_seasons
data.
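If you’d like to check that claim yourself, here’s a minimal sketch using only fields already present in bat_seasons:
-- Count the stints recorded for each player-year; if the filtered relation is
-- empty, every player-year appears exactly once in bat_seasons.
stint_counts = FOREACH (GROUP bat_seasons BY (player_id, year_id))
  GENERATE group.player_id, group.year_id, COUNT_STAR(bat_seasons) AS n_stints;
multi_stints = FILTER stint_counts BY n_stints > 1;
DUMP multi_stints;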
One common use of Hadoop is to run simulations at scale. When doing this, it is often handy to prepare multiple unique sorts of a single dataset. In other words, multiple 'shuffles' of the same data. To shuffle a set of records, we’re going to apply the 'Assign a Unique ID' pattern to generate an arbitrary key (one that is decoupled from the records' content), and then use that to order the records.
DEFINE Hasher datafu.pig.hash.Hasher('sip24', 'rand');
people_hashed = FOREACH people GENERATE Hasher(player_id) AS hash, *;
people_ranked = RANK people_hashed BY hash;
-- Back to the original records by skipping the rank and hash fields
people_shuffled = FOREACH people_ranked GENERATE $2..;
STORE people_shuffled INTO 'people_shuffled/1/';
You can run this script multiple times with different output paths to get different shuffles of the same data.
We use a randomized hash function UDF for each record; then number each line within the split. The important difference here is that the hash function we generated accepts a seed that we can mix in to each record. If you supply a constant to the constructor (see the documentation) then the records will be put into an effectively random order, but the same random order each time. By supplying the string 'rand'
as the argument, the UDF will use a different seed on each run. What’s nice about this approach is that although the ordering is different from run to run, it does not exhibit the anti-pattern of changing from task attempt to task attempt. The seed is generated once and then used everywhere. Rather than creating a new random number for each row, you use the hash to define an effectively random ordering, and the seed to choose which random ordering to apply.
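As a sketch of the repeatable variant, here is roughly what a constant seed looks like. The exact seed format the Hasher constructor accepts is version-dependent, so treat the seed string below as a placeholder and check the DataFu documentation:
-- Hypothetical fixed seed: the same 'random' order on every run of the script.
DEFINE HasherSeeded datafu.pig.hash.Hasher('sip24', '0123456789abcdef'); -- placeholder seed
people_hashed_seeded   = FOREACH people GENERATE HasherSeeded(player_id) AS hash, *;
people_ranked_seeded   = RANK people_hashed_seeded BY hash;
people_shuffled_seeded = FOREACH people_ranked_seeded GENERATE $2..;
STORE people_shuffled_seeded INTO 'people_shuffled_repeatable/1/';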
In this chapter, we’ve learned to sort, rank and order data. We learned how to sort entire relations by one or more fields. We learned how to prioritize certain records when sorting. We learned how to deal with sorting nulls and mixed-case strings. We showed how to sort within a group with TOP
and a nested ORDER BY
. Finally, we learned how to shuffle, or sort randomly using a Hash.
Our bag-of-tricks is getting larger and larger. Soon there will be no data-processing problem you face that you can’t come up with a solution for using the patterns in this book. Your applied knowledge of Pig and Hadoop will constitute a working knowledge of analytics in general, and you’ll be able to arbitrarily process data at scale and implement algorithms on big data. When you become as comfortable processing big data as you are small, there are boundless opportunities to work with the ever increasing onslaught of new, big data to create new insights, build new products and make better decisions.
In the next chapter, we’ll learn about creating unique values and relations, and working with sets. This will complete our analytic toolkit.