Some content contributed by Q. Ethan McCallum (@qethanm)
In this chapter, we will introduce grouping operations in Pig and map/reduce. We’ll teach you the schemas behind grouped data, how to inspect and sample grouped relations, how to count records in groups, and how to use aggregate functions to calculate arbitrary statistics about groups. We’ll teach you to describe and summarize individual records, fields, or entire data tables. In so doing, we’ll explore questions such as, "Does God hate Cleveland?" and "Who are the best players for each phase of their career?"
The GROUP BY
operation is fundamental to data processing, both in map/reduce and in the world of SQL. In this chapter, we will cover grouping operations in Pig, most of which are 'one-liners': a single line of Pig code each. This is part of Pig’s power. We’ll learn how grouping operations relate to the reduce phase of map/reduce and how to combine map-only operations with GROUP BY
operations to perform arbitrary operations on data relations.
Grouping operations are at the heart of map/reduce - they make use of and define the 'reduce' operation of map/reduce, in which records with the same reduce key are grouped on a single reducer in sorted order. Thus it is possible to define a single map/reduce job that performs any number of map-only operations, followed by a grouping operation, followed by further operations on the grouped records after the reduce. This simple pattern enables map/reduce to implement a remarkably wide array of algorithms.
The GROUP BY
operation is at the heart of every structural operation. GROUP BY
is responsible for collecting records for other operations to occur on them as they are grouped together.
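In Pig terms, that single-job shape looks something like the following sketch; the relation, path, and field names here are hypothetical, just to show where the map-only steps and the grouping step fall:
-- Hypothetical sketch: two map-only steps, one grouping step, then per-group work.
-- Pig compiles all of this into a single map/reduce job.
events    = LOAD 'events' AS (user_id:chararray, event_type:chararray, amount:double);
clicks    = FILTER events BY event_type == 'click';     -- map-only
projected = FOREACH clicks GENERATE user_id, amount;    -- map-only
by_user   = GROUP projected BY user_id;                 -- the shuffle/reduce
totals    = FOREACH by_user GENERATE                    -- runs on the grouped records
    group AS user_id, SUM(projected.amount) AS total_amount;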
Let’s dive right in. Grouping data in Pig is easy: it’s a one-liner to collect all the stadiums a team has played in over its history:
park_teams_g = GROUP park_team_years BY team_id;
The result of a GROUP BY
operation is always a field called 'group', followed by one field per grouped table, each named for the table it came from. The shape of the group
field depends on whether you specified one or many group keys. If you specified a single key, the new group
field is a scalar with the same schema. If you specified multiple keys, the new group
field is a tuple whose schema corresponds to those keys. In this example, we grouped on the team_id
chararray, and so the group
field is a scalar chararray as well. In a moment we’ll group on year_id
and team_id
, and so the group
field would have schema group:tuple(year_id:int, team_id:chararray)
.
Each of the following fields is a bag, holding tuples whose schema matches the corresponding table. Notice that the name we used to refer to the table is now also the name for a field. This will confuse you at first, but it will soon become natural, especially if you use DESCRIBE
liberally:
DESCRIBE park_teams_g;
/*
park_teams_g: {
group: chararray,
park_team_years: {
(
park_id: chararray,
team_id: chararray,
year_id: long,
beg_date: chararray,
end_date: chararray,
n_games: long
)
}
}
*/
A = LIMIT park_teams_g 2;
DUMP A;
Notice that the full record is kept, even including the keys:
(ALT,{(ALT01,ALT,1884,1884-04-30,1884-05-31,18)})
(ANA,{(ANA01,ANA,2001,2001-04-10,2001-10-07,81),
(ANA01,ANA,2010,2010-04-05,2010-09-29,81),...})
To eliminate the redundant data, you’ll almost always immediately project using a FOREACH
. This lets you trim the fields in the group to those you are interested in. Pig allows you to put the GROUP
statement inline within a `FOREACH`:
team_pkyr_pairs = FOREACH (GROUP park_team_years BY team_id) GENERATE
group AS team_id,
park_team_years.(park_id, year_id) AS park_team_years;
-- (ALT,{(ALT01,1884)})
-- (ANA,{(ANA01,2001),(ANA01,2010),(ANA01,2002),...})
Note that we used an AS clause within our FOREACH
/GENERATE
call, to give an explicit name, park_team_years
, to park_team_years.(park_id, year_id)
. Giving explicit names as part of managing our schemas makes it easier to refer to fields explicitly later on.
Also notice the park_team_years.(park_id, year_id)
form, which gives us a bag of (park_id, year_id)
pairs. Using park_team_years.park_id, park_team_years.year_id
instead gives two bags, one with park_id tuples and one with year_id tuples:
team_pkyr_bags = FOREACH (GROUP park_team_years BY team_id) GENERATE
group AS team_id,
park_team_years.park_id AS park_ids,
park_team_years.year_id AS park_years;
-- (ALT, {(ALT01)}, {(1884)})
-- (ANA, {(ANA01),(ANA01),(ANA01),...}, {(2001),(2010),(2002),...})
DESCRIBE team_pkyr_pairs;
/*
team_pkyr_pairs: {
team_id: chararray,
park_team_years: {
(park_id: chararray,year_id: long)
}
}
*/
DESCRIBE team_pkyr_bags;
/*
team_pkyr_bags: {
team_id: chararray,
park_ids: {
(park_id: chararray)
},
park_years: {
(year_id: long)
}
}
*/
You can group on multiple fields. For each team and year, we can find the park(s) that team called home:
team_yr_parks_g = GROUP park_team_years BY (year_id, team_id);
The first field is still called 'group', but it’s now a tuple.
DESCRIBE team_yr_parks_g;
/*
team_yr_parks_g: {
group:
(
year_id: long,
team_id: chararray
),
park_team_years: {
(
park_id: chararray,
team_id: chararray,
year_id: long,
beg_date: chararray,
end_date: chararray,
n_games: long
)
}
}
*/
Our FOREACH
statement now looks a bit different:
team_yr_parks = FOREACH (GROUP park_team_years BY (year_id, team_id)) GENERATE
group.team_id, park_team_years.park_id AS park_ids;
just_4 = LIMIT team_yr_parks 4; DUMP @;
-- (BS1,{(BOS01),(NYC01)})
-- (CH1,{(NYC01),(CHI01)})
-- (CL1,{(CIN01),(CLE01)})
-- (FW1,{(FOR01)})
If you have multiple group keys and want all of the group keys back as top-level fields, you can flatten the group tuple and name its fields, as in:
year_team = FOREACH (GROUP park_team_years BY (year_id, team_id)) GENERATE
FLATTEN(group) AS (year_id, team_id);
DESCRIBE year_team;
year_team: {
year_id: long,
team_id: chararray
}
-
Where You’ll Use It — Rarely on its own, but otherwise everywhere
-
Standard Snippet —
FOREACH (GROUP recs BY (key1, key2)) GENERATE group.key1, group.key2, recs AS bag_of_recs_records;
-
Hello, SQL Users — Similar to the windowed functionality supplied by high-end SQL databases; plain MySQL and the like have no direct equivalent.
-
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values
-
Records — Output is
group, bag of records
, with record contents within the bag unchanged. -
Data Flow — Map & Reduce
The typical reason to group records is to operate on the bag of values it forms, and that’s how we’ll spend much of this chapter — the data bag is a very powerful concept. Let’s take a quickie tour of what we can do to a group; afterwards we’ll see the internals of how a group works before moving on to its broader applications.
You’ll notice from the result of the last query that sometimes a team has more than one "home" stadium in a season. That’s a bit unexpected, but on consideration teams occasionally face stadium repairs or late-season makeups for cancelled games. But cases where there were even three home parks should be quite rare. Let’s confirm our feel for the data using COUNT_STAR
, which counts all elements of a bag:
team_n_parks = FOREACH (GROUP park_team_years BY (team_id,year_id)) GENERATE
group.team_id,
COUNT_STAR(park_team_years) AS n_parks;
DESCRIBE team_n_parks;
/*
team_n_parks: {
team_id: chararray,
n_parks: long
}
*/
vagabonds = FILTER team_n_parks BY n_parks >= 3;
DUMP vagabonds;
(CL4,7)
(CLE,5)
(WS3,4)
(CLE,3)
(DET,3)
...
Always, always look through the data and seek 'second stories'.
Our script is reporting that CL4 (the Cleveland Spiders) called seven (!) different stadiums home during a season. Is this some weirdness in the data? Could we possibly have messed up this three-line script? Or is it really the case that some teams have had four, five, even seven home stadiums? This demands a closer look.
-
Where You’ll Use It — Anywhere you’re summarizing counts
-
Standard Snippet —
FOREACH (GROUP recs BY mykey) GENERATE group AS mykey, COUNT_STAR(recs) AS ct;
-
Hello, SQL Users —
SELECT key, COUNT(*) AS ct FROM recs GROUP BY key;
Remember: COUNT_STAR(recs), not COUNT(*).
-
Important to Know — See "Pattern in Use" for Aggregate Functions, below (REF)
-
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values
-
Records — Output is
mykey, ct:long
-
Data Flow — Map, Combiner & Reduce; combiners very effective unless cardinality extremely high
Let’s keep the count of parks, but also list the parks themselves for inspection. We could keep dumping the values in Pig’s oddball output format, but this is a good opportunity to introduce a very useful pattern: de-normalizing a collection of values into a single delimited field.
The format Pig uses to dump bags and tuples to disk wastes characters and is not safe to use in general: any string containing a comma or bracket will cause its record to be mis-interpreted. For simple data structures such as a list, we are better off concatenating the values together using a delimiter: a character with no other meaning that does not appear in any of the values. This preserves the rows-and-columns representation of the table that Pig handles best. It also lets us keep using the oh-so-simple TSV format for interchange with Excel, cut
and other commandline tools, and later runs of Pig itself. Storing data this way means we do have to pack and unpack the value ourselves, which is an added burden whenever we need access to the list members. But if we access the list contents only infrequently, this can even be a positive feature: we can move the field around as a simple string and only pay the cost of constructing the full data structure when necessary.
The BagToString function will serialize a bag of values into a single delimited field as follows:
team_year_w_parks = FOREACH (GROUP park_team_years BY (team_id, year_id)) GENERATE
group.team_id,
COUNT_STAR(park_team_years) AS n_parks,
BagToString(park_team_years.park_id, '^') AS park_ids;
DESCRIBE team_year_w_parks;
/*
team_year_w_parks: {
team_id: chararray,
n_parks: long,
park_ids: chararray
}
*/
top_team_year_w_parks = ORDER team_year_w_parks BY n_parks DESC;
top_20 = LIMIT top_team_year_w_parks 20; DUMP @;
/*
(CL4,7,CHI08^CLL01^CLE05^PHI09^ROC03^STL05^ROC02)
(CLE,5,CLE05^DAY01^FOR03^CAN01^COL03)
(WS3,4,BAL01^WAS01^NYC01^CIN01)
(CL3,3,GEA01^NEW03^CLE03)
(CL4,3,IND06^CLE03^DET01)
(BFN,3,ELM01^MIL02^BUF02)
(WS6,3,BAL02^WAS01^RIC01)
*/
This script outputs three fields — the team ID, the count of stadiums, and the IDs of the stadiums used — with the list of stadiums separated by a ^
caret delimiter. Like colon ':', comma ,
, and slash '/', it doesn’t need to be escaped at the commandline; like those and semicolon ;
, pipe |
, and bang !
, it is visually lightweight and can be avoided within a value. Don’t use the wrong delimiter for addresses ("Fargo, ND"), dates ("2014-08-08T12:34:56+00:00"), paths (/tmp/foo
) or unsanitized free text (It’s a girl! _ \m/ |:-)
). If you are considering the use of quoting or escaping to make your strings delimiter safe, you’re getting carried away. Stop, step away from the delimiter, and see "Representing a Complex Data Structure as a JSON-encoded String" (REF) below.
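Going the other way — unpacking a delimited field back into its elements — is also a one-liner. Here is a minimal sketch, assuming the team_year_w_parks relation above; recent Pig versions accept a custom delimiter argument to TOKENIZE, which splits the string into a bag, and FLATTEN turns that bag back into rows:
-- Sketch: recover one row per (team, park) from the packed park_ids string
parks_unpacked = FOREACH team_year_w_parks GENERATE
    team_id,
    FLATTEN(TOKENIZE(park_ids, '^')) AS park_id;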
Since the park ids are formed from the first characters of the city name, we can recognize that the Spiders' home fields include two stadiums in Cleveland plus "home" stadiums in Philadelphia, Rochester, St. Louis, and Chicago. These aren’t close enough to be alternatives in case of repairs, and 1898 baseball did not call for publicity tours. Were they rotating among these fields, or just spending a day or so at each? Let’s see how many were played at each stadium.
-
Where You’ll Use It — Creating a URL for a batch request. Hiding a list you don’t always want to deserialize. Writing a table in a format that will work everywhere.
-
Standard Snippet —
FOREACH (GROUP recs BY key) GENERATE group AS mykey, BagToString(recs, '|') AS recs_list;
-
Hello, SQL Users — Similar to
GROUP_CONCAT
, but you prepare the input bag first; no fiddly in-line DISTINCT
calls. -
Important to Know — Be careful with your choice of delimiter. Keep it simple. Don’t stringify huge groups.
-
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values
-
Records — Output is
mykey, recs_list:chararray
-
Data Flow — Map & Reduce; no real data reduction or explosion as assumedly you’re turning all the data into strings.
Instead of serializing the simple list of park ids we had before, we’d now like to prepare and serialize the collection of (park id, number of games) pairs. We can handle this by using two delimiters: one for separating list elements and one for delimiting its contents. (This is also how you would handle an object with simple attribute-value pairs such as a hash map.)
team_year_w_pkgms = FOREACH (GROUP park_team_years BY (team_id, year_id)) {
/* Create 'park ID'/'game count' field */
pty_ordered = ORDER park_team_years BY n_games DESC;
pk_ng_pairs = FOREACH pty_ordered GENERATE
CONCAT(park_id, ':', (chararray)n_games) AS pk_ng_pair;
/* Generate team/year, number of parks and list of parks/games played */
GENERATE group.team_id, group.year_id,
COUNT_STAR(park_team_years) AS n_parks,
BagToString(pk_ng_pairs,'|') AS pk_ngs;
};
top_team_parks = ORDER team_year_w_pkgms BY n_parks DESC;
top_20 = LIMIT top_team_parks 20;
-- DUMP @;
STORE top_20 INTO 'park_teams_report';
Which results in:
(CL4,1898,7,CLE05:40|PHI09:9|STL05:2|CLL01:2|ROC02:2|CHI08:1|ROC03:1)
(CLE,1902,5,CLE05:60|FOR03:2|COL03:1|CAN01:1|DAY01:1)
(WS3,1871,4,WAS01:11|BAL01:1|NYC01:1|CIN01:1)
(CL3,1888,3,CLE03:56|GEA01:3|NEW03:1)
(CL4,1890,3,CLE03:63|IND06:6|DET01:1)
(BFN,1885,3,BUF02:50|ELM01:2|MIL02:1)
(WS6,1875,3,WAS01:8|RIC01:2|BAL02:2)
(BS1,1875,3,BOS01:35|SPR01:1|PRO01:1)
(MID,1872,3,MID01:7|HRT02:3|SPR01:1)
(CHU,1884,3,CHI05:35|PIT03:5|BAL04:1)
...
There are a few new things going on here. We’ve snuck the ORDER BY
statement into a few previous examples even though it won’t be covered until later in the chapter (REF), but always as a full-table operator. Here we’re using it within the body of a FOREACH
to sort each bag locally, rather than as a total sort of the whole table. One nice thing about this ORDER BY
: it’s essentially free, as Pig just instructs Hadoop to do a secondary-sort on the data as it lands on the reducer. So there’s no reason not to make the data easier to read.
After the ORDER BY
statement, we use a nested FOREACH
to staple each park onto the number of games at that park, delimited with a colon. (Along the way you’ll see we also typecast the n_games
value, since the CONCAT
method expects a chararray
.) The final GENERATE
line creates records naming the team, the count of parks, and the list of park-usages pairs:
hadoop fs -cat park_teams_report/*
CL4 1898 7 CLE05:40|PHI09:9|STL05:2|CLL01:2|ROC02:2|CHI08:1|ROC03:1
CLE 1902 5 CLE05:60|FOR03:2|COL03:1|CAN01:1|DAY01:1
WS3 1871 4 WAS01:11|BAL01:1|NYC01:1|CIN01:1
CL3 1888 3 CLE03:56|GEA01:3|NEW03:1
CL4 1890 3 CLE03:63|IND06:6|DET01:1
BFN 1885 3 BUF02:50|ELM01:2|MIL02:1
WS6 1875 3 WAS01:8|RIC01:2|BAL02:2
BS1 1875 3 BOS01:35|SPR01:1|PRO01:1
MID 1872 3 MID01:7|HRT02:3|SPR01:1
CHU 1884 3 CHI05:35|PIT03:5|BAL04:1
...
Out of 156 games that season, the Spiders played only 42 in Cleveland. Between the 15 "home" games in other cities, and their ninety-nine away games, they spent nearly three-quarters of their season on the road.
The Baseball Library Chronology sheds some light. It turns out that labor problems prevented play at their home or any other stadium in Cleveland for a stretch of time, and so they relocated to Philadelphia while that went on. What’s more, on June 19th police arrested the entire team during a home game [1] for violating the Sunday "blue laws" [2]. Little wonder the Spiders decided to take their talents away from Cleveland! The following year they played 50 straight on the road, won fewer than 13% of their games overall (20-134, the worst single-season record ever) and immediately disbanded at season’s end.
So the result for the Spiders isn’t a mistake. Is the team a sole anomalous outlier, or are there other cases, less extreme but similar? The Spiders' season stands out for at least these three reasons: an unusual number of alternate parks; "home" games played in other cities; and a pre-modern (1898) setting. So let’s include a field for the city (we’ll take the first three characters of the park id to represent the city name) and not throw away the field for year.
-- Prepare the city field
pktm_city = FOREACH park_team_years GENERATE
team_id,
year_id,
park_id,
n_games,
SUBSTRING(park_id, 0,3) AS city;
-- First grouping: stats about each city of residence
pktm_stats = FOREACH (GROUP pktm_city BY (team_id, year_id, city)) {
pty_ordered = ORDER pktm_city BY n_games DESC;
pk_ct_pairs = FOREACH pty_ordered GENERATE
StringConcat(park_id, ':', (chararray)n_games);
GENERATE
group.team_id,
group.year_id,
group.city AS city,
COUNT_STAR(pktm_city) AS n_parks,
SUM(pktm_city.n_games) AS n_city_games,
MAX(pktm_city.n_games) AS max_in_city,
BagToString(pk_ct_pairs,'|') AS parks
;
};
top_parks = ORDER pktm_stats BY n_parks DESC; DUMP @;
DUMP
shows us:
(BR3,1889,NYC,3,71,49,NYC08:49|NYC18:14|NYC05:8)
(BSN,1894,BOS,3,63,32,BOS05:32|BOS04:27|BOS03:4)
(PHI,1894,PHI,3,71,43,PHI06:43|PHI14:22|PHI08:6)
(NY1,1911,NYC,3,75,45,NYC14:45|NYC13:28|NYC10:2)
(PHI,1927,PHI,2,78,66,PHI09:66|PHI11:12)
(LS3,1893,LOU,2,53,52,LOU03:52|LOU02:1)
(NY4,1884,NYC,2,55,33,NYC06:33|NYC03:22)
(CLE,1946,CLE,2,77,41,CLE07:41|CLE06:36)
(CLE,1945,CLE,2,77,46,CLE07:46|CLE06:31)
...
The records we’re forming are significantly more complex this time. With fields of numbers or constrained categorical values, stapling together delimited values is a fine approach. But when fields become this complex, or when there’s any danger of stray delimiters sneaking into the record, if you’re going to stick with TSV you are better off using JSON encoding to serialize the field. It’s a bit more heavyweight but nearly as portable, and it happily bundles complex structures and special characters to hide within TSV files. [3]
-- Next, assemble full picture:
farhome_gms = FOREACH (GROUP pktm_stats BY (team_id, year_id)) {
pty_ordered = ORDER pktm_stats BY n_city_games DESC;
city_pairs = FOREACH pty_ordered GENERATE CONCAT(city, ':', (chararray)n_city_games);
n_home_gms = SUM(pktm_stats.n_city_games);
n_main_city = MAX(pktm_stats.n_city_games);
n_main_park = MAX(pktm_stats.max_in_city);
-- a nice trick: a string vs a blank makes it easy to scan the data for patterns:
is_modern = (group.year_id >= 1905 ? 'mod' : NULL);
--
GENERATE group.team_id, group.year_id,
is_modern AS is_modern,
n_home_gms AS n_home_gms,
n_home_gms - n_main_city AS n_farhome_gms,
n_home_gms - n_main_park AS n_althome_games,
COUNT_STAR(pktm_stats) AS n_cities,
BagToString(city_pairs,'|') AS cities,
BagToString(pktm_stats.parks,'|') AS parks
;
};
farhome_gms = ORDER farhome_gms BY n_cities DESC, n_farhome_gms DESC;
STORE farhome_gms INTO 'json_test' USING JsonStorage();
Here’s a sample of the output:
{
"team_id":"BSN",
"year_id":1894,
"city":"BOS",
"n_parks":3,
"n_city_games":63,
"max_in_city":32,
"parks":"BOS05:32|BOS04:27|BOS03:4"
}
{"team_id":"PHI","year_id":1894,"city":"PHI","n_parks":3,"n_city_games":71,"max_in_city...
{"team_id":"NY1","year_id":1911,"city":"NYC","n_parks":3,"n_city_games":75,"max_in_city...
{"team_id":"PHI","year_id":1927,"city":"PHI","n_parks":2,"n_city_games":78,"max_in_city...
{"team_id":"LS3","year_id":1893,"city":"LOU","n_parks":2,"n_city_games":53,"max_in_city...
{"team_id":"NY4","year_id":1884,"city":"NYC","n_parks":2,"n_city_games":55,"max_in_city...
{"team_id":"CLE","year_id":1946,"city":"CLE","n_parks":2,"n_city_games":77,"max_in_city...
{"team_id":"CLE","year_id":1945,"city":"CLE","n_parks":2,"n_city_games":77,"max_in_city...
...
-
Where You’ll Use It — Creating the POST body for a json batch request. Hiding a complex value you don’t always want to deserialize. Writing a table in a format that will work everywhere. Creating a string free of non-keyboard characters.
-
Standard Snippet —
STORE my_relation INTO 'my_relation' USING JsonStorage();
-
Output Count — As many records as the relation contains
-
Records — Output is one json object per line
-
Data Flow — Map & Reduce; mild data expansion as JSON repeats the sub-field names on each row.
So, does God hate Cleveland? Probably. But are the Spiders a singularly anomalous exception? No. Considered against the teams of their era, they look much more normal. In the early days, baseball was still literally getting its act together and teams hopped around frequently. Since 1905, no team has called three cities home in a season, and the three cases where a team spent any significant time in an alternate city each tell a notable story.
In 2003 and 2004, les pauvres Montreal Expos were sentenced to play 22 "home" games in San Juan (Puerto Rico) and only 59 back in Montreal. The rudderless franchise had been sold back to the league itself and was being shopped around in preparation for a move to Washington, DC. With no real stars, no home-town enthusiasm, and no future in Montreal, MLB took the opportunity to build its burgeoning fanbase in Latin America and so deployed the team to Puerto Rico part-time. The 1968-1969 Chicago White Sox (CHA) were similarly team-building in Milwaukee; the owner of the 1956-1957 Brooklyn Dodgers slipped them away for a stint in New Jersey in order to pressure Brooklyn for a new stadium.
You won’t always want to read a second story to the end as we have here, but it’s important to at least identify unusual features of your data set — they may turn out to explain more than you’d think.
Note
In traditional analysis with sampled data, edge cases undermine the data, presenting the spectre of a non-representative sample or biased result. In big data analysis on comprehensive data, the edge cases prove the data. Here’s what we mean. Since 1904, only a very few teams have multiple home stadiums, and no team has had more than two home stadiums in a season. Home-field advantage gives a significant edge: the home team plays the deciding half of the final inning, their roster is constructed to take advantage of the ballpark’s layout, and players get to eat home-cooked meals, enjoy the cheers of encouraging fans, and spend a stretch of time in one location. The Spiders and Les Expos and a few others enjoyed only part of those advantages. XX % of our dataset is pre-modern and Y% had six or more home games in multiple cities.
With a data set this small there’s no good way to control for these unusual circumstances, and so they represent outliers that taint our results. With a large and comprehensive data set those small fractions would represent analyzable populations of their own. With millions of seasons, we could conceivably baseline the jet-powered computer-optimized schedules of the present against the night-train wanderjahr of Cleveland Spiders and other early teams.
Some of the happiest moments you can have analyzing a massive data set come when you are able to make it a slightly less-massive data set. Aggregate functions — ones that turn the whole of a group into a scalar value — are the best path to this joy.
In the previous chapter, we used each player’s seasonal counting stats — hits, home runs, and so forth — to estimate seasonal rate stats — how well they get on base (OBP), how well they clear the bases (SLG), and an overall estimate of offensive performance (OPS). But since we were focused on pipeline operations, we only did so on a season-by-season basis. The group-and-aggregate pattern lets us combine those seasonal stats in order to characterize each player’s career.
bat_careers = FOREACH (GROUP bat_seasons BY player_id) {
totG = SUM(bat_seasons.G);
totPA = SUM(bat_seasons.PA); totAB = SUM(bat_seasons.AB);
totHBP = SUM(bat_seasons.HBP); totSH = SUM(bat_seasons.SH);
totBB = SUM(bat_seasons.BB); totH = SUM(bat_seasons.H);
toth1B = SUM(bat_seasons.h1B); toth2B = SUM(bat_seasons.h2B);
toth3B = SUM(bat_seasons.h3B); totHR = SUM(bat_seasons.HR);
totR = SUM(bat_seasons.R); totRBI = SUM(bat_seasons.RBI);
OBP = 1.0*(totH + totBB + totHBP) / totPA;
SLG = 1.0*(toth1B + 2*toth2B + 3*toth3B + 4*totHR) / totAB;
team_ids = DISTINCT bat_seasons.team_id;
GENERATE
group AS player_id,
COUNT_STAR(bat_seasons) AS n_seasons,
COUNT_STAR(team_ids) AS card_teams,
MIN(bat_seasons.year_id) AS beg_year,
MAX(bat_seasons.year_id) AS end_year,
totG AS G,
totPA AS PA, totAB AS AB, totHBP AS HBP, -- $6 - $8
totSH AS SH, totBB AS BB, totH AS H, -- $9 - $11
toth1B AS h1B, toth2B AS h2B, toth3B AS h3B, -- $12 - $14
totHR AS HR, totR AS R, totRBI AS RBI, -- $15 - $17
OBP AS OBP, SLG AS SLG, (OBP + SLG) AS OPS -- $18 - $20
;
};
STORE bat_careers INTO 'bat_careers';
We first gather together all seasons by a player by grouping on player_id
, then throw a barrage of SUM
, COUNT_STAR
, MIN
and MAX
functions at the accumulated fields to find the career totals. Using the nested FOREACH
form means we can use intermediate values such as totPA
in both the calculation of OBP
and as a field in the new table directly.
The nested FOREACH
also lets us apply the DISTINCT
bag operation, creating a new bag holding only the distinct team_id
values across all seasons. That statement has, in principle, two steps: projection of a bag-with-just-team_id followed by DISTINCT
to eliminate duplicates. But behind the scenes, Pig uses a special kind of bag (DistinctDataBag
) that in all respects meets the data bag interface, but which uses an efficient internal data structure to eliminate duplicates as they’re added. So rather than (list of seasons) → (list of team_ids) → (list of distinct team_ids), you only have to pay for (list of seasons) → (list of distinct team_ids).
We will use the bat_careers
table in several later demonstrations, so keep its output file around.
See the Pattern in Use for the next section too (REF).
-
Where You’ll Use It — Everywhere. Turning manufactured items into statistics about batches. Summarizing a cohort. Rolling up census block statistics to state-level statistics.
-
Standard Snippet —
FOREACH (GROUP recs BY key) GENERATE group AS mykey, AggregateFunction(recs), AggregateFunction(recs), …;
-
Hello, SQL Users — Directly comparable for the most part.
-
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values. Big decrease in output size from turning bags into scalars
-
Records — Something like
mykey, aggregated_value, aggregated_value, …
-
Data Flow — Map, Combiner & Reduce; combiners quite effective unless cardinality is very high.
In the preceding case, the aggregate functions were used to create an output table with similar structure to the input table, but at a coarser-grained relational level: career rather than season. The result was a new table to analyze, not a conceptual report. Statistical aggregations also let you summarize groups and tables with well-understood descriptive statistics. By sketching their essential characteristics at dramatically smaller size, we make the data easier to work with but more importantly we make it possible to comprehend.
The following functions are built in to Pig:
-
Count of all values:
COUNT_STAR(bag)
-
Count of non-null values:
COUNT(bag)
-
Minimum / Maximum non-null value:
MIN(bag) / MAX(bag)
-
Sum of non-null values:
SUM(bag)
-
Average of non-null values:
AVG(bag)
There are a few additional summary functions that aren’t native features of Pig, but are offered by LinkedIn’s might-as-well-be-native DataFu package [4].
-
Cardinality (i.e. the count of distinct values): combine the DISTINCT operation and the COUNT_STAR function as demonstrated below, or use the DataFu HyperLogLogPlusPlus UDF
-
Variance of non-null values: VAR(bag), using the datafu.pig.stats.VAR UDF
-
Standard Deviation of non-null values: SQRT(VAR(bag))
-
Quantiles: Quantile(bag) or StreamingQuantile(bag)
-
Median (50th Percentile Value) of a Bag: Median(bag) or StreamingMedian(bag)
The previous chapter (REF) has details on how to use UDFs, and so we’re going to leave the details of that to the sample code. You’ll also notice we list two functions for quantile and for median. Finding the exact median or other quantiles (as the Median/Quantile UDFs do) is costly at large scale, and so a good approximate algorithm (StreamingMedian/StreamingQuantile) is well appreciated. Since the point of this stanza is to characterize the values for our own sense-making, the approximate algorithms are appropriate. We’ll have much more to say about why finding quantiles is costly, why finding averages isn’t, and what to do about it in the Statistics chapter (REF).
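For reference, the DEFINE lines look something like the following sketch. It assumes DataFu’s jar lives at /usr/lib/pig/datafu.jar, as in the full-table example later in this chapter, and the aliases are our own choices; check the class names against your DataFu version:
REGISTER /usr/lib/pig/datafu.jar
-- The exact versions expect a sorted input bag; the streaming versions return
-- a good approximation at a far lower cost.
DEFINE Median            datafu.pig.stats.Median();
DEFINE StreamingMedian   datafu.pig.stats.StreamingMedian();
DEFINE Quartile          datafu.pig.stats.Quantile('0.25','0.5','0.75');
DEFINE StreamingQuartile datafu.pig.stats.StreamingQuantile('0.25','0.5','0.75');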
weight_yr_stats = FOREACH (GROUP bat_seasons BY year_id) {
dist = DISTINCT bat_seasons.weight;
sorted_a = FILTER bat_seasons.weight BY weight IS NOT NULL;
sorted = ORDER sorted_a BY weight;
some = LIMIT dist.weight 5;
n_recs = COUNT_STAR(bat_seasons);
n_notnulls = COUNT(bat_seasons.weight);
GENERATE
group,
AVG(bat_seasons.weight) AS avg_val,
SQRT(VAR(bat_seasons.weight)) AS stddev_val,
MIN(bat_seasons.weight) AS min_val,
FLATTEN(ApproxEdgeile(sorted)) AS (p01, p05, p50, p95, p99),
MAX(bat_seasons.weight) AS max_val,
--
n_recs AS n_recs,
n_recs - n_notnulls AS n_nulls,
COUNT_STAR(dist) AS cardinality,
SUM(bat_seasons.weight) AS sum_val,
BagToString(some, '^') AS some_vals
;
};
-
Where You’ll Use It — Everywhere. Quality statistics on manufacturing batches. Response times of webserver requests. A/B testing in eCommerce.
-
Standard Snippet —
FOREACH (GROUP recs BY key) { … ; GENERATE …; };
-
Hello, SQL Users — Directly comparable for the most part.
-
Important to Know
-
Say COUNT_STAR(recs), not COUNT_STAR(recs.myfield) — the latter creates a new bag and interferes with combiner’ing.
-
Use COUNT_STAR and never SIZE on a bag.
-
Say SUM(recs.myfield), not SUM(myfield) (which isn’t in scope).
-
Get in the habit of writing COUNT_STAR and never COUNT, unless you explicitly mean to only count non-`null`s.
-
-
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values. Big decrease in output size from turning bags into scalars
-
Records — Something like
mykey, aggregated_value, aggregated_value, …
-
Data Flow — Map, Combiner & Reduce; combiners quite effective unless cardinality is very high.
To summarize the statistics of a full table, we use a GROUP ALL
statement. That is, instead of GROUP [table] BY [key]
, write GROUP [table] ALL
. Everything else is as usual:
REGISTER /usr/lib/pig/datafu.jar
DEFINE VAR datafu.pig.stats.VAR();
DEFINE ApproxEdgeile datafu.pig.stats.StreamingQuantile( '0.01','0.05', '0.50', '0.95', '0.99');
...
weight_summary = FOREACH (GROUP people ALL) {
dist = DISTINCT people.weight_lb;
sorted_a = FILTER people.weight_lb BY weight_lb IS NOT NULL;
sorted = ORDER sorted_a BY weight_lb;
some = LIMIT dist.weight_lb 5;
n_recs = COUNT_STAR(people);
n_notnulls = COUNT(people.weight_lb);
GENERATE
group,
AVG(people.weight_lb) AS avg_val,
SQRT(VAR(people.weight_lb)) AS stddev_val,
MIN(people.weight_lb) AS min_val,
FLATTEN(ApproxEdgeile(sorted)) AS (p01, p05, p50, p95, p99),
MAX(people.weight_lb) AS max_val,
n_recs AS n_recs,
n_recs - n_notnulls AS n_nulls,
COUNT_STAR(dist) AS cardinality,
SUM(people.weight_lb) AS sum_val,
BagToString(some, '^') AS some_vals
;
};
As we hope you readily recognize, using the GROUP ALL
operation can be dangerous, as it requires bringing all the data onto a single reducer.
We’re safe here, even on larger datasets, because all but one of the functions we supplied above are efficiently 'algebraic': most of their work can be done in the map phase and then combined, which eliminates most of the data before it reaches the reducer. The cardinality calculation, done here with a nested DISTINCT
operation, is the only real contributor to reducer-side data size. For this dataset its size is manageable, and if it weren’t, there is a good approximate cardinality function. We’ll explain the why and the how of algebraic functions and these approximate methods in the Statistics chapter, but you’ll get a good feel for what is and isn’t efficient through the examples in this chapter.
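The approximate cardinality function we have in mind is DataFu’s HyperLogLogPlusPlus, mentioned in the function list above. A minimal sketch of swapping it in, assuming the same datafu.jar location used elsewhere in this chapter:
REGISTER /usr/lib/pig/datafu.jar
DEFINE ApproxDistinct datafu.pig.stats.HyperLogLogPlusPlus();
-- approximate distinct count; designed to stay cheap even at very high cardinality
weight_cardinality = FOREACH (GROUP people ALL) GENERATE
    ApproxDistinct(people.weight_lb) AS approx_cardinality;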
Everything we said for "Completely Summarizing a Group" (REF), plus
-
Where You’ll Use It — Getting to know your data. Computing relative statistics or normalizing values. Topline totals and summaries.
-
Hello, SQL Users — Aggregate functions without a
GROUP BY
-
Important to Know
-
You’re sending all the data to one reducer, so make sure the aggregate functions are highly reductive
-
Note the syntax of the full-table group statement. There’s no I in TEAM, and no
BY
in GROUP ALL
.
-
-
Output Count — Single row
-
Data Flow — Map, Combiner, and single reducer
We showed how to examine the constituents of a string field in the preceding chapter, under "Tokenizing a String" (REF). But for forensic purposes similar to the prior example, it’s useful to summarize the distribution of the field’s string lengths.
name_first_summary_0 = FOREACH (GROUP bat_seasons ALL) {
dist = DISTINCT bat_seasons.name_first;
lens = FOREACH bat_seasons GENERATE SIZE(name_first) AS len;
--
n_recs = COUNT_STAR(bat_seasons);
n_notnulls = COUNT(bat_seasons.name_first);
--
examples = LIMIT dist.name_first 5;
snippets = FOREACH examples GENERATE
(SIZE(name_first) > 15 ? CONCAT(SUBSTRING(name_first, 0, 15),'…') : name_first) AS val;
GENERATE
group,
'name_first' AS var:chararray,
MIN(lens.len) AS minlen,
MAX(lens.len) AS maxlen,
--
AVG(lens.len) AS avglen,
SQRT(VAR(lens.len)) AS stdvlen,
SUM(lens.len) AS sumlen,
--
n_recs AS n_recs,
n_recs - n_notnulls AS n_nulls,
COUNT_STAR(dist) AS cardinality,
MIN(bat_seasons.name_first) AS minval,
MAX(bat_seasons.name_first) AS maxval,
BagToString(snippets, '^') AS examples,
lens AS lens
;
};
name_first_summary = FOREACH name_first_summary_0 {
sortlens = ORDER lens BY len;
pctiles = ApproxEdgeile(sortlens);
GENERATE
var,
minlen, FLATTEN(pctiles) AS (p01, p05, p50, p95, p99), maxlen,
avglen, stdvlen, sumlen,
n_recs, n_nulls, cardinality,
minval, maxval, examples
;
};
Everything we said for "Completely Summarizing a Group" (REF), plus
-
Where You’ll Use It — Getting to know your data. Sizing string lengths for creating a database schema. Making sure there’s nothing ill-formed or outrageously huge. Making sure all values for a categorical field or string key are correct.
-
Hello, SQL Users — Corresponding functions without a
GROUP BY
-
Important to Know
-
You’re sending all the data to one reducer, so make sure the aggregate functions are highly reductive
-
Note the syntax of the full-table group statement. There’s no I in TEAM, and no
BY
in GROUP ALL
.
-
-
Output Count — Single row
-
Data Flow — Map, Combiner, and single reducer
One of the most common uses of a group-and-aggregate is to create a histogram showing how often each value (or range of values) of a field occurs. This calculates the distribution of seasons played — that is, it counts the number of players whose career lasted only a single season; who played for two seasons; and so forth.
vals = FOREACH bat_careers GENERATE n_seasons AS bin;
seasons_hist = FOREACH (GROUP vals BY bin) GENERATE
group AS bin, COUNT_STAR(vals) AS ct;
DUMP seasons_hist;
/*
(1,4781)
(2,2461)
(3,1583)
(4,1162)
...
(23,13)
(24,5)
(25,3)
(26,1)
(27,1)
*/
Referring back to the bat_seasons
relation, we can compute a histogram with example data:
vals = FOREACH (GROUP bat_seasons BY (player_id, name_first, name_last)) GENERATE
COUNT_STAR(bat_seasons) AS bin, FLATTEN(group);
seasons_hist = FOREACH (GROUP vals BY bin) {
some_vals = LIMIT vals 3;
GENERATE group AS bin, COUNT_STAR(vals) AS ct, BagToString(some_vals, '|');
};
DUMP seasons_hist;
/*
(1,4781,1|zay01|William|Zay|1|zoccope01|Pete|Zoccolillo|1|zimmero01|Roy|Zimmerman)
(2,2461,2|moranbi01|Bill|Moran|2|moranal01|Al|Moran|2|stewasc01|Scott|Stewart)
(3,1583,3|wilshwh01|Whitey|Wilshere|3|drisktr01|Travis|Driskill|3|dellwh01|Wheezer|Dell)
(4,1162,4|mahonji01|Jim|Mahoney|4|deanwa01|Wayland|Dean|4|ceccaar01|Art|Ceccarelli)
*/
So the pattern here is to:
-
Project only the values,
-
Group by the values,
-
Produce the group as key and the count as value.
-
Where You’ll Use It — Anywhere you need a more detailed sketch of your data than average/standard deviation or simple quantiles can provide
-
Standard Snippet —
vals = FOREACH recs GENERATE myfield AS bin; hist = FOREACH (GROUP vals BY bin) GENERATE group AS bin, COUNT_STAR(vals) AS ct;
. -
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values
-
Records — Output is
bin, ct:long
. You’ve turned records-with-values into values-with-counts -
Data Flow — Map, Combiner & Reduce; combiners very effective unless cardinality extremely high
Generating a histogram for games just as above produces mostly-useless output. There’s no material difference between a career of 2000 games and one of 2001 games, but each value receives its own count — making it hard to distinguish the density of 1-, 2-, and 3-count bins near 1000 games from the 1-, 2-, and 3-count bins near 1500 games.
-- Meaningless
G_vals = FOREACH bat_careers GENERATE G AS val;
G_hist = FOREACH (GROUP G_vals BY val) GENERATE
group AS val,
COUNT_STAR(G_vals) AS ct;
DUMP G_hist;
/*
(1,658)
...
(3298,1)
(3308,1)
(3562,1)
*/
Instead, we will bin the data: divide by the bin size (50 in this case), take the FLOOR, and then multiply back by the bin size. The result of the floored division is a whole number, so the resulting value of bin is always an even multiple of the bin size. Values of 0, 12 and 49 all go to the 0 bin; 150 games goes to the 150 bin; and Pete Rose’s total of 3,562 games played becomes the only occupant of bin 3550.
-- Binning makes it sensible
G_vals = FOREACH bat_careers GENERATE 50*FLOOR(G/50) AS val;
G_hist = FOREACH (GROUP G_vals BY val) GENERATE
group AS val,
COUNT_STAR(G_vals) AS ct;
DUMP G_hist;
/*
(0.0,6638)
(50.0,1916)
(100.0,1176)
...
(3250.0,1)
(3300.0,1)
(3550.0,1)
*/
How do you choose a binsize? The following three graphs zoom in on the tail (2000 or more games) to show bin sizes that are too large, too small, and just right.
The bin size of 2 is too fine — the counts are small, there are many trivial gaps, and there is a lot of non-meaningful bin-to-bin variation.
The bin size we chose, 50 games, works well. It’s a meaningful number (50 games represents about 1/3 of a season), it gives meaty counts per bin even when the population starts to become sparse, and yet preserves the gaps that demonstrate the epic scope of Pete Rose and our other outliers' careers.
Bin sizing is where your skill as a storyteller comes through.
Different underlying mechanics will give different distributions.
The histogram of career games shows that the most common career is a single game, and the counts drop off continuously at higher and higher career totals. You can’t play 30 games unless you were good enough to make it into 29 games; you can’t play 100 games unless you continued to be good, didn’t get injured, didn’t get old, didn’t go to war between the thirtieth and ninety-ninth game, and so on.
Distributions such as this one, spanning many orders of magnitude in value and count, are easier to understand using a 'log-log graph'. The "log" is short for "logarithm": equal steps along a logarithmic axis represent equal ratios rather than equal differences. On a log-log graph, then, the axes arrange the displayed values so that the same distance separates 1 from 10 as separates 10 from 100 and so on, for any ratio of values.
Though the career games data shows a very sharp dropoff, it is not a long-tail distribution, as you can see by comparing a power-law fit (which is always a straight line on a log-log graph) to the actual histogram.
In contrast, webpage views are known to be one of many phenomena that obey the "long-tail" distribution, as we can see by generating a histogram of hourly pageview counts for each Wikipedia page [5]. Since the data is so sharply exponential, we are better off binning it logarithmically. To do so we take the log of the value, chunk it (using the multiply-floor-undo method again), and then take the exponential to restore a representative value for the bin. (You’ll notice we avoid trouble taking the logarithm of zero by feeding it an insignificantly small number instead. This lets zero be included in the processing without materially altering the result.)
pageviews = LOAD '/data/rawd/wikipedia/page_counts/pagecounts-20141126-230000.gz' USING PigStorage(' ') AS (
project_name:chararray,
page_title:chararray,
requests:long,
bytes:long
);
SET eps 0.001;
view_vals = FOREACH pageviews GENERATE
(long)EXP( FLOOR(LOG((requests == 0 ? $eps : requests)) * 10)/10.0 ) AS bin;
hist_wp_view = FOREACH (GROUP view_vals BY bin) GENERATE
group AS bin,
COUNT_STAR(view_vals) AS ct;
The result indeed is a nice sharp line on the log-log plot, and the logarithmic bins did a nice job of accumulating robust counts while preserving detail. Logarithmic bins are generally a better choice any time you’re using a logarithmic x-axis because it means that the span of each bin is visually the same size, aiding interpretation.
As you can see, you don’t have to only bin linearly. Apply any function that takes piecewise segments of the domain and maps them sequentially to the integers, then undo that function to map those integers back to a central value of each segment. The Wikipedia webserver logs data also includes the total bytes transferred per page; this data spans such a large range that we end up binning both logarithmically (to tame the upper range of values) and linearly (to tame the lower range of values) — see the sample code for details.
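That sample code isn’t reproduced here, but a combined scheme might look something like the following sketch — the 100,000-byte crossover and the 10,000-byte linear bin size are illustrative assumptions, not necessarily the choices in the sample code:
-- Sketch: linear 10,000-byte bins for small transfer sizes,
-- logarithmic bins above the crossover point.
byte_vals = FOREACH pageviews GENERATE
    (bytes < 100000L
      ? 10000L * (long)FLOOR(bytes / 10000.0)
      : (long)EXP( FLOOR(LOG(bytes) * 10) / 10.0 )
    ) AS bin;
hist_wp_bytes = FOREACH (GROUP byte_vals BY bin) GENERATE
    group AS bin, COUNT_STAR(byte_vals) AS ct;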
See Pattern in Use for Histograms, above (REF)
-
Where You’ll Use It — Anywhere the values make sense exponentially; eg values make sense as 1, 100, 1000, …, 10 million rather than 1 million, 2 million, …, 10 million. Anywhere you will use a logarithmic 'X' axis for displaying the bin values.
-
Important to Know — The result is a representative value from the bin (eg
100
), and not the log of that value (eglog(100)
. Decide whether the representative should be a central value from the bin or the minimum value in the bin. -
Standard Snippet —
(long)EXP( FLOOR(LOG((val == 0 ? $eps : val)) * bin_sf) / bin_sf )
for scale factorbin_sf
. Instead of substituting$eps
for zero you might prefer to filter them out.
Rather than continuing to write the histogram recipe over and over, let’s take a moment and generalize. Pig allows you to create macros that parameterize multiple statements:
DEFINE histogram(table, key) RETURNS dist {
vals = FOREACH $table GENERATE $key;
$dist = FOREACH (GROUP vals BY $key) GENERATE
group AS val,
COUNT_STAR(vals) AS ct;
};
DEFINE binned_histogram(table, key, binsize, maxval) RETURNS dist {
-- A list of numbers from 0-9999
numbers = LOAD '/data/gold/numbers10k.txt' AS (number:int);
vals = FOREACH $table GENERATE (long)(FLOOR($key / $binsize) * $binsize) AS bin;
all_bins = FOREACH numbers GENERATE (number * $binsize) AS bin;
all_bins = FILTER all_bins BY (bin <= $maxval);
$dist = FOREACH (COGROUP vals BY bin, all_bins BY bin) GENERATE
group AS bin,
(COUNT_STAR(vals) == 0L ? 0L : COUNT_STAR(vals)) AS ct;
};
You’ll notice we load a relation called numbers
. This is a trick to fill in empty bins in the histogram with 0. If you can’t follow this, don’t worry - we’ll cover COGROUP
in the next chapter.
Call the binned histogram macro on the batting career data as follows:
career_G_hist = binned_histogram(bat_careers, 'G', 50, 3600);
career_G_hist_2 = binned_histogram(bat_careers, 'G', 2, 3600);
career_G_hist_200 = binned_histogram(bat_careers, 'G', 200, 3600);
And on the people data as follows:
height_hist = binned_histogram(people, 'height_in', 40, 80);
weight_hist = binned_histogram(people, 'weight_lb', 10, 300);
birthmo_hist = histogram(people, 'birth_month');
deathmo_hist = histogram(people, 'death_month');
Now that finding a histogram is effortless, let’s examine more shapes of distributions.
To reach the major leagues, a player must possess multiple extreme attributes: ones that are easy to measure, like being tall or being born in a country where baseball is popular; and ones that are not, like field vision, clutch performance, or the drive to put in outlandishly many hours practicing skills. Any time you are working with extremes as we are, you must be very careful not to assume their characteristics resemble the overall population’s.
Here again are the graphs for players' height and weight, but now graphed against (in light blue) the distribution of height/weight for US males aged 20-29 [6].
The overall-population distribution is shown with light blue bars, overlaid with a normal distribution curve for illustrative purposes. The population of baseball players deviates predictably from the overall population: it’s an advantage to be tall, so the distribution of player heights is shifted toward the high end. The distribution of player weights, meanwhile, is shifted somewhat but with a dramatically smaller spread.
Surely at least baseball players are born and die like the rest of us, though?
With a little Pig action, we can generate some histograms to answer that question:
vitals = FOREACH people GENERATE
height_in,
10*CEIL(weight_lb/10.0) AS weight_lb,
birth_month,
death_month;
birth_month_hist = histogram(vitals, 'birth_month');
death_month_hist = histogram(vitals, 'death_month');
height_hist = histogram(vitals, 'height_in');
weight_hist = histogram(vitals, 'weight_lb');
These graphs show the relative seasonal distribution of death rates, adjusted for the fact that there are fewer days in February than in July and so forth. As above, the background US rates are shown as darker outlined bars and the results from our data set as solid blue bars.
We were surprised to see how seasonal the death rate is. We all probably have a feel that there are more birthday-party invitations in September than in March, but hopefully not so much for funerals. This pattern is quite consistent and, as you might guess, inverted in the Southern Hemisphere. Most surprisingly of all, it persists even in places with a mild climate. The most likely cause of fewer deaths in the summer is not fewer snow-covered driveways to shovel; it is that people take vacations — lowering stress, improving mood, and synthesizing vitamin D. (And there’s no clear signal of "hanging on for Christmas" in the data.)
The baseball distribution is lumpier, as you’d expect from its smaller sample size [7], but matches the background distribution. Death treats baseball players, at least in this regard, as it does us all.
That is not true for the birth data! The format of the graph is the same as above, and again we see a seasonal distribution — with a peak nine months after the cold winter temperatures induce people to stay home and find alternative recreations. But the baseball data does not match the background distribution at all. The sharp spike in August following the nadir in May and June appears nowhere in the background data, and its phase (where it crosses the centerline) is shifted later by several months. In this data set, a player born in August is about 25% more likely to make the major leagues than a player born in June; restricting it to players from the United States born after 1950 makes August babies 50% more likely to earn a baseball card than June babies.
The reason is that since the 1940s, American youth leagues have used July 31st as an age cutoff. If Augusta were born on August 1st, then four calendar years and 364 days later she would still technically be four years old. Julien, who showed up the day before her and thus has spent five years and no days orbiting the Sun, is permitted to join the league as a five-year-old. The Augustas may be initially disappointed, but when they do finally join the league as five-year-and-364-day-old kids, they have nearly an extra year of growth compared to the Juliens who sign up with them, which on the whole provides a huge advantage at young ages. This earns the Augustas extra attention from their coaches, extra validation of their skill, and extra investment of "I’m good at Baseball!" in their identity.
A lot of big data analyses explore population extremes: manufacturing defects, security threats, disease carriers, peak performers. Elements arrive into these extremes exactly because multiple causative features drive them there (such as an advantageous height or birth month); and a host of other conflated features follow from those deviations (such as those stemming from the level of fitness athletes maintain).
So whenever you are examining populations of outliers, you cannot depend on their behavior resembling the universal population. Normal distributions may not remain normal and may not even retain a central tendency; independent features in the general population may become tightly coupled in the outlier group; and a host of other easy assumptions become invalid. Stay alert.
The histograms we’ve calculated give results in terms of counts. Those results do a better job of supporting comparisons if we express them as relative frequencies: as fractions of the total count. You know how to find the total:
HR_stats = FOREACH (GROUP bat_careers ALL) GENERATE COUNT_STAR(bat_careers) AS n_players;
The problem is that HR_stats is a single-row table, and so not something we can use directly in a FOREACH
expression. Pig gives you a piece of syntactic sugar for this specific case of a one-row table [8]: project the value as tablename.field as if it were an inner bag, but slap the field’s type (in parentheses) in front of it like a typecast expression:
HR_stats = FOREACH (GROUP bat_careers ALL) GENERATE COUNT_STAR(bat_careers) AS ct;
HR_hist = FOREACH (GROUP bat_careers BY HR) {
ct = COUNT_STAR(bat_careers);
GENERATE group as val,
ct/( (double)HR_stats.ct ) AS freq,
ct;
};
STORE HR_stats INTO 'HR_stats';
Typecasting the projected field as if you were simply converting the schema of a field from one scalar type to another acts as a promise to Pig that what looks like a column of possibly many values will turn out to have only one row. In return, Pig will understand that you want a sort of über-typecast of the projected column into what is effectively its literal value.
See Pattern in Use for "Histograms", above (REF), and "Re-injecting Global Values", following (REF).
-
Where You’ll Use It — Histograms on sampled populations. Whenever you want frequencies rather than counts, i.e. proportions rather than absolute values.
-
Standard Snippet — Same as for a histogram, but with
COUNT_STAR(vals)/((double)recs_info.ct) AS freq
.
Sometimes things are more complicated, and what you’d like to do is perform light synthesis of the results of some initial Hadoop jobs, then bring them back into your script as if they were some sort of "global variable". But a Pig script just orchestrates the top-level motion of data: there is no good intrinsic way to bring the result of a step into the declaration of following steps. You can use a backhoe to tear open the trunk of your car, but it’s not really set up to push the trunk latch button. The proper recourse is to split the script into two parts, and run it within a workflow tool like Rake, Drake or Oozie. The workflow layer can fish those values out of HDFS and inject them as runtime parameters into the next stage of the script.
In the case of global counts, it would be so much faster if we could sum the group counts to get the global totals; but that would mean a job to get the counts, a job to get the totals, and a job to get the relative frequencies. Ugh.
If the global statistic is relatively static, there are occasions where we prefer to cheat. Write the portion of the script that finds the global count and stores it, then comment that part out and inject the values statically — the sample code shows you how to do it using the cat
Grunt shell statement.
-- cheat mode:
-- HR_stats = FOREACH (GROUP bat_careers ALL) GENERATE COUNT_STAR(bat_careers) AS n_total;
-- STORE HR_stats INTO 'HR_stats';
%declare HR_stats_n_total `cat HR_stats`;
HR_hist = FOREACH (GROUP bat_careers BY HR) {
ct = COUNT_STAR(bat_careers);
GENERATE
group AS val,
ct AS ct,
ct/( (double)$HR_stats_n_total ) AS freq;
};
As we said, this is a cheat-to-win scenario: using it to knock three minutes off an eight minute job is canny when used to make better use of a human data scientist’s time, foolish when applied as a production performance optimization.
As long as the groups in question do not rival the available memory, counting how often each value occurs within a group is easily done using the DataFu CountEach
UDF.
For reference, see http://datafu.incubator.apache.org/docs/datafu/guide/bag-operations.html
sig_seasons = FILTER bat_seasons BY ((year_id >= 1900) AND
(lg_id == 'NL' OR lg_id == 'AL') AND
(PA >= 450));
REGISTER /usr/lib/pig/datafu.jar
DEFINE CountVals datafu.pig.bags.CountEach('flatten');
binned = FOREACH sig_seasons GENERATE
( 5 * ROUND(year_id/ 5.0f)) AS year_bin,
(20 * ROUND(H /20.0f)) AS H_bin;
hist_by_year_bags = FOREACH (GROUP binned BY year_bin) {
H_hist_cts = CountVals(binned.H_bin);
GENERATE
group AS year_bin,
H_hist_cts AS H_hist_cts;
};
We want to normalize this to be a relative-fraction histogram, so that we can make comparisons across eras even as the number of active players grows. Finding the total count to divide by is a straightforward COUNT_STAR on the group, but a peccadillo of Pig’s syntax makes using it a bit frustrating. Annoyingly, a nested FOREACH
can only "see" values from the bag it’s operating on, so there’s no natural way to reference the calculated total from the FOREACH
statement.
-- Won't work:
hist_by_year_bags = FOREACH (GROUP binned BY year_bin) {
H_hist_cts = CountVals(binned.H_bin);
tot = 1.0f*COUNT_STAR(binned);
H_hist_rel = FOREACH H_hist_cts GENERATE
H_bin,
(float)count/tot;
GENERATE
group AS year_bin,
H_hist_cts AS H_hist_cts,
tot AS tot;
};
The best current workaround is to generate the whole-group total in the form of a bag having just that one value. Then we use the CROSS operator to graft it onto each (bin,count) tuple, giving us a bag with (bin,count,total) tuples — yes, every tuple in the bag will have the same group-wide value. Finally, iterate across the tuples to find the relative frequency.
It’s more verbose than we’d like, but the performance hit is limited to the CPU and GC overhead of creating three bags ({(result,count)}
, {(result,count,total)}
, {(result,count,freq)}
) in quick order.
hist_by_year_bags = FOREACH (GROUP binned BY year_bin) {
H_hist_cts = CountVals(binned.H_bin);
tot = COUNT_STAR(binned);
GENERATE
group AS year_bin,
H_hist_cts AS H_hist,
{(tot)} AS info:bag{(tot:long)}; -- single-tuple bag we can feed to CROSS
};
hist_by_year = FOREACH hist_by_year_bags {
-- Combines H_hist bag {(100,93),(120,198)...} and dummy tot bag {(882.0)}
-- to make new (bin,count,total) bag: {(100,93,882.0),(120,198,882.0)...}
H_hist_with_tot = CROSS H_hist, info;
-- Then turn the (bin,count,total) bag into the (bin,count,freq) bag we want
H_hist_rel = FOREACH H_hist_with_tot GENERATE
H_bin,
count AS ct,
count/((float)tot) AS freq;
GENERATE
year_bin,
H_hist_rel;
};
-
Where You’ll Use It — Summarizing Cohorts. Comparatively plotting histograms as a small multiples plot (REF) or animation
-
Standard Snippet —
DEFINE CountVals datafu.pig.bags.CountEach('flatten'); FOREACH (GROUP recs BY bin) GENERATE group, CountVals(recs.bin);
. Must download and enable the DataFu package (REF) -
Important to Know — This is done largely in-memory at the reducer, so watch your data sizes
-
Output Count — As many records as the cardinality of its key, i.e. the number of distinct values
-
Records — Output is
group, bag of (count, bin) tuples
. You’ve turned bags of records-with-values into bags of values-with-counts -
Data Flow — Map & Reduce. As you’ll learn in "Advanced Pig" (REF),
CountEach
is not an algebraic, but is an accumulator
We are of course terribly anxious to find out the results, so much so that having to switch over to R to graph our totals is more delay than we can bear. It’s also often nice to have production jobs dump a visual summary of the results that an operator can easily scan and sanity-check. And so let’s apply the "Formatting a String According to a Template" (REF) pattern to dump a readable summary of our results to the screen.
year_hists_H = FOREACH hist_by_year {
  -- put all bins in regular order
  H_hist_rel_o = ORDER H_hist_rel BY H_bin ASC;
  -- The PA threshold makes the lower bins ragged, exclude them
  H_hist_rel_x = FILTER H_hist_rel_o BY (H_bin >= 90);
  -- Format each bin/freq into a readable string
  H_hist_vis = FOREACH H_hist_rel_x GENERATE
    SPRINTF('%1$3d: %3$4.0f', H_bin, ct, (double)ROUND(100*freq));
  -- Combine those strings into readable table
  GENERATE year_bin, BagToString(H_hist_vis, ' ');
};
In this snippet, we first put all the bins in regular order and exclude the lower bins (the minimum-plate-appearances threshold makes them ragged). Next, we transform each bin-count-frequency triple into a readable string using SPRINTF. Since we used positional specifiers (the 1$ part of %1$3d), it’s easy to insert or remove fields in the display depending on what question you’re asking; here, we’ve omitted the count, as it wasn’t helpful for the main question we have: "What are the long-term trends in offensive production?" Finally, we use BagToString to format each row. We first met that format-the-elements-then-format-the-bag combination in "Representing a Complex Data Structure with a Delimited String" (REF) above. (We hope you’re starting to feel like Daniel-san in Karate Kid, when all his work polishing cars comes together as deadly martial arts moves.)
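To illustrate that flexibility, here is a hypothetical variant of the formatting line that puts the raw count back into each cell; only the template changes, not the argument list (the tables shown below still come from the original template):
H_hist_vis = FOREACH H_hist_rel_x GENERATE
  SPRINTF('%1$3d: %2$4d (%3$3.0f%%)', bin, ct, (double)ROUND(100*freq)); -- e.g. "150:  198 ( 29%)"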
1900 100: 21 125: 38 150: 27 175: 9 200: 2 225: 1
1905 100: 30 125: 37 150: 20 175: 4 200: 2
1910 100: 22 125: 40 150: 25 175: 9 200: 1 225: 1
1915 100: 25 125: 38 150: 20 175: 6 200: 1 225: 0
1920 100: 12 125: 26 150: 29 175: 21 200: 9 225: 1 250: 0
1925 100: 13 125: 29 150: 26 175: 19 200: 9 225: 2 250: 0
1930 100: 12 125: 30 150: 26 175: 20 200: 9 225: 1 250: 0
1935 100: 13 125: 29 150: 29 175: 19 200: 8 225: 1
1940 100: 20 125: 35 150: 29 175: 11 200: 2
1945 100: 26 125: 36 150: 22 175: 11 200: 2 225: 1
1950 100: 21 125: 29 150: 32 175: 12 200: 3
1955 100: 27 125: 31 150: 22 175: 14 200: 2
1960 100: 24 125: 29 150: 29 175: 12 200: 3 225: 0
1965 100: 26 125: 34 150: 24 175: 8 200: 2 225: 0
1970 100: 26 125: 35 150: 23 175: 9 200: 2 225: 0
1975 100: 23 125: 33 150: 26 175: 11 200: 3 225: 0
1980 100: 22 125: 34 150: 25 175: 11 200: 3 225: 0
1985 100: 27 125: 31 150: 26 175: 9 200: 3 225: 0
1990 100: 29 125: 33 150: 24 175: 10 200: 1
1995 100: 20 125: 31 150: 29 175: 14 200: 3 225: 0
2000 100: 22 125: 30 150: 29 175: 13 200: 3 225: 0 250: 0
2005 100: 19 125: 32 150: 28 175: 15 200: 3 225: 0
2010 100: 22 125: 36 150: 26 175: 11 200: 2
1900 0: 97 10: 3
1905 0: 99 10: 1
1910 0: 93 10: 6 20: 0
1915 0: 96 10: 3 20: 1
1920 0: 77 10: 18 20: 3 30: 1 40: 1 50: 0
1925 0: 71 10: 20 20: 4 30: 3 40: 1 50: 0 60: 0
1930 0: 62 10: 25 20: 6 30: 5 40: 2 50: 0
1935 0: 57 10: 27 20: 10 30: 4 40: 1 50: 0
1940 0: 64 10: 24 20: 8 30: 3 40: 0
1945 0: 58 10: 27 20: 10 30: 4 40: 1 50: 1
1950 0: 39 10: 33 20: 18 30: 7 40: 3
1955 0: 34 10: 32 20: 23 30: 8 40: 4 50: 1
1960 0: 33 10: 34 20: 22 30: 8 40: 3 50: 0 60: 0
1965 0: 38 10: 34 20: 19 30: 8 40: 2 50: 0
1970 0: 39 10: 34 20: 20 30: 5 40: 2
1975 0: 42 10: 33 20: 19 30: 6 40: 1 50: 0
1980 0: 41 10: 34 20: 18 30: 6 40: 1
1985 0: 33 10: 34 20: 25 30: 8 40: 1
1990 0: 36 10: 35 20: 20 30: 7 40: 2 50: 0
1995 0: 24 10: 32 20: 25 30: 13 40: 6 50: 1 60: 0 70: 0
2000 0: 19 10: 35 20: 26 30: 14 40: 5 50: 1 60: 0 70: 0
2005 0: 22 10: 34 20: 28 30: 12 40: 3 50: 1
2010 0: 24 10: 37 20: 27 30: 11 40: 2 50: 0
We’ll need to draw graphs to get any nuanced insight, but the long-term trends in the production of Hits and Home Runs are strong enough that these tables tell a clear story. Baseball has seen two offensive booms: one in the 1920-1939 period, and one in the 1990-2009 period. However, the first was an on-base boom, with a larger proportion of players crossing the 200-hit mark than ever have since. The recent one was decidedly a power-hitting boom: there is an increase in the fraction of players reaching high seasonal hit totals, but the Home Run table above shouts how much larger the proportion of players hitting 30, 40, and 50 home runs per year has become.
- Where You’ll Use It — Production jobs, to give the operator a readable summary showing that the job not only ran to completion but gave meaningful results. In development, to Know Thy Data.
- Standard Snippet — A mashup of the Format with a Template, Represent Complex Data Structures, and Group-and-Aggregate patterns.
- Important to Know — This is more valuable, and more used by experts, than you might think. You’ll see.
- Records — Up to you; enough for your brain, not too much for your eyes.
- Exercises for You — Create a macro to generate such a table; it should accept parameters for the SPRINTF template, the filter limits and the sort key. (A starter sketch follows this list.)
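To get you started, here is a minimal sketch of such a macro, under a couple of assumptions of ours: the input relation has the shape of year_hists above, with fields (year_bin, H_hist_rel:bag{(bin,ct,freq)}), and we use a simplified, non-positional template rather than passing the template in as a parameter. Extending it to accept the template and sort key is the rest of the exercise.
-- A sketch under the assumptions above, not a finished solution
DEFINE hist_summary(hists, min_bin) RETURNS vis {
  $vis = FOREACH $hists {
    hist_o   = ORDER  H_hist_rel BY bin ASC;           -- put bins in regular order
    hist_x   = FILTER hist_o     BY (bin >= $min_bin); -- drop the ragged low bins
    hist_str = FOREACH hist_x GENERATE
      SPRINTF('%3d: %4.0f', bin, (double)ROUND(100*freq));
    GENERATE year_bin, BagToString(hist_str, ' ');
  };
};
-- usage: year_hists_vis = hist_summary(year_hists, 90);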
There’s a pattern-of-patterns we like to call the "summing trick", a frequently useful way to act on subsets of a group without having to perform multiple GROUP BY or FILTER operations. Call it to mind every time you find yourself thinking "gosh, this sure seems like a lot of reduce steps on the same key". Before we describe its generic nature, it will help to see an example.
Whenever you are exploring a dataset, you should determine figures of merit for each of the key statistics — easy-to-remember values that separate qualitatively distinct behaviors. You probably have a feel for the way that 30 °C / 85 °F reasonably divides a "warm" day from a "hot" one; and if I tell you that a sub-three-hour marathon distinguishes "really impress your friends" from "really impress other runners", you are equipped to recognize how ludicrously fast a 2:15 marathon (the pace of a world-class runner) is.
For our purposes, we can adopt 180 hits (H), 30 home runs (HR), 100 runs batted in (RBI), a 0.400 on-base percentage (OBP) and a 0.500 slugging percentage (SLG) each as the dividing line between a good and a great performance.
One reasonable way to define a great career is to ask how many great seasons a player had, and we can answer that by counting how often a player’s season totals exceeded each figure of merit. The obvious tactic would seem to be filtering and counting each bag of seasonal stats within a player’s career, but that is cumbersome to write, brings most of the data down to the reducer, and exerts GC pressure materializing multiple bags. Instead, we project an indicator field for each statistic (1 when the season beats the threshold, 0 otherwise) and leave the counting for after the group:
mod_seasons = FILTER bat_seasons BY ((year_id >= 1900) AND (lg_id == 'NL' OR lg_id == 'AL'));
standards = FOREACH mod_seasons {
OBP = 1.0*(H + BB + HBP) / PA;
SLG = 1.0*(h1B + 2*h2B + 3*h3B + 4*HR) / AB;
GENERATE
player_id,
(H >= 180 ? 1 : 0) AS hi_H,
(HR >= 30 ? 1 : 0) AS hi_HR,
(RBI >= 100 ? 1 : 0) AS hi_RBI,
(OBP >= 0.400 ? 1 : 0) AS hi_OBP,
(SLG >= 0.500 ? 1 : 0) AS hi_SLG
;
};
Next, count the seasons that pass each threshold by summing the indicator values:
career_standards = FOREACH (GROUP standards BY player_id) GENERATE
group AS player_id,
COUNT_STAR(standards) AS n_seasons,
SUM(standards.hi_H) AS hi_H,
SUM(standards.hi_HR) AS hi_HR,
SUM(standards.hi_RBI) AS hi_RBI,
SUM(standards.hi_OBP) AS hi_OBP,
SUM(standards.hi_SLG) AS hi_SLG
;
The summing trick involves projecting a new field whose value depends on whether the record is in the desired set, forming the desired groups, and aggregating on those new fields. Irrelevant records are assigned a value that will be ignored by the aggregate function (typically zero or null), and so although we operate on the group as a whole, only the relevant records contribute.
In this case, instead of sending all the hit, home run, and other figures directly to the reducer to be bagged and filtered, we send a 1 for seasons above the threshold and a 0 otherwise. After the group, we find the count of seasons meeting our condition by simply summing the values in the indicator field. This approach allows Pig to use combiners (and so send less data to the reducer); more importantly, it doesn’t cause a bag of values to be collected, only a running sum (and so way less garbage-collector pressure).
Another example will help you see what we mean — next, we’ll use one GROUP operation to summarize multiple subsets of a table at the same time.
First, though, a side note on these figures of merit. As it stands, this isn’t a terribly sophisticated analysis: the numbers were chosen to be easy-to-remember, and not based on the data. For actual conclusion-drawing, we should use the z-score (REF) or quantile (REF) figures (we’ll describe both later on, and use them for our performance analysis instead). And yet, for the exploratory phase we prefer the ad-hoc figures. A 0.400 OBP is a number you can hold in your hand and your head; you can go click around ESPN and see that it selects about the top 10-15 players in most seasons; you can use paper-and-pencil to feed it to the run expectancy table (REF) we’ll develop later and see what it says a 0.400-on-base hitter would produce. We’ve shown you how useful it is to identify exemplar records; learn to identify these touchstone values as well.
We can use the summing trick to apply even more sophisticated aggregations to conditional subsets. How did each player’s career evolve — a brief brilliant flame? A rise to greatness? Sustained quality? Let’s classify a player’s seasons by whether they are "young" (age 21 and below), "prime" (22-29 inclusive) or "older" (30 and older). We can then tell the story of their career by finding their OPS (our overall performance metric) both overall and for the subsets of seasons in each age range [9].
The complication here, compared to the previous exercise, is that we are forming compound aggregates on the group. To apply the formula career SLG = (career TB) / (career AB), we need to separately determine the career values for TB and AB and then form the combined SLG statistic.
Project the numerator and denominator of each offensive stat into the field for that age bucket. Only one of the subset fields will be filled in; as an example, an age-25 season will have values for PA_all and PA_prime and zeros for PA_young and PA_older.
age_seasons = FOREACH mod_seasons {
young = (age <= 21 ? true : false);
prime = (age >= 22 AND age <= 29 ? true : false);
older = (age >= 30 ? true : false);
OB = H + BB + HBP;
TB = h1B + 2*h2B + 3*h3B + 4*HR;
GENERATE
player_id, year_id,
PA AS PA_all, AB AS AB_all, OB AS OB_all, TB AS TB_all,
(young ? 1 : 0) AS is_young,
(young ? PA : 0) AS PA_young, (young ? AB : 0) AS AB_young,
(young ? OB : 0) AS OB_young, (young ? TB : 0) AS TB_young,
(prime ? 1 : 0) AS is_prime,
(prime ? PA : 0) AS PA_prime, (prime ? AB : 0) AS AB_prime,
(prime ? OB : 0) AS OB_prime, (prime ? TB : 0) AS TB_prime,
(older ? 1 : 0) AS is_older,
(older ? PA : 0) AS PA_older, (older ? AB : 0) AS AB_older,
(older ? OB : 0) AS OB_older, (older ? TB : 0) AS TB_older
;
};
After the group, we can sum across all the records to find the plate-appearances-in-prime-seasons even though only some of the records belong to the prime-seasons subset. The irrelevant seasons show a zero value in the projected field and so don’t contribute to the total.
career_epochs = FOREACH (GROUP age_seasons BY player_id) {
PA_all = SUM(age_seasons.PA_all );
PA_young = SUM(age_seasons.PA_young);
PA_prime = SUM(age_seasons.PA_prime);
PA_older = SUM(age_seasons.PA_older);
-- OBP = (H + BB + HBP) / PA
OBP_all = 1.0f*SUM(age_seasons.OB_all) / PA_all ;
OBP_young = 1.0f*SUM(age_seasons.OB_young) / PA_young;
OBP_prime = 1.0f*SUM(age_seasons.OB_prime) / PA_prime;
OBP_older = 1.0f*SUM(age_seasons.OB_older) / PA_older;
-- SLG = TB / AB
SLG_all = 1.0f*SUM(age_seasons.TB_all) / SUM(age_seasons.AB_all);
SLG_prime = 1.0f*SUM(age_seasons.TB_prime) / SUM(age_seasons.AB_prime);
SLG_older = 1.0f*SUM(age_seasons.TB_older) / SUM(age_seasons.AB_older);
SLG_young = 1.0f*SUM(age_seasons.TB_young) / SUM(age_seasons.AB_young);
--
GENERATE
group AS player_id,
MIN(age_seasons.year_id) AS beg_year,
MAX(age_seasons.year_id) AS end_year,
--
OBP_all + SLG_all AS OPS_all:float,
(PA_young >= 700 ? OBP_young + SLG_young : null) AS OPS_young:float,
(PA_prime >= 700 ? OBP_prime + SLG_prime : null) AS OPS_prime:float,
(PA_older >= 700 ? OBP_older + SLG_older : null) AS OPS_older:float,
--
COUNT_STAR(age_seasons) AS n_seasons,
SUM(age_seasons.is_young) AS n_young,
SUM(age_seasons.is_prime) AS n_prime,
SUM(age_seasons.is_older) AS n_older
;
};
If you do a sort on the different OPS fields, you’ll spot Ted Williams (player ID willite01) as one of the top three young players, top three prime players, and top three old players. He’s pretty awesome.
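For example, here is a quick way to eyeball those leaders (a sketch of ours, not a listing from the book; since Pig sorts nulls as smaller than everything, DESC ordering pushes the under-700-PA epochs to the bottom):
-- Top three players in each career epoch, by OPS
by_young  = ORDER career_epochs BY OPS_young DESC;
top_young = LIMIT by_young 3;
by_prime  = ORDER career_epochs BY OPS_prime DESC;
top_prime = LIMIT by_prime 3;
by_older  = ORDER career_epochs BY OPS_older DESC;
top_older = LIMIT by_older 3;
DUMP top_young;
DUMP top_prime;
DUMP top_older;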
- Where You’ll Use It — Summarizing the whole and a small number of discrete subsets: all/true/false; country/region/region/region/...; all visitors/cohort A/cohort B.
- Standard Snippet — Project dummy fields for each subset you’ll track, giving an ignorable value to records not in that subset. Aggregating over the whole group then effectively aggregates over only that subset.
- Hello, SQL Users — This is a common trick in SQL cookbooks. Thanks, y’all!
- Important to Know — You have to manufacture one field per subset. At some point you should use finer-grained grouping instead — see "Group-Flatten-Decorate" (REF) and "Cube and Rollup" (REF).
- Output Count — As many records as the cardinality of its key, i.e. the number of distinct values. Data size should decrease greatly.
- Data Flow — Similar to any group-and-aggregate. Combiners become highly effective, as most of the values will be ignorable.
We don’t need a trick to answer "which players have ever played for the Red Sox?" — just select seasons with team id BOS and eliminate duplicate player ids:
-- Players who were on the Red Sox at some time
onetime_sox_ids = FOREACH (FILTER bat_seasons BY (team_id == 'BOS')) GENERATE player_id;
onetime_sox = DISTINCT onetime_sox_ids;
The summing trick is useful for the complementary question, "which players have never played for the Red Sox?" You might think to repeat the above but filter for team_id != 'BOS' instead; what that gives you, however, is "which players have ever played for a non-Red Sox team?". The right approach is to generate a field with the value 1 for a Red Sox season and the irrelevant value 0 otherwise. The never-Sox are those with zeroes for every year.
player_soxness = FOREACH bat_seasons GENERATE
player_id,
(team_id == 'BOS' ? 1 : 0) AS is_soxy;
player_soxness_g = FILTER
(GROUP player_soxness BY player_id)
BY MAX(player_soxness.is_soxy) == 0;
never_sox = FOREACH player_soxness_g GENERATE
group AS player_id;
- Where You’ll Use It — Security: badges that have "entered reactor core" but no "signed in at front desk" events. Users that clicked on three or more pages but never bought an item. Devices that missed QA screening.
- Standard Snippet — Create the indicator field: mt_f = FOREACH recs GENERATE …, (test_of_fooness ? 1 : 0) AS is_foo; find the non-foos: non_foos = FILTER (GROUP mt_f BY mykey) BY MAX(mt_f.is_foo) == 0; then project just the keys: non_foos = FOREACH non_foos GENERATE group AS mykey;
- Hello, SQL Users — Another classic pattern from the lore.
- Important to Know — If you’re thinking "gosh, once I’ve got that indicator field I could not only test its non-zeroness but sum it and average it and …" then you’re thinking along the right lines (see the sketch after this list).
- Output Count — As many records as the cardinality of its key, i.e. the number of distinct values. Data size should decrease dramatically.
- Records — A list of keys.
- Data Flow — Map, Combiner & Reducer. Combiners should be extremely effective.
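For instance, here is a hypothetical extension of the is_soxy indicator from above: summing it per player counts Red Sox seasons, and dividing by the total season count gives the share of each career spent in Boston.
-- A sketch building on the player_soxness relation above; alias names are ours
soxness_summary = FOREACH (GROUP player_soxness BY player_id) GENERATE
  group                              AS player_id,
  SUM(player_soxness.is_soxy)        AS n_sox_seasons,  -- count of BOS seasons
  (float)SUM(player_soxness.is_soxy) /
    COUNT_STAR(player_soxness)       AS sox_fraction;   -- share of career with BOS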
In this chapter we introduced grouping operations. We started with the basic Pig GROUP BY syntax, and worked through several baseball problems to show how grouping works in practice. We learned not only how to calculate statistics about grouped data, but also how to print and format those statistical summaries. We summarized records, groups, fields, and entire relations using aggregate functions such as MIN, MAX, COUNT_STAR and SUM, and we used those aggregate functions to summarize the careers of every baseball player in the modern era!
The operations in this chapter on grouping data are foundational, and they help to put data in context. Putting data in context is the 'trick' to map/reduce. If we touched on some operations without covering them in detail, worry not! We’ll get to operations like DISTINCT in another chapter.
Now that we can group data, we can COGROUP, or group between data relations. In our next chapter, we’ll look at a powerful extension of grouping techniques: JOINs. Inspired by SQL joins, Pig has extremely powerful joining capabilities that can pull additional data into the context of your analysis. We’ll take this opportunity to once again dive into map/reduce and learn how joins work!
- Robert O. Deaner, Aaron Lowen, and Stephen Cobley, "Born at the Wrong Time: Selection Bias in the NHL Draft," PLOS ONE, February 27, 2013. DOI: 10.1371/journal.pone.0057753
- "The Impact Of Baseball Age-Cutoff Date Rules," waswatching.com, May 23, 2013.
- Greg Spira, "The Boys of Late Summer," April 16, 2008.