Advanced Pig

Optimizing Hadoop Dataflows

The guidelines in this section start by repeating things your good common sense surely already knew — you didn’t buy this book to have people tell you it’s inefficient to process more data than you need to. It’s not always obvious, however, how to get Hadoop to respect your good common sense and, if anything that follows seems obvious to you, well, it’s included here because your authors have most likely learned it the hard way.

So, don’t process more data than you have to. In Pig, put filters before joins or other structural operations. If you’re doing a filter, you probably have a good sense of how much data should be allowed through. Check the Job Tracker Console and if the ratio of data read to Mapper output data size does not line up with what you expect, dig deeper. Also, project out fields you don’t need for downstream operations; surprisingly, Pig does not do this for you. Keep in mind that when you do a Group, the key fields appear in full in both the new Group field and in every one of the grouped tuples. It is a good habit to follow every Group with a FOREACH.
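
For instance, here is a minimal sketch of that habit; the relation and field names are invented for illustration:

    -- Load a hypothetical visits table, group it, then immediately project
    -- away everything that downstream steps will not use.
    visits       = LOAD 'visits.tsv' AS (ip:chararray, url:chararray, tstamp:long, referer:chararray, useragent:chararray);
    visits_by_ip = GROUP visits BY ip;
    -- 'group' already carries the ip; the bag repeats it inside every tuple,
    -- along with the referer and useragent we no longer need.
    trimmed      = FOREACH visits_by_ip GENERATE group AS ip, visits.(url, tstamp);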

If you only need a small fraction of records from a dataset, extract them as early as possible. If it is all the same to you, a LIMIT operation (taking the first N rows) is more efficient than a SAMPLE or FILTER; if you can get everything you need from one or a few input files, that’s even more efficient. As we harped on throughout the book, when you’re developing a job, it’s almost always worth it to start by extracting a faithful, smaller sub-universe to work with.
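
A minimal sketch of carving out such a development sub-universe (paths and sizes are illustrative only):

    -- Take the first hundred thousand rows rather than scanning everything;
    -- as noted above, LIMIT is cheaper than a SAMPLE or FILTER over the full input.
    -- (Even cheaper: LOAD just one or a few part files of the input.)
    full_logs = LOAD 'logs/2012' AS (ip:chararray, url:chararray, tstamp:long);
    dev_logs  = LIMIT full_logs 100000;
    STORE dev_logs INTO 'logs_dev_subuniverse';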

Process more data than you have to if it makes your code more readable. There is some benefit to having a uniform schema, especially if you’re working with TSV files, where the mapping from slot order to name is not intrinsic. If leaving in a couple extraneous fields would add five minutes to a job’s production runtime but subtract five lines from the source, we prefer the inefficient script.

In general, wasting CPU to save network or disk bandwidth is a good idea. If you grew up watching a 386 try to produce a ZIP file, it probably seems counterintuitive that storing your data in compressed files not only saves disk space but also speeds up processing time. However, most Hadoop jobs are overwhelmingly throughput bound, so spending the extra CPU cycles to compress and decompress data is more than justified by the overall efficiency of streaming less data off the disk or network. The section on Hadoop Internals (TODO: REF) explains how to enable compression of data between Mapper and Reducer (which you should always do) and how to read or write compressed data (which has tradeoffs you need to understand). In the case of intermediate checkpoints within a multi-stage workflow, it almost always makes sense to use a light compression format such as LZO or Snappy. In Pig, if you set the pig.tmpfilecompression and pig.tmpfilecompression.codec configuration variables appropriately, it will do that for you.
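
Here is a sketch of those settings as they would appear at the top of a Pig script. The temporary-file properties are the ones named above; the map-output properties are the classic Hadoop (MRv1) names, so double-check both against your cluster’s Pig and Hadoop versions:

    -- Compress intermediate (tmp) files between the jobs of a multi-stage workflow:
    SET pig.tmpfilecompression true;
    SET pig.tmpfilecompression.codec 'lzo';    -- 'gz' also works; pick a light codec

    -- Compress data between Mapper and Reducer (almost always a win):
    SET mapred.compress.map.output true;
    SET mapred.map.output.compression.codec 'org.apache.hadoop.io.compress.SnappyCodec';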

There are a few other cases where you should invoke this rule. If you have a large or variable text string that you only need to compare for uniqueness — e.g., using URLs as keys — it is worth using a string digest function to shorten its length, as described in the chapter on Sketch Functions (TODO: REF).

Regular expressions are much faster than you’d think, especially in Ruby. If you only need a small part of a string and it does not cost you in readability, it might be worth slicing out only the interesting part.

Use types efficiently. Always use a schema in Pig; no exceptions. In case you’re wondering, it makes your script more efficient and catches errors early, but most of all, it shows respect to your colleagues or future self.
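
A quick illustration, with made-up file and field names:

    -- With a schema: fields have names and types, and typos fail fast.
    players = LOAD 'baseball/players.tsv' AS (
                  player_id:chararray, name:chararray, team_id:chararray,
                  games:int, batting_avg:float);

    -- Without one, you are stuck addressing fields as $0, $1, ... and every
    -- value arrives as a bytearray: hard on your colleagues and future self.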

There are a couple Pig-specific tradeoffs to highlight. Wherever possible, make your UDFs algebraic or at least Accumulator-like — as described in the section on UDFs (TODO: REF). If you use two FOREACH statements consecutively, Pig is often able to merge them, but not always. If you see Pig introduce an extra Mapside-only job where you didn’t think one was necessary, it has probably failed to combine them. Always start with the more readable code, then decide if the problem is worth solving. Most importantly, be aware of Pig’s specialized JOINs; these are important enough that they get their whole section below.
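
For example, here is a pair of consecutive FOREACH statements and their hand-merged equivalent (aliases and fields are invented). Write the first version; only bother with the second if the Job Tracker shows an extra Map-only job you did not expect:

    purchases = LOAD 'purchases.tsv' AS (user_id:chararray, country:chararray, amount:double);

    -- Readable two-step version; Pig can usually merge these into one pass:
    cleaned   = FOREACH purchases GENERATE user_id, UPPER(country) AS country, amount;
    in_cents  = FOREACH cleaned   GENERATE user_id, country, amount * 100 AS cents;

    -- The pre-merged equivalent, if Pig fails to combine them for you:
    in_cents2 = FOREACH purchases GENERATE user_id, UPPER(country) AS country, amount * 100 AS cents;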

As you’ve seen, Pig is extremely sugar-free; more or less every structural operation corresponds to a unique Map/Reduce plan. In principle, a JOIN is simply a Cogroup with a FLATTEN and a DISTINCT is a Cogroup with a projection of just the GROUP key. Pig offers those more specific Operators because it is able to do them more efficiently. Watch for cases where you have unwittingly spelled these out explicitly.
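
Here is what the spelled-out versions look like, so you can spot them and swap in the specific operator; relation names are invented:

    a = LOAD 'a.tsv' AS (k:chararray, va:chararray);
    b = LOAD 'b.tsv' AS (k:chararray, vb:chararray);

    -- A JOIN written out by hand as a COGROUP plus FLATTEN ...
    ab_cog      = COGROUP a BY k, b BY k;
    joined_slow = FOREACH ab_cog GENERATE FLATTEN(a), FLATTEN(b);
    -- ... versus the operator Pig can plan more efficiently:
    joined_fast = JOIN a BY k, b BY k;

    -- A DISTINCT written out as a group on the whole record, keeping just the key ...
    a_grouped     = GROUP a BY (k, va);
    distinct_slow = FOREACH a_grouped GENERATE FLATTEN(group);
    -- ... versus the operator built for the purpose:
    distinct_fast = DISTINCT a;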

Always remove records with a NULL Group key before the JOIN; those records will never appear in the output of a JOIN statement, yet they are not eliminated until after they have been sent across the network. Even worse, since all these records share all the same (worthless) key, they are all sent to the same Reducer, almost certainly causing a hotspot.
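
In Pig, that is a one-line guard; the relation and key names here are placeholders:

    visits = LOAD 'visits.tsv'  AS (ip:chararray, url:chararray);
    geo    = LOAD 'geo_ips.tsv' AS (ip:chararray, country:chararray);

    -- Reject null keys on the map side, before they all pile onto one Reducer:
    visits_keyed = FILTER visits BY ip IS NOT NULL;
    joined       = JOIN visits_keyed BY ip, geo BY ip;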

If you are processing a lot of small files, Pig offers the ability to process many at once per Mapper, an extremely important optimization. Set the pig.splitCombination and pig.maxCombinedSplitSize options; if you’re writing a custom loader, spend the extra effort to make it compatible with this optimization.
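
The settings look like this; the size shown is illustrative, and the defaults vary by Pig version, so check the docs for yours:

    -- Combine many small input files into fewer, larger map tasks:
    SET pig.splitCombination true;
    SET pig.maxCombinedSplitSize 134217728;   -- target roughly one 128 MB block per Mapper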

Do not use more or less parallelism than is reasonable. We have talked repeatedly throughout the book about the dangers of hotspots — a few Reducers laboring to swallow the bulk of the dataset while their many comrades clock out early. Sometimes, however, your job’s configuration might unwittingly tell Hadoop to use only one Reducer, or far too few. In that case, the Job Tracker will show only a few heavyweight Reduce tasks running while the other Reduce slots sit idle, because nothing has been asked of them. Set the number of Reducers in Pig using the ‘PARALLEL’ directive, and in Wukong using ‘--REDUCE_TASKS=N’ (TODO: check spelling).
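
For example (aliases invented; pick reducer counts to match your cluster):

    logs = LOAD 'logs.tsv' AS (user_id:chararray, url:chararray);

    -- A script-wide default for every reduce stage ...
    SET default_parallel 20;

    -- ... or an explicit count on an individual reduce-side operation:
    by_user = GROUP logs BY user_id PARALLEL 50;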

It can also be wasteful to have too many Reducers. If your job has many Reducers, each uniformly processing only a few KB of records, the large fixed costs of launching and accounting for those attempts justify using the parallelism settings to limit the number of Reducers.

Pig UDFs (User-Defined Functions)

LoadFunc / StoreFunc : Wonderdog — an ElasticSearch UDF

placeholder

Algebraic UDFs let Pig go fast

One of the great things

Geographic Merge JOIN

I think we will want a specialized merge join for the geo-gridded data. So that will go here I think.

Efficient JOINs in Pig

Pig has a number of specialized JOINs that, used in their appropriate circumstances, bring enough performance improvements to organize a cult around. (TODO: make funny).

Specialized Pig Join #1: The REPLICATED JOIN

If you are joining a large dataset with a small-enough one, Pig can often execute the operation using a Mapper-Only job, eliminating the costly startup time and network transfer of a Reduce step. This is commonplace enough and the performance impact large enough that it is always worth considering whether this type of JOIN is applicable.

Imagine visiting the opera while the United Nations is in town. The smart theater owner will prepare librettos in, say, a dozen languages, enough to delight the many thousands of attendees. A bad way to distribute them would be to arrange kiosks, one per language, throughout the lobby. Every aficionado would first have to fight their way through the crowd to find the appropriate kiosk, then navigate across the theater to find their seats. Our theater owner, being smart, instead handles the distribution as follows: Ushers stand at every entrance, armed with stacks of librettos; at every entrance, all the languages are represented. This means that, as each attendee files in, they simply select the appropriate one from what is on hand, then head to their seat without delay.

A Mapper-Only JOIN works analogously. Every Mapper reads the small dataset into a lookup table — a hash map keyed by the JOIN key (this is why you will also see it referred to as a HashMap JOIN). Every Mapper loads the contents of the smaller dataset in full into its own local lookup table (which is why it is also known as a Replicated JOIN). The minor cost of replicating that dataset to every single Mapper is often a huge improvement in processing speed by eliminating the entire Reduce stage. The constraint, however, is that the smaller dataset must fit entirely in RAM. The usher’s task is manageable when there is one type of libretto for each of a dozen languages but would be unmanageable if there were one type of libretto for each of several thousand home towns.
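
Asking for it is a one-word change on the JOIN. In this sketch the relation names are placeholders, with the big dataset listed first and the small, replicated one after it:

    visits  = LOAD 'visits.tsv'  AS (ip:chararray, url:chararray);       -- huge
    geo_ips = LOAD 'geo_ips.tsv' AS (ip:chararray, country:chararray);   -- must fit in RAM

    joined  = JOIN visits BY ip, geo_ips BY ip USING 'replicated';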

How much is too much? Watch for excessive GC activity. (TODO: Pig probably has a warning too - find out what it is). Within the limits of available RAM, you can use fewer Mappers with more available RAM; the Hadoop tuning chapter (TODO: REF) shows you how. Don’t be too aggressive, though; datasets have a habit of growing over time and you would hate to spend Thanksgiving day reworking the jobs that process retail sales data because you realized they would not stand up to the Black Friday rush.

There is a general principle here: It is obvious there is a class of problems which only crop up past a certain threshold of data. What may not be obvious, until you’ve learned it the hard way, is that the external circumstances most likely to produce that flood of extra data are also the circumstances that leave you least able to address the problem.

Specialized Pig Join #2: The MERGE JOIN

A JOIN of two datasets, each in strict total order by the JOIN key, can also be done Mapper-Only by simply doing a modified merge sort. You must ensure not only that the files are in sort order but that the lexicographic order of the file names matches the order in which their parts should be read. If you do so, Pig can proceed as follows: It does a first pass to sample each file from the right-hand dataset to learn the distribution of keys throughout the files. The second stage performs the actual JOIN. Each Mapper reads from two streams: its assigned split within the left-hand dataset and the appropriate sections of the right-hand dataset. The Mapper’s job is then very simple; it grabs a group of records from the right-hand stream and a group of records from the left-hand stream and compares their keys. If they match, they are joined. If they do not match, it reads from the stream with the too-low key until it either produces the matching group or sails past it, in which case it similarly reads from the other stream.

As we’ve discussed a few times, reading data in straight streams like this lets the underlying system supply data at the fastest possible rate. What’s more, the first-pass indexing scheme means most tasks will be “Map-local” — run on a machine whose data node hosts a copy of that block. In all, you require a short Mapper-Only task to sample the right-hand dataset, plus a network throughput cost that is ‘O(N)’ in the size of the second dataset. The constraint is, of course, that this only works with total-ordered data on the same key. For a “Gold” dataset — one that you expect to use as source data for a number of future jobs — we typically spend the time to do a last-pass total sort of the dataset against the most likely JOIN key. It is a nice convenience for future users of the dataset, helps in sanity checking and improves the odds that you will be able to use the more efficient merge join.
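
When both inputs meet those conditions, the JOIN itself is again a one-word change (placeholder names):

    -- Both relations total-sorted on 'key' in an earlier pass, part files named in sort order:
    gold_a = LOAD 'gold_a_sorted' AS (key:chararray, va:chararray);
    gold_b = LOAD 'gold_b_sorted' AS (key:chararray, vb:chararray);

    joined = JOIN gold_a BY key, gold_b BY key USING 'merge';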

Advanced Join Fu

Pig has three special-purpose join strategies: the "map-side" (aka 'fragment replicate') join, the merge join, and the skew join.

Each has strong restrictions on the properties of its input datasets, but a dataflow designed to take advantage of them can produce order-of-magnitude scalability improvements.

They’re also a great illustration of three key scalability patterns. Once you have a clear picture of how these joins work, you can be confident you understand the map/reduce paradigm deeply.

Map-side Join

In a normal JOIN, the largest dataset goes on the right. In a map-side (aka 'fragment replicate') join, the largest dataset goes on the left, and everything to the right must be tiny.

The Pig manual calls this a "fragment replicate" join, because that is how Pig thinks about it: the tiny datasets are duplicated to each machine. Throughout the book, I’ll refer to it as a map-side join, because that’s how you should think about it when you’re using it. The other common name for it is a Hash join — and if you want to think about what’s going on inside it, that’s the name you should use.

How a Map-side (Hash) join works

If you’ve been to enough large conferences you’ve seen at least one registration-day debacle. Everyone leaves their hotel to wait in a long line at the convention center, where they have set up different queues for some fine-grained partition of attendees by last name and conference track. Registration is a direct join of the set of attendees on the set of badges; those check-in debacles are basically the stuck reducer problem come to life.

If it’s a really large conference, the organizers will instead set up registration desks at each hotel. Now you don’t have to move very far, and you can wait with your friends. As attendees stream past the registration desk, the 'A-E' volunteer decorates the Arazolos and Eliotts with badges, the 'F-K' volunteer decorates the Gaspers and Kellys, and so forth. Note these important differences: a) the registration center was duplicated in full to each site; and b) you didn’t have to partition the attendees; Arazolos and Kellys and Zarebas can all use the same registration line.

To do a map-side join, Pig holds the tiny table in a Hash (aka Hashmap or dictionary), indexed by the full join key.

    .-------------.      |
    | tiny table  |      |    ... huge table ...
    +--+----------+      |
    |A | ...a...  |      | Q | ...
    |  | ...a...  |      | B | ...
    |Q | ...q...  |      | B | ...
    |F | ...f...  |      | B | ...
      ...                | A |  ...
    |Z | ...z...  |      | B | ...
    |  | ...z...  |      | B | ...
    |P | ...p...  |      | C | ...
    |_____________|      | Z | ...
                         | A | ...

As each row in the huge table flies by, it is decorated with the matching rows from the tiny table and emitted. Holding the data fully in-memory in a hash table gives you constant-time lookup speed for each key, and lets you access rows at the speed of RAM.

One map-side only pass through the data is enough to do the join.

See distribution of weather measurements for an example.

Example: map-side join of wikipedia page metadata with wikipedia pageview stats
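
Until the worked example lands, here is a sketch of how it might look; the paths and fields are invented for illustration:

    -- Hourly pageview stats are huge; the page metadata table is small enough
    -- to replicate into each Mapper's RAM.
    pageviews = LOAD 'wikipedia/pageviews'     AS (page_id:long, views:long, hour:chararray);
    pages     = LOAD 'wikipedia/page_metadata' AS (page_id:long, title:chararray, namespace:int);

    -- Big relation on the left, tiny one on the right:
    views_by_title = JOIN pageviews BY page_id, pages BY page_id USING 'replicated';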

Merge Join

How a merge join works

(explanation)

Quoting Pig docs:

You will also see better performance if the data in the left table is partitioned evenly across part files (no significant skew and each part file contains at least one full block of data).

Example: merge join of user graph with page rank iteration
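
Until the worked example lands, a sketch under the assumption that both the edge list and the current rank estimates were total-sorted on user_id by the previous pass (names invented):

    edges = LOAD 'user_graph_by_user_id' AS (user_id:long, neighbor_id:long);
    ranks = LOAD 'pagerank_iteration_N'  AS (user_id:long, rank:double);

    -- Mapper-only merge join: each edge picks up its source node's current rank.
    edges_with_rank = JOIN edges BY user_id, ranks BY user_id USING 'merge';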

Skew Join

(explanation of when needed)

How a skew join works

(explanation how)

Example: ? counting triangles in wikipedia page graph ? OR ? Pageview counts ?

TBD

Efficiency and Scalability

Do’s and Don’ts

The Pig Documentation has a comprehensive section on Performance and Efficiency in Pig. We won’t try to improve on it, but here are some highlights:

  • As early as possible, reduce the size of your data:

    • LIMIT

    • Use a FOREACH to reject unnecessary columns

    • FILTER

  • Filter out `Null`s before a join. In a join, all the records rendezvous at the reducer; if you reject nulls at the map side, you will reduce network load.

Join Optimizations

Quoting Pig docs: "Make sure the table with the largest number of tuples per key is the last table in your query. In some of our tests we saw 10x performance improvement as the result of this optimization."

    small = load 'small_file' as (t, u, v);
    large = load 'large_file' as (x, y, z);
    C = join small by t, large by x;

Why? In a reduce-side join, Pig holds the records for each key from the earlier relations in memory and streams the last relation past them; putting the relation with the most tuples per key last minimizes how much must be cached (and spilled to disk when it does not fit).

(come up with a clever mnemonic that doesn’t involve sex, or get permission to use the mnemonic that does.)

Magic Combiners

TBD

Turn off Optimizations

After you’ve been using Pig for a while, you might enjoy learning about all those wonderful optimizations, but it’s rarely necessary to think about them.

In rare cases, you may suspect that the optimizer is working against you or affecting results.

To turn off an optimization

TODO: instructions
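
A hedged sketch until those instructions land: recent Pig versions accept an `-optimizer_off` (also `-t`) command-line flag naming the rule to disable, or `All` to disable every rule-based optimization; check `pig -help` for the rule names your version supports. For example:

    pig -optimizer_off SplitFilter myscript.pig
    pig -optimizer_off All myscript.pig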

From the Pig Documentation: When Pig Pushes Filters

Table 8-1. When Pig pushes filters. For each preceding operator: will the filter be pushed in front of it, and under what conditions?

  • cogroup: Sometimes. The filter will be pushed if it applies to only one input of the cogroup and does not contain a UDF.

  • cross: Sometimes. The filter will be pushed if it applies to only one input of the cross.

  • distinct: Yes.

  • filter: No. Pig will seek to merge them with `and` to avoid passing data through a second operator. This is done only after all filter pushing is complete.

  • foreach: Sometimes. The filter will be pushed if it references only fields that exist before and after the foreach, and the foreach does not transform those fields.

  • group: Sometimes. The filter will be pushed if it does not contain a UDF.

  • join: Sometimes. The filter will be pushed if it applies to only one input of the join, and if the join is not outer for that input.

  • load: No.

  • mapreduce: No. mapreduce is opaque to Pig, so it cannot know whether pushing will be safe.

  • sort: Yes.

  • split: No.

  • store: No.

  • stream: No. stream is opaque to Pig, so it cannot know whether pushing will be safe.

  • union: Yes.

Also, consider adding filters that are implicit in your script. For example, all of the records with null values in the key will be thrown out by an inner join. If you know that more than a few hundred of your records have null key values, put a `filter input by key is not null` before the join. This will enhance the performance of your join.

Exercises

  1. Quoting Pig docs: > "You will also see better performance if the data in the left table is partitioned evenly across part files (no significant skew and each part file contains at least one full block of data)."

    Why is this?
  2. Each of the following snippets goes against the Pig documentation’s recommendations in one clear way.

    • Rewrite it according to best practices

    • Compare the run time of your improved script against the bad version shown here.

      things like this from http://pig.apache.org/docs/r0.9.2/perf.html --
      1. (fails to use a map-side join)

      2. (join large on small, when it should join small on large)

      3. (many `FOREACH`es instead of one expanded-form `FOREACH`)

      4. (expensive operation before LIMIT)

For each, use the weather data on weather stations.

Pig and HBase

TBD

Pig and JSON

TBD