== Analytic Patterns: Advanced Pipelineable Operations

=== Handling Ties when Ranking Records

parks_o = ORDER parks BY state_id PARALLEL 3;

parks_nosort_inplace    = RANK parks;
parks_presorted_inplace = RANK parks_o;
parks_presorted_ranked  = RANK parks_o BY state_id DESC;
parks_ties_cause_skips  = RANK parks   BY state_id DESC;
parks_ties_no_skips     = RANK parks   BY state_id DESC DENSE;

STORE_TABLE(parks_nosort_inplace,    'parks_nosort_inplace');
STORE_TABLE(parks_presorted_inplace, 'parks_presorted_inplace');
STORE_TABLE(parks_presorted_ranked,  'parks_presorted_ranked');
STORE_TABLE(parks_ties_cause_skips,  'parks_ties_cause_skips');
STORE_TABLE(parks_ties_no_skips,     'parks_ties_no_skips');

-- You may need to disable partial aggregation in current versions of Pig.
SET pig.exec.mapPartAgg               false;
SET pig.exec.mapPartAgg.minReduction  8;
-- Disabling multiquery just so we judge jobs independently
SET opt.multiquery                    false;

DEFINE LastEventInBag org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField('2', 'max');

=== Selecting Records Associated with Maximum Values

As we learned at the start of the chapter, you can retrieve the maximum and minimum values for a field using the MAX(bag) and MIN(bag) functions respectively. These have no memory overhead to speak of and are efficient both for bags within groups and for a full table with GROUP..ALL. (By the way: from here on out we're just going to talk about maxima; unless we say otherwise, everything applies to minima by substituting the word 'minimum' or reversing the sort order as appropriate.)
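Here is a minimal sketch of that baseline, assuming a `bat_seasons` relation with `year_id` and `HR` fields (names borrowed from examples later in the chapter):

------
-- maximum HR within each year's group
max_HR_by_year = FOREACH (GROUP bat_seasons BY year_id) GENERATE
  group AS year_id, MAX(bat_seasons.HR) AS max_HR;

-- maximum HR over the full table
max_HR_alltime = FOREACH (GROUP bat_seasons ALL) GENERATE
  MAX(bat_seasons.HR) AS max_HR;
------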

But if you want to retrieve the record associated with a maximum value (this section), or retrieve multiple values (the following section), you will need a different approach.

==== Selecting a Single Maximal Record Within a Group, Ignoring Ties

events_most_runs = LIMIT (ORDER events BY ev_runs_ct DESC) 40;

events_most_runs_g = FOREACH (GROUP events ALL)
  GENERATE FLATTEN(TOP(40, 16, events));

-- Final event of the game footnote:[For the purposes of a good demonstration,
-- we're ignoring the fact that the table actually has a boolean flag identifying
-- that event]

events_final_event_top = FOREACH (GROUP events BY game_id)
  GENERATE FLATTEN(TOP(1, 1, events));

events_final_event_lastinbag = FOREACH (GROUP events BY game_id)
  GENERATE FLATTEN(LastEventInBag(events));

events_final_event_orderlimit = FOREACH (GROUP events BY game_id) {
  events_o = ORDER events BY event_seq DESC;
  events_last = LIMIT events_o 1;
  GENERATE FLATTEN(events_last);
  };

events_final_event_orderfirst = FOREACH (GROUP events BY game_id) {
  events_o = ORDER events BY event_seq DESC;
  GENERATE FLATTEN(FirstTupleFromBag(events_o, ('')));
  };

nonsense_final_event = FOREACH (GROUP events BY event_desc)
  GENERATE FLATTEN(LastEventInBag(events));

For example, we may want to identify the team each player spent the most games with. Right from the start you have to decide how to handle ties. In this case, you're probably looking for a single primary team; the rare case where a player had exactly the same number of games for two teams is not worth the hassle of turning a single-valued field into a collection.

That decision simplifies our approach, as the sketch below illustrates.
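A hedged sketch (not the book's production code), assuming a `batting` relation carrying `player_id`, `team_id` and `G` (games) fields:

------
-- total games per (player, team), then keep the single team with the most games;
-- TOP breaks ties arbitrarily, which is exactly the behavior we decided we want
player_team_games = FOREACH (GROUP batting BY (player_id, team_id)) GENERATE
  FLATTEN(group) AS (player_id, team_id), SUM(batting.G) AS G_total;

primary_team = FOREACH (GROUP player_team_games BY player_id) {
  best = TOP(1, 2, player_team_games);   -- field 2 is G_total
  GENERATE group AS player_id, FLATTEN(best.team_id) AS team_id;
};
------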

-- -- -- How we made the events_evid table:
-- events = load_events();
-- events_evid = FOREACH events GENERATE game_id, event_seq, SPRINTF('%s-%03d', game_id, event_seq) AS event_id, year_id..;
-- STORE events_evid INTO '$data_dir/sports/baseball/events_evid';

-- ORDER BY on a full table: N
-----

Consulting the jobtracker console for the events_final_event_1 job shows
`combine input records: 124205; combine output records: 124169`. That's a pretty
poor showing: the combiner eliminated almost nothing. We know something Pig
doesn't: since all the events for a game are adjacent in the file, the maximal
record chosen by each mapper is almost certainly the overall maximal record for
that group, so the combiner is pure overhead.

Running it again with `SET pig.exec.nocombiner true` improved
the run time dramatically.

------
-- events = load_events();
-- events_evid = FOREACH events GENERATE game_id, event_seq, SPRINTF('%s-%03d', game_id, event_seq) AS event_id, year_id..;
-- team_season_final_event = FOREACH (GROUP events BY (home_team_id, year_id))
--   GENERATE FLATTEN(TOP(1, 2, events));

team_season_final_event = FOREACH (GROUP events BY (home_team_id, year_id)) {
  evs = FOREACH events GENERATE (game_id, event_seq) AS ev_id, *;
  GENERATE FLATTEN(TOP(1, 0, evs));
};

SELECT bat.player_id, bat.year_id, bat.team_id, MAX(batmax.Gmax), MAX(batmax.stints), MAX(team_ids), MAX(Gs)
  FROM       batting bat
INNER JOIN (SELECT player_id, year_id, COUNT(*) AS stints, MAX(G) AS Gmax, GROUP_CONCAT(team_id) AS team_ids, GROUP_CONCAT(G) AS Gs FROM batting bat GROUP BY player_id, year_id) batmax
  ON bat.player_id = batmax.player_id AND bat.year_id = batmax.year_id AND bat.G = batmax.Gmax
  GROUP BY player_id, year_id
  -- WHERE stints > 1
  ;

-- About 7% of seasons have more than one stint; only about 2% of seasons have
-- more than one stint and more than a half-season's worth of games
SELECT COUNT(*), SUM(mt1stint), SUM(mt1stint)/COUNT(*) FROM (SELECT player_id, year_id, IF(COUNT(*) > 1 AND SUM(G) > 77, 1, 0) AS mt1stint FROM batting GROUP BY player_id, year_id) bat
-----

Earlier in the chapter we annotated each player's season by whether they were
the league leader in Home Runs (HR):

------
bats_with_max_hr = FOREACH (GROUP bat_seasons BY year_id) GENERATE
  MAX(bat_seasons.HR) as max_HR,
  FLATTEN(bat_seasons);

-- Find the desired result:
bats_with_l_cg = FOREACH bats_with_max_hr GENERATE
  player_id.., (HR == max_HR ? 1 : 0);
bats_with_l_cg = ORDER bats_with_l_cg BY player_id, year_id;
------

We can also do this using a join:

------
-- Find the max_HR for each season
HR_by_year     = FOREACH bat_seasons GENERATE year_id, HR;
max_HR_by_year = FOREACH (GROUP HR_by_year BY year_id) GENERATE
  group AS year_id, MAX(HR_by_year.HR) AS max_HR;

-- Join it with the original table to put records in full-season context:
bats_with_max_hr_jn = JOIN
  bat_seasons    BY year_id, -- large table comes *first* in a replicated join
  max_HR_by_year BY year_id  USING 'replicated';
-- Find the desired result:
bats_with_l_jn = FOREACH bats_with_max_hr_jn GENERATE
  player_id..RBI, (HR == max_HR ? 1 : 0);
------

The COGROUP version has only one reduce step, but it requires sending the
full contents of the table to the reducer: its cost is two full-table scans
and one full-table group+sort. The JOIN version first requires effectively
that same group step, but with only the group key and the field of interest
sent to the reducer. It then requires a JOIN step to bring the records into
context, and a final pass to use it. If we can use a replicated join, the
cost is a full-table scan and a fractional group+sort for preparing the list,
plus two full-table scans for the replicated join. If we can't use a
replicated join, the cogroup version is undoubtedly superior.

So if a replicated join is possible, and the projected table is much smaller
than the original, go with the join version. However, if you are going to
decorate with multiple aggregations, or if the projected table is large, use
the GROUP/DECORATE/FLATTEN pattern.

==== Selecting Records Having the Top K Values in a Group (discarding ties)


Let's find the top ten totals of hits (H) for each season.

------
%DEFAULT k_leaders 10
%DEFAULT topk_window 60
%DEFAULT topk        40
;
DEFINE IOver                  org.apache.pig.piggybank.evaluation.Over('int');

H_seasons = FOREACH bat_seasons GENERATE
  H, year_id, player_id;
H_seasons = FILTER H_seasons BY year_id >= 2000;

top_H_season_c = FOREACH (GROUP H_seasons BY year_id) {
  candidates = TOP(25, 0, H_seasons.(H, player_id));
  GENERATE group AS year_id, candidates AS candidates;
};

top_H_season_r = FOREACH top_H_season_c {
  candidates_o = ORDER candidates BY H DESC;
  ranked = Stitch(IOver(candidates_o, 'rank', -1, 0, 0), candidates_o); -- from first (-1) to last (-1), rank on H (0th field)
  is_ok = AssertUDF((MAX(ranked.result) > 10 ? 1 : 0),
    'All candidates for topk were accepted, so we cannot be sure that all candidates were found');
  GENERATE year_id, ranked AS candidates:bag{t:(rk:int, H:int, player_id:chararray)}, is_ok;
};

top_H_season = FOREACH top_H_season_r {
  topk = FILTER candidates BY rk <= 10;
  topk_str = FOREACH topk GENERATE SPRINTF('%2d %3d %-9s', rk, H, player_id) AS str;
  GENERATE year_id, MIN(topk.H), MIN(candidates.H), BagToString(topk_str, ' | ');
};
------

...

------
DEFINE MostHits org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField('1', 'max');
top_H_season = FOREACH (GROUP H_seasons BY year_id) {
  top_k     = TOP(10, 0, H_seasons);
  top_1     = MostHits(H_seasons);
  top_1_bag = TOP(1,  0, H_seasons);
  GENERATE
    group                 AS year_id,
    MAX(top_k.H)         AS max_H,
    -- FLATTEN(top_1.H)      AS max_H_2,
    -- top_1_bag.H           AS max_H_3,
    -- top_1                 AS top_1,
    -- FLATTEN(top_1_bag)    AS (H:int, year_id:int, player_id:chararray),
    -- top_1_bag             AS top_1_bag:bag{t:(H:int, year_id:int, player_id:chararray)},
    -- top_1_bag.H AS tH, -- :bag{t:(t1H:int)},
    top_k.(player_id, H) AS top_k;
};

top_H_season_2 = FOREACH top_H_season {
  top_k_o = FILTER top_k BY (H >= max_H);
  -- firsties = CROSS top_k, tH;
  -- top_k_o = ORDER top_k BY H DESC;
  GENERATE year_id, max_H, top_k_o;
};
------

==== Selecting the Record Associated with a Maximum Attribute Value
-- http://pig.apache.org/docs/r0.12.0/api/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.html

------
DEFINE BiggestInBag org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField('1', 'max');
pl_best = FOREACH (GROUP bat_seasons BY player_id) GENERATE
  group AS player_id,
  BiggestInBag(bat_seasons.(H,   year_id, team_id)),
  BiggestInBag(bat_seasons.(HR,  year_id, team_id)),
  BiggestInBag(bat_seasons.(OBP, year_id, team_id)),
  BiggestInBag(bat_seasons.(SLG, year_id, team_id)),
  BiggestInBag(bat_seasons.(OPS, year_id, team_id))
  ;
------

==== Selecting Records Having the Top K Values in a Table


Find the top 40 seasons by hits.  Pig is smart about eliminating records at
the map stage, dramatically decreasing the data size.

top_H_seasons = LIMIT (ORDER sig_seasons BY H DESC, player_id ASC) 40;
top_H_seasons = RANK top_H_seasons;

A simple ORDER BY..LIMIT stanza may not be what you need, however. It will
always return K records exactly, even if there are ties for K'th place.
(Strangely enough, that is the case for the number we've chosen.)

The standard SQL trick is to identify the key for the K'th element (here,
it's Jim Bottomley's 227 hits in 1925) and then filter for records matching
or exceeding it. Unless K is so large that the top-k starts to rival
available memory, we're better off doing it in-reducer using a nested
FOREACH, just like we did above.
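For the whole-table case, here is a hedged Pig sketch of that "find the K'th value, then filter" trick, assuming the `bat_seasons` relation and its `H` field used elsewhere in the chapter; `kth_H.H` relies on Pig's scalar projection of a one-row relation:

------
top_40        = LIMIT (ORDER bat_seasons BY H DESC) 40;
kth_H         = FOREACH (GROUP top_40 ALL) GENERATE MIN(top_40.H) AS H;
-- every season matching or exceeding the 40th-best value, ties included
top_with_ties = FILTER bat_seasons BY H >= kth_H.H;
------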

http://pig.apache.org/docs/r0.12.0/api/org/apache/pig/piggybank/evaluation/Over.html[Piggybank's Over UDF]
allows us to

We limit within each group to the top `topk_window` (60) items, assuming there
are not more than 20 players tied for 40th place in H. We don't assume for too long
-- an `ASSERT` statement verifies there aren't so many records tied for 40th
place that they overflow the 60 highest records we retained for consideration.

------
%DEFAULT topk_window 60
%DEFAULT topk        40
DEFINE IOver                  org.apache.pig.piggybank.evaluation.Over('int');
ranked_Hs = FOREACH (GROUP bats BY year_id) {
  bats_H  = ORDER bats BY H DESC;
  bats_N  = LIMIT bats_H $topk_window; -- making a bet, asserted below
  ranked  = Stitch(bats_N, IOver(bats_N, 'rank', -1, -1, 11)); -- beginning to end, rank on H (field 11)
  GENERATE
    group   AS year_id,
    ranked  AS ranked:{(player_id, year_id, team_id, lg_id, age, G, PA, AB, HBP, SH, BB, H, h1B, h2B, h3B, HR, R, RBI, OBP, SLG, rank_H)}
    ;
};
-- verify there aren't so many records tied for $topk'th place that it overflows
-- the $topk_window number of highest records we retained for consideration
ASSERT ranked_Hs BY MAX(ranked.rank_H) > $topk; --  'LIMIT was too strong; more than $topk_window players were tied for $topk th place';

top_season_Hs = FOREACH ranked_Hs {
  topk_recs = FILTER ranked BY rank_H <= $topk;
  GENERATE topk_recs;
  };
------

=== Operations that Expand the number of Rows or Columns

If you count all the letters in a large-enough body of text, you'll generally find that the letter "e" (the most frequent) appears about 12% of the time, while z and q (the least frequent) appear less than 1% of the time. But names of people have a noticeably different distribution of characters, as we can demonstrate using the baseball data. The `people` table has two fields representing city names, a first name field and a last name field. We'll find the frequency distribution for each.

==== Parsing a Delimited String into a Collection of Values

TSV (tab-separated-values) is the Volkswagen Beetle of go-anywhere file formats: it's robust, simple, friendly and works everywhere. However, it has significant drawbacks, most notably that it can only store flat records: a member field with, say, an array type must be explicitly handled after loading. One common workaround for serializing an array type is to convert the array into a string, where each value is separated from the next using a delimiter -- a character that doesn't appear in any of the values. We'll demonstrate creating such a field in the next chapter (REF), and in fact we're going to sneak into the future and steal that section's output files.

------
team_parklists = LOAD '.../team_parklists' AS (...);
xxx = FOREACH ... {
  parks = STRSPLITBAG(...);
  GENERATE ..., FLATTEN(parks), ...;
};
------
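A more concrete (but still hypothetical) version of that sketch, assuming the stored file carries a comma-delimited `park_ids` string per team; the path, schema and delimiter here are assumptions, and `STRSPLITBAG` is the pigsy UDF introduced a bit further down:

------
register '/path/to/pigsy/target/pigsy-2.1.0-SNAPSHOT.jar';
DEFINE STRSPLITBAG pigsy.text.STRSPLITBAG();

team_parklists = LOAD '/data/gold/sports/baseball/team_parklists.tsv'
  AS (team_id:chararray, park_ids:chararray);

team_parks = FOREACH team_parklists {
  parks = STRSPLITBAG(park_ids, ',');            -- one single-field tuple per park_id
  GENERATE team_id, FLATTEN(parks) AS park_id;
};
------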

In other cases the value may not be a bag holding an arbitrarily-sized collection of values, but a tuple holding several composite fields. Among other examples, it's common to find addresses serialized this way. The people table has fields for (city,state,country) of both birth and death. We will demonstrate by first creating single birth_loc and death_loc fields, then untangling them.

------
people_shrunk = FOREACH people GENERATE
  player_id..birth_day,
  CONCAT(birth_city,'|', birth_state, '|', birth_country) AS birth_loc,
  death_year, death_month, death_day,
  CONCAT(death_city,'|', death_state, '|', death_country) AS death_loc,
  name_first.. ;

people_2 = FOREACH people_shrunk GENERATE
  player_id..birth_day,
  FLATTEN(STRSPLIT(birth_loc, '\\|')) AS (birth_city, birth_state, birth_country),
  death_year, death_month, death_day,
  FLATTEN(STRSPLIT(death_loc, '\\|')) AS (death_city, death_state, death_country),
  name_first.. ;
------

In this case we apply STRSPLIT, which makes a tuple (rather than STRSPLITBAG, which makes a bag). When we then apply FLATTEN to that tuple, it turns its fields into new columns (rather than generating new rows, as it would for a bag). You can run the sample code to verify the output and input are identical.

TODO-reviewer: (combine this with the later chapter? There's a lot going on there, so I think no, but want opinion)

=== Flattening

==== Flattening a Bag Generates Many Records

attr_strings = FOREACH people {
  fields_bag = {('fn', nameFirst), ('ln', nameLast), ('ct', birthCity), ('ct', deathCity)};
  GENERATE FLATTEN(fields_bag) AS (type:chararray, str:chararray);
  };
-- ('fn',Hank)
-- ('ln',Aaron)
-- ...

attr_chars = FOREACH (FILTER attr_strings BY str != '') {
  chars_bag = STRSPLITBAG(LOWER(str), '(?!^)');
  GENERATE type, FLATTEN(chars_bag) AS token;
  };
DESCRIBE attr_chars;

chars_ct   = FOREACH (GROUP attr_chars BY (type, token))
  GENERATE group.type, group.token, COUNT_STAR(attr_chars) AS ct
  ;

==== Flattening a Tuple Generates Many Columns

chars_freq = FOREACH (GROUP chars_ct BY type) {
  tot_ct = SUM(chars_ct.ct);
  GENERATE group AS type, tot_ct AS tot_ct, FLATTEN(chars_ct.(ct, token));
  };
chars_freq = FOREACH chars_freq GENERATE type, token, ct, (int)ROUND(1e6f*ct/tot_ct) AS freq:int;
DESCRIBE chars_freq;

rmf                    $out_dir/chars_freq;
STORE chars_freq INTO '$out_dir/chars_freq';



==== Flatten on a Bag Generates Many Records from a Field with Many Elements

This snippet first produces a bag pairing each of the `chararray` values we want with the distribution it belongs to, then flattens it.

typed_strings = FOREACH people {
  fields_bag = {('fn', nameFirst), ('ln', nameLast), ('ct', birthCity), ('ct', deathCity)};
  GENERATE FLATTEN(fields_bag) AS (type:chararray, str:chararray);
  };

Each single record having a bag turns into four records having a field called 'type' and a field called 'str':

fn  Hank
ln  Aaron
ct  San Diego
ct  Inverness

==== Flatten on a Tuple Folds it into its Parent

Our next step is to split those string fields into characters. Pig provides a `STRSPLIT` function that _seems_ to do what we want (spoiler alert: for this purpose it doesn't, but we want to prove a point).

typed_chars = FOREACH typed_strings {
  chars_bag = STRSPLIT(str, '(?!^)');  -- works, but not as we want
  GENERATE type, FLATTEN(chars_bag) AS token;
  };

The output we want would have one record per character in the `str` field, but that isn't what happens:

fn  H  a  n  k
ln  A  a  r  o  n
...

`STRSPLIT` returns a _tuple_, not a _bag_, and the `FLATTEN` operation applied to a tuple does not produce many records from the tuple field; it lifts the elements of the tuple into its container. This `FLATTEN(STRSPLIT(...))` combination is great for, say, breaking up a comma-delimited string into fields, but here we want to flatten the characters into multiple records. The pigsy package has the UDF we need:

register '/path/to/pigsy/target/pigsy-2.1.0-SNAPSHOT.jar';
DEFINE STRSPLITBAG pigsy.text.STRSPLITBAG();
-- ...
typed_chars = FOREACH typed_strings {
  chars_bag = STRSPLITBAG(LOWER(str), '(?!^)');
  GENERATE type, FLATTEN(chars_bag) AS token;
  };

===== Results

What remains is to group on the characters for each type to find their overall counts, and then to prepare the final results. We'll jump into all that in the next chapter, but (REF) shows the final results. The letters "k", "j", "b" and "y" are very over-represented in first names. The letter "z" is very over-represented in last names, possibly because of the number of Hispanic and Latin American players.

------
char  dictionary %  prose %  first names %  excess
a        8.49       8.16        8.31         1.01
b        2.07       1.49        3.61         2.00
c        4.53       2.78        3.67          .80
d        3.38       4.25        4.42         1.48
e       11.16      12.70       11.03         1.05
f        1.81       2.22        1.43         1.27
g        2.47       2.01        2.03          .96
h        3.00       6.09        3.40         1.23
i        7.54       6.96        6.85          .78
j         .19       0.15        3.70         3.14
k        1.10       0.77        3.07         4.37
l        5.48       4.02        6.29         1.07
m        3.01       2.40        3.73         1.21
n        6.65       6.74        6.46          .92
o        7.16       7.50        6.81          .89
p        3.16       1.92        1.08          .31
q         .19       0.09        . 3           .19
r        7.58       5.98        8.33         1.15
s        5.73       6.32        3.06          .49
t        6.95       9.05        4.00          .58
u        3.63       2.75        1.91          .49
v        1.00       0.97        1.15         1.25
w        1.28       2.36         .82         1.29
x         .29       0.15         .22          .73
y        1.77       1.97        3.93         1.68
z         .27       0.07         .19          .53
------

(TODO insert actual results, and decide which distribution (prose or dictionary) you'll normalize against)

===== Other Similar Patterns

The chapter on text data (REF) shows how to tokenize free text into a "word bag", using both Pig's simplistic `TOKENIZE` function and a UDF that applies a sophisticated computational linguistics library. In the Event Stream chapter (REF), we'll demonstrate dividing time range into discrete intervals. Lastly, the Statistics chapter (REF) describes a script to take summary statistics of all columns simultaneously, which involves transposing a record into attribute-value pairs.

We have much more to say about FLATTEN, but it's best saved for the next chapter so that we can illustrate our points well.

=== Generating Data

The challenge of generating data in a distributed system is how to distribute an assignment of _what_ to generate onto each node.

==== Generating Data by Distributing Assignments As Input

The best way to generate data in Hadoop is to prepare map inputs that represent assignments of what data to generate. There are two good examples of this pattern elsewhere in the book, so we won't try to contrive one here. One is the "poor-man's data loader" given in Chapter 3 (REF). The mapper input is a list of filenames or database queries; each mapper expands that trivial input into many rows of output. Another is the "self-inflicted DDOS" tool for stress-testing your website (REF). In that case, the mapper input is your historical weblogs, and the mapper output is formed from the web server response.


==== Generating a Sequence Using an Integer Table

The surprisingly useful integers table -- 1, 2, 3, ... each on subsequent rows -- provides one way to get around this. We don't really have a good baseball-based example, but we can demonstrate generating the 11 million combinations of five letters using a map-reduce job (or a similar UDF):

.Generating Data

C2 = 26**2 ; C3 = 26**3 ; C4 = 26**4 ; C5 = 26**5
ORD_A = 'a'.ord
mapper do |line|
  idx = line.to_i
  offsets = [ idx / C4, (idx / C3) % 26, (idx / C2) % 26, (idx / 26) % 26, idx % 26 ]
  chars = offsets.map{|offset| (ORD_A + offset).chr }
  yield chars.join
end

------
# seed the RNG with the index
www.ruby-doc.org/gems/docs/w/wukong-4.0.0/Wukong/Faker/Helpers.html
Faker::Config.locale = 'en-us'
Faker::Name.name #=> "Tyshawn Johns Sr."
Faker::PhoneNumber.phone_number #=> "397.693.1309"
Faker::Address.street_address #=> "282 Kevin Brook"
Faker::Address.secondary_address #=> "Apt. 672"
Faker::Address.city #=> "Imogeneborough"
Faker::Address.zip_code #=> "58517"
Faker::Address.state_abbr #=> "AP"
Faker::Address.country #=> "French Guiana"
Faker::Business.credit_card_number #=> "1228-1221-1221-1431"
Faker::Business.credit_card_expiry_date #=> <Date: 2015-11-11 ((2457338j,0s,0n),+0s,2299161j)>
mapper do |line|
  idx = line.to_i
  offsets = [ idx / C4, (idx / C3) % 26, (idx / C2) % 26, (idx / 26) % 26, idx % 26 ]
  chars = offsets.map{|offset| (ORD_A + offset).chr }
  yield chars.join
end
------

  - Generating data using the assignment list as input
	- in particular, using the list of URLs or filenames or whatever -- TODO-Flip: not sure what you mean here?
	- just demonstrate with map-reduce only, no pig (unless we decide to use this to show an inline Ruby UDF?)

==== Generating Pairs

is there a way to do the SQL version more elegantly?

------
SELECT
    IF(home_team_id <= away_team_id, home_team_id, away_team_id) AS team_a,
    IF(home_team_id <= away_team_id, away_team_id, home_team_id) AS team_b,
    COUNT(*)
  FROM events ev
GROUP BY team_a, team_b
ORDER BY team_a, team_b
;
------
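A hedged Pig sketch of the same pair count; it assumes the `games` relation used later in the chapter carries `home_team_id` and `away_team_id`:

------
team_pairs = FOREACH games GENERATE
  (home_team_id <= away_team_id ? home_team_id : away_team_id) AS team_a,
  (home_team_id <= away_team_id ? away_team_id : home_team_id) AS team_b;

pair_cts = FOREACH (GROUP team_pairs BY (team_a, team_b)) GENERATE
  FLATTEN(group) AS (team_a, team_b), COUNT_STAR(team_pairs) AS ct;
------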

(do we want to show the group by or call forward to it)

You'll see a more elaborate version of this pattern later on.

=== Transpose record into attribute-value pairs

Group by season, transpose, and take the top 10 for each season, attribute pair
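A hypothetical sketch of that pipeline, assuming `bat_seasons` carries `year_id`, `player_id`, `H`, `HR` and `RBI`; each record is turned into (attribute, value) pairs, and then we keep the top 10 values for each (season, attribute) combination:

------
attr_vals = FOREACH bat_seasons GENERATE
  year_id, player_id,
  FLATTEN(TOBAG(TOTUPLE('H', H), TOTUPLE('HR', HR), TOTUPLE('RBI', RBI)))
    AS (attr, val);

top_by_attr = FOREACH (GROUP attr_vals BY (year_id, attr)) {
  top_vals = TOP(10, 3, attr_vals);   -- field 3 is val
  GENERATE FLATTEN(top_vals) AS (year_id, player_id, attr, val);
};
------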


=== Using Group/Decorate/Flatten to Bring Group Context to Individuals

Defining the characteristics of an exceptional career is a matter of taste, not
mathematics; and selecting how we estimate those characteristics is a matter of
taste balanced by mathematically-informed practicality.

* Total production: a long career and high absolute totals for hits, home runs and so forth
* Sustained excellence: high normalized rates of production (on-base percentage and so forth)
* Peak excellence: multiple seasons of exceptional performance

Earlier, when we created relative histograms, we demonstrated putting records
in context with global values.

To put them in context with whole-group examples, use a pattern we call
'group/decorate/flatten'. Use this when you want a table with the same shape
and cardinality as the original (that is, each record in the result comes
from a single record in the original), but which integrates aggregate
statistics from subgroups of the table.

Let's annotate each player's season by whether they were the league leader in
Home Runs (HR).

The group we need is all the player-seasons for a year, so that we can find
out what the maximum count of HR was for that year.
bats_by_year_g = GROUP bat_seasons BY year_id;

-- Decorate each individual record with the group summary, and flatten:
bats_with_max_hr = FOREACH bats_by_year_g GENERATE
  MAX(bat_seasons.HR) AS max_HR,
  FLATTEN(bat_seasons);

-- Now apply the group context to the records:
bats_with_leaders = FOREACH bats_with_max_hr GENERATE
  player_id.., (HR == max_HR ? 1 : 0);

An experienced SQL user might think to do this with a join. That might or
might not make sense; we'll explore this alternative later in the chapter
under "Selecting Records Associated with Maximum Values".

normed_dec = FOREACH (GROUP bat_years BY (year_id, lg_id)) {
  batq    = FILTER bat_years BY (PA >= 450);
  avg_BB  = AVG(batq.BB);  sdv_BB  = SQRT(VAR(batq.BB));
  avg_H   = AVG(batq.H);   sdv_H   = SQRT(VAR(batq.H));
  avg_HR  = AVG(batq.HR);  sdv_HR  = SQRT(VAR(batq.HR));
  avg_R   = AVG(batq.R);   sdv_R   = SQRT(VAR(batq.R));
  avg_RBI = AVG(batq.RBI); sdv_RBI = SQRT(VAR(batq.RBI));
  avg_OBP = AVG(batq.OBP); sdv_OBP = SQRT(VAR(batq.OBP));
  avg_SLG = AVG(batq.SLG); sdv_SLG = SQRT(VAR(batq.SLG));
  --
  GENERATE
    -- all the original values, flattened back into player-seasons
    FLATTEN(bat_years),
    -- all the materials for normalizing the stats
    avg_H   AS avg_H,   sdv_H   AS sdv_H,
    avg_HR  AS avg_HR,  sdv_HR  AS sdv_HR,
    avg_R   AS avg_R,   sdv_R   AS sdv_R,
    avg_RBI AS avg_RBI, sdv_RBI AS sdv_RBI,
    avg_OBP AS avg_OBP, sdv_OBP AS sdv_OBP,
    avg_SLG AS avg_SLG, sdv_SLG AS sdv_SLG
    ;
  };

normed = FOREACH normed_dec GENERATE
  player_id, year_id, team_id, lg_id,
  G, PA, AB, HBP, SH, BB, H, h1B, h2B, h3B, HR, R, RBI, OBP, SLG,
  (H   - avg_H  ) / sdv_H   AS zH,
  (HR  - avg_HR ) / sdv_HR  AS zHR,
  (R   - avg_R  ) / sdv_R   AS zR,
  (RBI - avg_RBI) / sdv_RBI AS zRBI,
  (OBP - avg_OBP) / sdv_OBP AS zOBP,
  (SLG - avg_SLG) / sdv_SLG AS zSLG,
  ( ((OBP - avg_OBP)/sdv_OBP) + ((SLG - avg_SLG)/sdv_SLG) ) AS zOPS
  ;

normed_seasons = ORDER normed BY zOPS ASC;
STORE_TABLE(normed_seasons, 'normed_seasons');

=== Ungrouping operations (FOREACH..FLATTEN) expand records

So far, we've seen using a group to aggregate records and (in the form of `JOIN`) to match records between tables.
Another frequent pattern is restructuring data (possibly performing aggregation at the same time). We used this several times in the first exploration (TODO ref): we regrouped wordbags (labelled with quadkey) into quadtiles containing composite wordbags, then regrouped on the words themselves to find their geographic distribution.

The baseball data is closer at hand, though, so let's demonstrate with it instead.

------
team_player_years = GROUP player_years BY (team, year);
player_team_years = FOREACH team_player_years GENERATE
   FLATTEN(player_years.player_id) AS player_id, group.team, group.year;
------

In this case, since we grouped on two fields, `group` is a tuple; earlier, when we grouped on just the `player_id` field, `group` was just the simple value.

The contextify / reflatten pattern can be applied even within one table. This script will find the career list of teammates for each player -- all other players with a team and year in common footnote:[yes, this will have some false positives for players who were traded mid-year. A nice exercise would be to rewrite the above script using the game log data, now defining teammate to mean "all other players they took the field with over their career".].

------
coplayers = FOREACH (GROUP player_years BY (team, year)) {
  -- cross the roster bag with itself to emit every (player_a, player_b) pair
  roster_a = player_years.player_id;
  roster_b = player_years.player_id;
  pairs    = CROSS roster_a, roster_b;
  GENERATE FLATTEN(pairs) AS (player_a, player_b);
};
coplayers = FILTER coplayers BY (player_a != player_b);
teammates = FOREACH (GROUP coplayers BY player_a) {
  mates = DISTINCT coplayers.player_b;
  GENERATE group AS player_id, mates AS teammates;
};
------

Here's another

The result of the cross operation will include pairing each player with themselves, but since we don't consider a player to be their own teammate we must eliminate player pairs of the form `(Aaronha, Aaronha)`. We did this with a FILTER immediately before the second GROUP (the best practice of removing data before a restructure), but a defensible alternative would be to `SUBTRACT` playerA from the bag right after the `DISTINCT` operation.
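Here is a hedged sketch of that alternative, applied to the unfiltered `coplayers` relation and using Pig's builtin `SUBTRACT`; treat it as an illustration rather than the book's tested code:

------
teammates_alt = FOREACH (GROUP coplayers BY player_a) {
  mates = DISTINCT coplayers.player_b;
  -- drop the player from their own teammate list
  GENERATE group AS player_id, SUBTRACT(mates, TOBAG(group)) AS teammates;
};
------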

=== Group flatten regroup

* OPS+ -- group on season, normalize, reflatten
* player's highest OPS+: regroup on player, top

Words/tiles:

(Word tile wd_doc_ct doc_tot)
Group on word find total word count, total doc count
(Word tile
    doc-usg:val(wd,doc)
    doc-tot_usgs:sum(u|*,doc)   doc-n_wds:count(w|*,doc)
    wd-tot_usgs:sum(u|wd,*)                                                wd-n_docs:count(d|wd,*)
    tot-usgs:sum(*,*)                  n_wds:count(w|*,*)            ct-docs:count(d|*,*)

   usgs    tile-ct-wds     tile-ct-docs

    pl-yr-ops:val(pl,yr)
    yr-tot-ops:sum(ops|*,yr)            yr-n-pl:count(pl|*,yr)   yr-avg-ops:avg(ops|*,yr)
    pl-yr-oz:(pl-yr-ops/yr-avg-ops)
    pl-max-oz:max(pl-yr-oz|p,*)

    yr-g:(*,y)
    te-yr-g:(*,te,y)

Name tables for dominating primary keys. If a value is subsumed, omit. Keys are x_id always
              pl-yr[te,ops]  pk-te-yr[]
              pl-info[...] -- vertical partition on any other func(pl)
If Non unique key, assumed that table xx has id xx_id

 Do not get join happy: find year averages, join all on year, group on player
Just group on year then flatten with records.

Style: n_H, ct_H, H_ct? n_H because the n_* have same schema, and because ^^^

=== Decorate-Flatten-Redecorate

The patterns we've introduced so far  looking at baseball's history

That's the same analysis used to decide whether to go for it on fourth down in American football, and it's a useful model for predicting asset prices and for other "Bayesian" analyses (TECH am I using the right term): given a discrete assessment of the current state, what future outcomes result?

To do this, we need to first determine the final inning and final game outcome for each event, and then determine the distribution of outcomes across all events for each game state. The first requires placing all events into context by inning and game; the second requires placing them into context by event type.

For each combination of <occupied bases, game score, outs, inning, game over>, we want to find

* how often that situation crops up -- how often is the home team down 3-0, with two outs in the bottom of the final inning with the bases loaded? In this situation every pitch could result in immediate victory or immediate defeat.
* from the given situation, how likely is the team to finally prevail? How often does the mighty Casey come through with a four-run "grand-slam" home run, and how often does he strike out?
* on average, how many additional runs will be scored by that team by the end of the inning
* the number of times a team in that situation has won, lost, or tied.

    inn inn_home beg_outs beg_1b beg_2b beg_3b  beg_score end_inn_score end_gm_score

http://www.baseball-almanac.com/poetry/po_case.shtml

Exercise: the chief promise of big data is to replace ad-hoc reasoning and conventional wisdom with clear direction based on reason and experience. The chief peril of big data is to only analyze what you can measure, discarding expert knowledge in favor of shallow patterns. The "bunt" tactic is a case in point. A batter "bunts" by putting down a difficult-to-field little squib hit. The base runners, who can get a head start, usually advance; the batter, who has to finish the batting motion, is usually thrown out. In effect, a successful bunt exchanges one out for a single-base advance of each base runner, scoring a run if there was someone on third base.
Suppose bunts were always successful. For each game state with base runners and zero or one outs, what is the difference in expected runs scored in that inning compared to the state with one more out and each runner advanced by a slot, plus one run if there was a base runner on third?

The data very clearly shows that, all things being equal, a bunt is a bad tactic.

The consensus is that (a) traditional managers use the bunt far more often than is justified; (b) factors of game theory, psychology, and others that are difficult to quantify say that it should be employed somewhat more often than the data-driven analysis would indicate. But any sport writer looking to kick up a good ol' jocks-vs-nerds donnybrook can reliably do so by claiming that bunts are, or are not, a sound strategy. http://www.lookoutlanding.com/2013/8/5/4589844/the-evolution-of-the-sacrifice-bunt-part-1

We have, thanks to Retrosheet, the record of more than 9 million plays from 1950 to the present.
The game event files have many, many fields, but we need only a handful of them; the query below pulls out the ones we'll use.

------
SELECT
  game_id, LEFT(game_id,3) AS home_team_id, away_team_id, event_id, DATE(SUBSTRING(game_id, 4,8)) AS game_date, 0+RIGHT(game_id, 1) AS game_seq,
  inn_ct AS inn, bat_home_id AS inn_home, outs_ct AS beg_outs_ct, 				-- inning and outs
  IF(inn_end_fl = 'T', 1, 0) AS is_end_inn, IF(game_end_fl = 'T', 1, 0) AS is_end_game,
  event_outs_ct + outs_ct AS end_outs_ct,
  -- @runs_on_play := IF(bat_dest_id > 3, 1, 0) + IF(run1_dest_id > 3, 1, 0) + IF(run2_dest_id > 3, 1, 0) + IF(run3_dest_id > 3, 1, 0) AS runs_on_play,
  @runs_on_play := event_runs_ct AS runs_on_play,
  event_cd, h_cd, ab_fl,
  home_score_ct, away_score_ct,
  @beg_scdiff    := home_score_ct - away_score_ct AS beg_scdiff,		-- score differential
  @end_scdiff    := @beg_scdiff + IF(bat_home_id = 1, @runs_on_play, -@runs_on_play) AS end_scdiff,
  pit_id, bat_id, base1_run_id, base2_run_id, base3_run_id,			-- bases state
  bat_dest_id, run1_dest_id, run2_dest_id, run3_dest_id
 FROM events
WHERE (game_id LIKE 'BOS2012%')
  AND bat_event_fl != 'T'
  -- AND inn_ct > 6
ORDER BY game_id, inn, inn_home, outs_ct
;
------


// footnote:[The fancy term is "transitive dependency"; it makes the difference between second and third normal form. Unless you already know what those mean, forget this paragraph exists.]

Get the game state (inning + top/bottom; number of outs; bases occupied; score differential), and summable-trick fields for finding the score at the end of the inning and at the end of the game.

Only one record per inning will have a value for end_inn_sc_maybe, and only one per game for end_game_sc_maybe: so taking the 'MAX' gives only the value of that entry.

Only innings of 3 full outs are useful for the run expectancy table; otherwise no end_inn_sc is calculated.

------
evs_summable = FOREACH events {
  beg_sc  = (home_score - away_score);
  end_sc  = beg_sc + ev_runs_ct;
  GENERATE
    game_id                   AS game_id,
    inn                       AS inn,
    (inn_home == 1 ? 1 : -1)  AS inn_sign:int,
    beg_outs_ct               AS beg_outs_ct,
    (run1_id != '' ? 1 : 0)   AS occ1:int,
    (run2_id != '' ? 1 : 0)   AS occ2:int,
    (run3_id != '' ? 1 : 0)   AS occ3:int,
    beg_sc                    AS beg_sc:int,
    ((is_end_inn  == 1) AND (beg_outs_ct + ev_outs_ct == 3) ? end_sc : NULL) AS end_inn_sc_maybe:int,
    (is_end_game == 1 ? end_sc : NULL)                                       AS end_game_sc_maybe:int
    -- , away_score, home_score, ev_runs_ct, ev_outs_ct, is_end_inn, is_end_game, event_seq
    ;
  };
------

Decorate each game's records with the end-of-game score, then partially
flatten by inning+half. The result is as if we had initially grouped on
(game_id, inn, inn_sign) -- but since each (game) group strictly contains
each (game, inn, inn_sign) subgroup, we don't have to do another reduce!

------
evs_by_inning = FOREACH (GROUP evs_summable BY game_id) {
  GENERATE
    MAX(evs_summable.end_game_sc_maybe) AS end_game_sc,
    FLATTEN(BagGroup(evs_summable, evs_summable.(inn, inn_sign)))
    ;
  };
------

Flatten further back into single-event records, but now decorated with the
end-game and end-inning scores and won/loss/tie status:

* Decorate each inning's records with the end-of-inning score
* Figure out if the game was a win / loss / tie
* Convert end-of-* score differentials from (home-away) to (batting-fielding)
* Flatten back into individual events.
* Decorate each inning's records with the gain-to-end-of-inning. note that
  this is a batting-fielding differential, not home-away differential

Must use two steps because end_inn_sc is used to find inn_gain, and you can't
iterate inside flatten.

------
evs_decorated = FOREACH evs_by_inning {
  is_win  = ((group.inn_sign*end_game_sc >  0) ? 1 : 0);
  is_loss = ((group.inn_sign*end_game_sc <  0) ? 1 : 0);
  is_tie  = ((group.inn_sign*end_game_sc == 0) ? 1 : 0);
  end_inn_sc = MAX(evs_summable.end_inn_sc_maybe);
  GENERATE
    group.inn, group.inn_sign,
    FLATTEN(evs_summable.(beg_outs_ct, occ1, occ2, occ3, beg_sc
    -- , away_score, home_score, ev_runs_ct, ev_outs_ct, is_end_inn, is_end_game, event_seq, game_id
    )) AS (beg_outs_ct, occ1, occ2, occ3, beg_sc),
    end_game_sc AS end_game_sc,
    end_inn_sc AS end_inn_sc,
    is_win, is_loss, is_tie
    ;
  };
evs_decorated = FOREACH evs_decorated GENERATE
    inn, inn_sign, beg_outs_ct, occ1, occ2, occ3, beg_sc,
  -- away_score, home_score, ev_runs_ct, ev_outs_ct, is_end_inn, is_end_game, event_seq, game_id,
    inn_sign*(end_inn_sc - beg_sc) AS inn_gain,
    end_inn_sc, end_game_sc, is_win, is_loss, is_tie
    ;
------

group by game, decorate; flatten by game+inning, decorate; flatten

(Shoot this won't work for demonstrating the cogroup-regroup I think)

TODO for geographic count example use the Datafu udf to do the document counts




Here are Tangotiger's results for comparison, giving the average runs scored, from given base/out state to end of inning (for completed innings through the 8th inning); uses Retrosheet 1950-2010 data as of 2010. http://www.tangotiger.net/re24.html

------
			  1993-2010            1969-1992           1950-1968
	bases \ outs 0_out 1_out 2_out   0_out 1_out 2_out   0_out 1_out 2_out

	-  -   -     0.544 0.291 0.112   0.477 0.252 0.094   0.476 0.256 0.098
	-  -   3B    1.433 0.989 0.385   1.340 0.943 0.373   1.342 0.926 0.378
	-  2B  -     1.170 0.721 0.348   1.102 0.678 0.325   1.094 0.680 0.330
	-  2B  3B    2.050 1.447 0.626   1.967 1.380 0.594   1.977 1.385 0.620
	1B -   -     0.941 0.562 0.245   0.853 0.504 0.216   0.837 0.507 0.216
	1B -   3B    1.853 1.211 0.530   1.715 1.149 0.484   1.696 1.151 0.504
	1B 2B  -     1.556 0.963 0.471   1.476 0.902 0.435   1.472 0.927 0.441
	1B 2B  3B    2.390 1.631 0.814   2.343 1.545 0.752   2.315 1.540 0.747

		      1993-2010               1969-1992           1950-1968              1950-2010
	-  -   -     0.539 0.287 0.111   0.471 0.248 0.092   0.471 0.252 0.096     0.4957  0.2634  0.0998
	-  -   3B    1.442 0.981 0.382   1.299 0.92  0.368   1.285 0.904 0.373     1.3408  0.9393  0.374
	-  2B  -     1.172 0.715 0.339   1.081 0.663 0.316   1.055 0.662 0.322     1.1121  0.682   0.3257
	-  2B  3B    2.046 1.428 0.599   1.927 1.341 0.56    1.936 1.338 0.59      1.9754  1.3732  0.5814
	1B -   -     0.932 0.554 0.239   0.843 0.496 0.21    0.828 0.5   0.211     0.8721  0.5181  0.2211
	1B -   3B    1.841 1.196 0.517   1.699 1.131 0.47    1.688 1.132 0.491     1.7478  1.1552  0.4922
	1B 2B  -     1.543 0.949 0.456   1.461 0.886 0.42    1.456 0.912 0.426     1.4921  0.9157  0.4349
	1B 2B  3B    2.374 1.61  0.787   2.325 1.522 0.721   2.297 1.513 0.724     2.3392  1.5547  0.7482
------

==== Generate a won-loss record

Using the summing trick footnote:[we're skipping some details such as forfeited games, so the numbers won't agree precisely with the combined team numbers.]

------
  -- generate a summable value for each game, once for home and once for away:
home_games = FOREACH games GENERATE
  home_team_id AS team_id, year_id,
  (home_runs_ct > away_runs_ct ? 1 : 0) AS win,
  (home_runs_ct < away_runs_ct ? 1 : 0) AS loss,
  (forfeit == ... ? 1 : 0)              AS forf_w, ...
  ;
away_games = FOREACH games GENERATE
  away_team_id AS team_id, year_id,
  (home_runs_ct < away_runs_ct ? 1 : 0) AS win,
  (home_runs_ct > away_runs_ct ? 1 : 0) AS loss
  ;
------

Now you might be tempted (especially if you are coming from SQL land) to follow this with a UNION of `home_games` and `away_games`. Don't! Instead, use a COGROUP. Once you've wrapped your head around it, it's simpler and more efficient.

------
team_games = COGROUP home_games BY (team_id, year_id), away_games BY (team_id, year_id);
------

Each combination of team and year creates one row with the following fields:

* `group`, a tuple with the `team_id` and `year_id`
* `home_games`, a bag holding tuples with `team_id`, `year_id`, `win` and `loss`
* `away_games`, a bag holding tuples with `team_id`, `year_id`, `win` and `loss`

------
team_games:
((BOS,2004),  {(BOS,2004,1,0),(BOS,2004,1,0),...}, {(BOS,2004,0,1),(BOS,2004,1,0),...})
...
------

You should notice a few things:

* The group values go in a single field (the first one) called `group`.
* Since we grouped on two fields, the group value is a tuple; if we had grouped on one field it would have the same schema as that field
* The name of the _table_ in the COGROUP BY statement became the name of the _field_ in the result
* The group values appear redundantly in each tuple of the bag. That's OK, we're about to project them out.

This is one of those things to think back on when you're looking at a script and saying "man, I just have this feeling this script has more reduce steps than it deserves".

The next step is to calculate the answer:

------
...
team_games = COGROUP home_games BY....
winloss_record = FOREACH team_games {
  wins   = SUM(home_games.win)    + SUM(away_games.win);
  losses = SUM(home_games.loss)   + SUM(away_games.loss);
  G      = COUNT_STAR(home_games) + COUNT_STAR(away_games);
  G_home = COUNT_STAR(home_games);
  ties   = G - (wins + losses);
  GENERATE group.team_id, group.year_id, G, G_home, wins, losses, ties;
};
------

Exercise: Do this instead with a single GROUP. Hint: the first FOREACH should have a FLATTEN.

==== Run Expectancy

How many runs is a game state worth from the perspective of any inning?
Bases are cleared away at inning finish, so the average number of runs scored
from an event to the end of its inning is the dominant factor.


------
-- Only want non-walkoff and full innings
re_evs      = FILTER evs_decorated BY (inn <= 8) AND (end_inn_sc IS NOT NULL);
re_ev_stats = FOREACH (GROUP re_evs ALL) {
  re_ev_ct = COUNT_STAR(re_evs);
  GENERATE re_ev_ct AS ct, ((double)re_ev_ct / (double)event_stats.ct) AS re_ev_fraction;
  };

-- Group on game state in inning (outs and bases occupied), and find the average score gain
run_expectancy = FOREACH (GROUP re_evs BY (beg_outs_ct, occ1, occ2, occ3)) {
  GENERATE
    FLATTEN(group)       AS (beg_outs_ct, occ1, occ2, occ3),
    AVG(re_evs.inn_gain) AS avg_inn_gain,
    COUNT_STAR(re_evs)   AS ct,
    (long)re_ev_stats.ct AS tot_ct,
    (long)event_stats.ct AS tot_unfiltered_ct;
  };
------

Baseball researchers usually format run expectancy tables with rows as bases
and columns as outs.  The summable trick lets us create that pivot table of
bases vs. outs.

------
re_summable = FOREACH run_expectancy GENERATE
  CONCAT((occ1 == 1 ? '1B ' : '-  '), (occ2 == 1 ? '2B ' : '-  '), (occ3 == 1 ? '3B ' : '-  ')) AS bases:chararray,
  (beg_outs_ct == 0 ? avg_inn_gain : 0) AS outs_0_col,
  (beg_outs_ct == 1 ? avg_inn_gain : 0) AS outs_1_col,
  (beg_outs_ct == 2 ? avg_inn_gain : 0) AS outs_2_col
  ;
re_pretty = FOREACH (GROUP re_summable BY bases) GENERATE
  group AS bases,
  ROUND_TO(MAX(re_summable.outs_0_col), 3) AS outs_0_col,
  ROUND_TO(MAX(re_summable.outs_1_col), 3) AS outs_1_col,
  ROUND_TO(MAX(re_summable.outs_2_col), 3) AS outs_2_col,
  $beg_year AS beg_year, $end_year AS end_year
  ;
------


==== Cube and rollup

stats by team, division and league

http://joshualande.com/cube-rollup-pig-data-science/
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation,+Cube,+Grouping+and+Rollup#EnhancedAggregation,Cube,GroupingandRollup-CubesandRollups

From manual: "Handling null values in dimensions
Since null values are used to represent subtotals in cube and rollup operation, in order to differentiate the legitimate null values that already exists as dimension values, CUBE operator converts any null values in dimensions to "unknown" value before performing cube or rollup operation. For example, for CUBE(product,location) with a sample tuple (car,null) the output will be
`{(car,unknown), (car,null), (null,unknown), (null,null)}`"
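A hedged sketch of what that might look like here, assuming a `team_seasons` relation carrying `lg_id`, `div_id`, `team_id` and `W` (wins); note that the bag of grouped records produced by the CUBE operator is named `cube`:

------
rolled     = CUBE team_seasons BY ROLLUP(lg_id, div_id, team_id);
win_totals = FOREACH rolled GENERATE
  FLATTEN(group) AS (lg_id, div_id, team_id),
  SUM(cube.W)    AS wins;
------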

------
http://labs.opendns.com/2013/04/08/pig-jruby/?referred=1
pairs_r = FOREACH (GROUP raw BY client_ip) {
  client_queries = FOREACH raw GENERATE ts, name;
  client_queries = ORDER client_queries BY ts, name;
  GENERATE client_queries;
};
------

=== GROUP/COGROUP To Restructure Tables

This next pattern is one of the more difficult to picture but also one of the most important to master. Once you can confidently recognize and apply this pattern, you can consider yourself a black belt in the martial art of Map/Reduce.

(TODO: describe this pattern)

=== Group Elements From Multiple Tables On A Common Attribute (COGROUP)

The fundamental structural operation in Map/Reduce is the COGROUP:  assembling records from multiple tables into groups based on a common field; this is a one-liner in Pig, using, you guessed it, the COGROUP operation. This script returns, for every world map grid cell, all UFO sightings and all airport locations within that grid cell footnote:[We've used the `quadkey` function to map geocoordinates into grid cells; you'll learn about in the Geodata Chapter (REF)]:

------
sightings = LOAD '/data/gold/geo/ufo_sightings/us_ufo_sightings.tsv' AS (...);
airports  = LOAD '/data/gold/geo/airflights/us_airports.tsv' AS (...);
cell_sightings_airports = COGROUP
   sightings BY quadkey(lng, lat),
   airports  BY quadkey(lng, lat);
STORE cell_sightings_airports INTO '...';
------

In the equivalent Map/Reduce algorithm, you label each record by both the indicated key and a number based on its spot in the COGROUP statement (here, records from sightings would be labeled 0 and records from airports would be labeled 1). Have Hadoop then PARTITION and GROUP on the COGROUP key with a secondary sort on the table index. Here is how the previous Pig script would be done in Wukong:

------
mapper(partition_keys: 1, sort_keys: 2) do
 recordize_by_filename(/sightings/ => Wu::Geo::UfoSighting, /airport/ => Wu::Geo::Airport)
 TABLE_INDEXES = { Wu::Geo::UfoSighting => 0, Wu::Geo::Airport => 1 }
 def process(record)
   table_index = TABLE_INDEXES[record.class] or raise("Don't know how to handle records of type '#{record.class}'")
   yield( [Wu::Geo.quadkey(record.lng, record.lat), table_index, record.to_wire] )
 end
end

reducer do
 def recordize(quadkey, table_index, jsonized_record) ; ...; end
 def start(key, *)
   @group_key = key ;
   @groups = [ [], [] ]
 end
 def accumulate(quadkey, table_index, record)
   @groups[table_index.to_i] << record
 end
 def finalize
   yield(@group_key, *@groups)
 end
end
------

The Mapper loads each record as an object (using the file name to recognize which class to use) and then emits the quadkey, the table index (0 for sightings, 1 for airports) and the original record's fields. Declaring `partition_keys: 1, sort_keys: 2` ensures all records with the same quadkey are grouped together on the same Reducer and that all records with the same table index arrive together. The body of the Reducer makes temporary note of the GROUP key, then accumulates each record into an array based on its type.

The result of the COGROUP statement always has the GROUP key as the first field. Next comes the set of elements from the table named first in the COGROUP statement -- in Pig, this is a bag of tuples, in Wukong, an array of objects. After that comes the set of elements from the next table in the GROUP BY statement and so on.

While a standalone COGROUP like this is occasionally interesting, it is also the basis for many other common patterns, as you'll see over the next chapters.

=== Co-Grouping Elements from Multiple Tables

Let's continue our example of finding the list of home ballparks for each player over their career.

You can group more than one dataset at the same time. In weather data, there is one table listing the location and other essentials of each weather station and a set of tables listing, for each hour, the weather at each station. Here’s one way to combine them into a new table, giving the explicit latitude and longitude of every observation:

------
G1 = GROUP WSTNS BY (ID1,ID2), WOBS BY (ID1,ID2);
G2 = FOREACH G1 GENERATE FLATTEN(...) ...;
G3 = FOREACH G2 ...;
------

------
parks = LOAD '.../parks.tsv' AS (...);
player_seasons = LOAD '.../player_seasons.tsv' AS (...);
team_seasons = LOAD '.../team_seasons.tsv' AS (...);

park_seasons = JOIN parks BY park_id, team_seasons BY park_id;
park_seasons = FOREACH park_seasons GENERATE
   team_seasons::team_id AS team_id, team_seasons::year AS year,
   parks::park_id AS park_id, parks::name AS park_name;

player_seasons = FOREACH player_seasons GENERATE
   player_id, name AS player_name, year, team_id;
player_season_parks = JOIN
   park_seasons   BY (year, team_id),
   player_seasons BY (year, team_id);
player_season_parks = FOREACH player_season_parks GENERATE
   player_id, player_name,
   park_seasons::year AS year, park_seasons::team_id AS team_id,
   park_seasons::park_id AS park_id, park_seasons::park_name AS park_name;

player_all_parks = GROUP player_season_parks BY (player_id);
DESCRIBE player_all_parks;
player_parks = FOREACH player_all_parks {
   player     = FirstFromBag(player_season_parks);
   home_parks = DISTINCT player_season_parks.park_id;
   GENERATE group AS player_id,
       FLATTEN(player.player_name),
       MIN(player_season_parks.year) AS beg_year, MAX(player_season_parks.year) AS end_year,
       home_parks; -- TODO ensure this is still tuple-ized
};
------

Whoa! There are a few new tricks here.

We would like our output to have one row per player, whose fields have these different flavors:

* Aggregated fields (`beg_year`, `end_year`) come from functions that turn a bag into a simple type (`MIN`, `MAX`).
* The `player_id` is pulled from the `group` field, whose value applies uniformly to the whole group by definition. Note that it's also in each tuple of the bagged `player_season_parks`, but then you'd have to turn many repeated values into the one you want...
* ... which we have to do for uniform fields (like `player_name`) that are not part of the group key, but are the same for all elements of the bag. The awareness that those values are uniform comes from our understanding of the data -- Pig doesn't know that the name will always be the same. The FirstFromBag (TODO fix name) function from the Datafu package grabs just the first of those values
* Inline bag fields (`home_parks`), which continue to have multiple values.

We've applied the `DISTINCT` operation so that each home park for a player appears only once. `DISTINCT` is one of a few operations that can act as a top-level table operation, and can also act on bags within a foreach -- we'll pick this up again in the next chapter (TODO ref). For most people, the biggest barrier to mastery of Pig is to understand how the name and type of each field changes through restructuring operations, so let's walk through the schema evolution.

Nested FOREACH allows CROSS, DISTINCT, FILTER, FOREACH, LIMIT, and ORDER BY (as of Pig 0.12).

We `JOIN`ed player seasons and park seasons on `(year, team_id)`. The resulting schema has those fields twice; to select the one we mean, we use two colons (the disambiguate operator): `park_seasons::year`.

After the `GROUP BY` operation, the schema is `group:chararray, player_season_parks:bag{tuple(player_id, player_name, year, team_id, park_id, park_name)}`. The schema of the new `group` field matches that of the `BY` clause: since `player_id` has type chararray, so does the group field. (If we had supplied multiple fields to the `BY` clause, the `group` field would have been of type `tuple`.) The second field, `player_season_parks`, is a bag of size-6 tuples. Be clear about what the names mean here: grouping on the `player_season_parks` _table_ (whose schema has six fields) produced the `player_all_parks` table. The second field of the `player_all_parks` table is a bag of size-six tuples (each holding the six fields of the corresponding table), named `player_season_parks` (the name of the corresponding table).

So within the `FOREACH`, the expression `player_season_parks.park_id` is _also_ a bag of tuples (remember, bags only hold tuples!), now size-1 tuples holding only the park_id. That schema is preserved through the `DISTINCT` operation, so `home_parks` is also a bag of size-1 tuples.

------
   team_park_seasons = LOAD '/tmp/team_parks.tsv' AS (
       team_id:chararray,
       park_years: bag{tuple(year:int, park_id:chararray)},
       park_ids_lookup: map[chararray]
       );
   team_parks = FOREACH team_park_seasons {
     distinct_park_ids = DISTINCT park_years.park_id;
     GENERATE team_id, FLATTEN(distinct_park_ids) AS park_id;
   };
   DUMP team_parks;
------

TODO add flatten example that crosses the data.



=== Exercises

Distributions:

* First letter of Wikipedia article titles

* Count of inbound links for wikipedia articles

* Total sum of pageviews counts for each page