This repository has been archived by the owner on Mar 23, 2022. It is now read-only.

avro-hadoop-starter

Example MapReduce jobs in Java, Hadoop Streaming, Pig and Hive that read and/or write data in Avro format.



Requirements

The examples require the following software versions:

  • Gradle 1.9 (only for the Java examples)
  • Java JDK 7 (only for the Java examples)
    • It is easy to switch to JDK 6. Mostly you will need to change the sourceCompatibility and targetCompatibility parameters in build.gradle from 1.7 to 1.6. But since there are a couple of JDK 7 related gotchas (e.g. problems with its new bytecode verifier) that the Java example code solves, I decided to stick with JDK 7 as the default.
  • Hadoop 2.x with MRv1 (not MRv2/YARN)
  • Pig 0.11
  • Hive 0.10
  • Twitter Bijection 0.6
  • Avro 1.7.7

More precisely, the examples were tested with those Hadoop stack components that ship with Cloudera CDH 4.x.

Example data

We are using a small, Twitter-like data set as input for our example MapReduce jobs.

Avro schema

twitter.avsc defines a basic schema for storing tweets:

{
  "type" : "record",
  "name" : "Tweet",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc"  : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc"  : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc"  : "Unix epoch time in seconds"
  } ],
  "doc" : "A basic schema for storing Twitter messages"
}

The latest version of the schema is always available at twitter.avsc.

If you want to generate Java classes from this Avro schema, follow the instructions described in section Usage. Alternatively, you can use the Avro Compiler directly.

Avro data files

The actual data is stored in the following files:

  • twitter.avro -- encoded (serialized) version of the example data in binary Avro format, compressed with Snappy
  • twitter.json -- JSON representation of the same example data

You can convert back and forth between the two encodings (Avro vs. JSON) using Avro Tools. See Reading and Writing Avro Files From the Command Line for instructions on how to do that.

Here is a snippet of the example data:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }
{"username":"DarkTemplar","tweet":"From the shadows I come!","timestamp": 1366154681 }
{"username":"VoidRay","tweet":"Prismatic core online!","timestamp": 1366160000 }
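The records can be checked against the schema with a few lines of code. The following is a minimal sketch in Python using only the standard library. It is not part of this repository, and the names EXPECTED_FIELDS, check_record and to_utc are made up for illustration. It verifies that a JSON record carries the three fields from twitter.avsc with matching types, and it converts the timestamp (Unix epoch time in seconds) into a readable UTC date.

```python
import json
from datetime import datetime, timezone

# Field names from twitter.avsc mapped to Python types
# (Avro "string" -> str, Avro "long" -> int). Hypothetical helper table.
EXPECTED_FIELDS = {"username": str, "tweet": str, "timestamp": int}


def check_record(line):
    """Parse one JSON-encoded tweet and verify it matches the schema fields."""
    record = json.loads(line)
    if set(record) != set(EXPECTED_FIELDS):
        raise ValueError("unexpected fields: %s" % sorted(record))
    for name, expected_type in EXPECTED_FIELDS.items():
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            raise ValueError("field %r has the wrong type" % name)
    return record


def to_utc(timestamp_seconds):
    """Convert Unix epoch seconds into an ISO 8601 UTC string."""
    return datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc).isoformat()


line = '{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }'
tweet = check_record(line)
print(tweet["username"], to_utc(tweet["timestamp"]))
```

This only mirrors the flat structure of the example schema. For real validation against arbitrary Avro schemas you would rely on an Avro library rather than a hand-written check.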

Preparing the input data

The example input data we are using is twitter.avro. Upload twitter.avro to HDFS to make the input data available to our MapReduce jobs.

# Upload the input data
$ hadoop fs -mkdir examples/input
$ hadoop fs -copyFromLocal src/test/resources/avro/twitter.avro examples/input

We will also upload the Avro schema twitter.avsc to HDFS because we will use a schema available at an HDFS location in one of the Hive examples.

# Upload the Avro schema
$ hadoop fs -mkdir examples/schema
$ hadoop fs -copyFromLocal src/main/resources/avro/twitter.avsc examples/schema

Java

Usage

To prepare your Java IDE:

# IntelliJ IDEA
$ ./gradlew cleanIdea idea   # then File > Open... > avro-hadoop-starter.ipr

# Eclipse
$ ./gradlew cleanEclipse eclipse

To build the Java code and to compile the Avro-based Java classes from the schemas (*.avsc) in src/main/resources/avro/:

$ ./gradlew clean build

The generated Avro-based Java classes are written under the directory tree generated-sources/. The Avro compiler will generate a Java class Tweet from the twitter.avsc schema.

To run the unit tests (notably TweetCountTest, see section Examples below):

$ ./gradlew test

Note: ./gradlew test executes any JUnit unit tests. If you add any TestNG unit tests you need to run ./gradlew testng for executing those.

You can also run ./gradlew cobertura which will generate a test coverage report at ./build/reports/cobertura/coverage.xml that you can integrate into your CI setup.

Examples

TweetCount

TweetCount implements a MapReduce job that counts the number of tweets created by Twitter users.

TweetCount: Usage: TweetCount <input path> <output path>
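To make the counting logic concrete, here is a conceptual sketch in plain Python. It is not the Java implementation from this repository. It assumes the usual MapReduce pattern of emitting (username, 1) in the map step and summing per key in the reduce step, and the function names map_tweet and reduce_counts are hypothetical.

```python
from collections import Counter

# (username, tweet, timestamp) tuples taken from the example data set.
TWEETS = [
    ("miguno", "Rock: Nerf paper, scissors is fine.", 1366150681),
    ("BlizzardCS", "Works as intended.  Terran is IMBA.", 1366154481),
    ("DarkTemplar", "From the shadows I come!", 1366154681),
    ("VoidRay", "Prismatic core online!", 1366160000),
    ("VoidRay", "Fire at will, commander.", 1366160010),
]


def map_tweet(tweet):
    """Map step: emit (username, 1) for a single tweet."""
    username, _text, _timestamp = tweet
    return (username, 1)


def reduce_counts(pairs):
    """Reduce step: sum the emitted counts per username."""
    totals = Counter()
    for username, count in pairs:
        totals[username] += count
    return dict(totals)


counts = reduce_counts(map_tweet(t) for t in TWEETS)
print(counts)
```

In the real job the grouping by key happens in Hadoop's shuffle phase between map and reduce. The Counter in this sketch merely stands in for that step.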

TweetCountTest

TweetCountTest is very similar to TweetCount. It uses twitter.avro as its input and runs a unit test on it with the same MapReduce job as TweetCount. The unit test includes comparing the actual MapReduce output (in Snappy-compressed Avro format) with expected output. TweetCountTest extends ClusterMapReduceTestCase (MRv1), which means that the corresponding MapReduce job is launched in-memory via MiniMRCluster.

MiniMRCluster and Hadoop MRv2

The MiniMRCluster that is used by ClusterMapReduceTestCase in MRv1 is deprecated in Hadoop MRv2. When using MRv2 you should switch to MiniMRClientClusterFactory, which provides a wrapper interface called MiniMRClientCluster around the MiniMRYarnCluster (MRv2):

MiniMRClientClusterFactory: A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster interface around the MiniMRYarnCluster. While in MR1, it provides such wrapper around MiniMRCluster. This factory should be used in tests to provide an easy migration of tests across MR1 and MR2.

See Experimenting with MapReduce 2.0 for more information.

Avro: String vs CharSequence vs Utf8

One caveat when using Avro in Java (or Scala, ...) is that you may create a new Avro-backed object with a java.lang.String parameter (e.g. the username in the Avro schema we use in our examples), but once you convert your data record to binary and back to a POJO, Avro gives you an instance of CharSequence instead of a String. By default, Avro-generated Java classes expose CharSequence for string fields in their API, but you cannot use just any CharSequence when interacting with your data records, not even java.lang.String, which does implement CharSequence. You must use Avro's own Utf8 instead. A typical case where you run into this gotcha is when your unit tests complain that a round-trip conversion of a data record apparently does not result in the original record.

One possible remedy to this problem is to instruct Avro to explicitly return an instance of String. This is usually what you want as it provides you with the intuitive behavior that you'd typically expect. Your mileage may vary though.

For details see AVRO-803: Java generated Avro classes make using Avro painful and surprising.

Enforce use of String when using sbt

Add (stringType in avroConfig) := "String" to your build.sbt, assuming you use cavorite's sbt-avro plugin.

Real-world build.sbt example:

seq(sbtavro.SbtAvro.avroSettings : _*)

// Configure the desired Avro version.  sbt-avro automatically injects a libraryDependency.
(version in avroConfig) := "1.7.7"

// Look for *.avsc etc. files in src/main/avro
(sourceDirectory in avroConfig) <<= (sourceDirectory in Compile)(_ / "avro")

(stringType in avroConfig) := "String"

Enforce use of String when using gradle

Add the following to your build.gradle, assuming you use my avro-gradle-plugin:

compileAvro {
  stringType = 'String'
}

Enforce use of String when using maven

Add <stringType>String</stringType> to the configuration of avro-maven-plugin in your pom.xml.

Real-world pom.xml example:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.7</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>

Further readings on Java

Hadoop Streaming

Preliminaries

Important: The examples below assume you have access to a running Hadoop cluster.

How Streaming sees data when reading via AvroAsTextInputFormat

When using AvroAsTextInputFormat as the input format your streaming code will receive the data in JSON format, one record ("datum" in Avro parlance) per line. Note that Avro will also add a trailing TAB (\t) at the end of each line.

<JSON representation of Avro record #1>\t
<JSON representation of Avro record #2>\t
<JSON representation of Avro record #3>\t
...

Here is the basic data flow from your input data in binary Avro format to your streaming mapper:

input.avro (binary)  ---AvroAsTextInputFormat---> deserialized data (JSON) ---> Mapper
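On the mapper side, each incoming line can be handled as sketched below. This is a minimal Python example using only the standard library. The function name parse_avro_text_line is made up for illustration, and the sample line is built by hand from the example data rather than captured from a real job.

```python
import json


def parse_avro_text_line(line):
    """Turn one line produced by AvroAsTextInputFormat into a dict.

    The line holds the JSON form of one record followed by a TAB, plus
    a newline once it arrives on the mapper's standard input.
    """
    payload = line.rstrip("\n").rstrip("\t")
    return json.loads(payload)


sample = '{"username": "VoidRay", "tweet": "Prismatic core online!", "timestamp": 1366160000}\t\n'
record = parse_avro_text_line(sample)
print(record["username"], record["timestamp"])
```

Stripping the TAB explicitly keeps the intent visible. It also matters as soon as you treat the line as plain text, for example when comparing or concatenating lines, instead of handing it to a JSON parser.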

Examples

Prerequisites

The example commands below use the Hadoop Streaming jar for MRv1 shipped with Cloudera CDH4 (hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar).

If you are not using Cloudera CDH4 or are using a newer version of CDH4, just replace the jar file with the one included in your Hadoop installation.

The Avro jar files (avro-1.7.7.jar, avro-mapred-1.7.7-hadoop1.jar and avro-tools-1.7.7.jar) are straight from the Avro project.

Reading Avro, writing plain-text

The following command reads Avro data from the relative HDFS directory examples/input/ (which normally resolves to /user/<your-unix-username>/examples/input/). It writes the deserialized version of each data record (see section How Streaming sees data when reading via AvroAsTextInputFormat above) as is to the output HDFS directory streaming/output/. For this simple demonstration we are using the IdentityMapper as a naive map step implementation -- it outputs its input data unmodified (equivalently we could use the Unix tool cat here). We do not need to run a reduce phase here, which is why we disable the reduce step via the option -D mapred.reduce.tasks=0 (see Specifying Map-Only Jobs in the Hadoop Streaming documentation).

# Run the streaming job
$ hadoop jar hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \
    -D mapred.job.name="avro-streaming" \
    -D mapred.reduce.tasks=0 \
    -files avro-1.7.7.jar,avro-mapred-1.7.7-hadoop1.jar \
    -libjars avro-1.7.7.jar,avro-mapred-1.7.7-hadoop1.jar \
    -input  examples/input/ \
    -output streaming/output/ \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -inputformat org.apache.avro.mapred.AvroAsTextInputFormat

Once the job completes you can inspect the output data as follows:

$ hadoop fs -cat streaming/output/part-00000 | head -4
{"username": "miguno", "tweet": "Rock: Nerf paper, scissors is fine.", "timestamp": 1366150681}
{"username": "BlizzardCS", "tweet": "Works as intended.  Terran is IMBA.", "timestamp": 1366154481}
{"username": "DarkTemplar", "tweet": "From the shadows I come!", "timestamp": 1366154681}
{"username": "VoidRay", "tweet": "Prismatic core online!", "timestamp": 1366160000}

Please be aware that the output data just happens to be JSON. This is because we opted not to modify any of the input data in our MapReduce job. And since the input data to our MapReduce job is deserialized by Avro into JSON, the output turns out to be JSON, too. With a different MapReduce job you could of course write the output data in TSV or CSV format, for instance.
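As an illustration of such a different job, the following sketch shows a mapper that turns each JSON record into a TSV line. It is a hypothetical example in Python, not code from this repository. The names to_tsv and run_mapper are invented, and the demo feeds the mapper from an in-memory buffer instead of a running cluster.

```python
import io
import json


def to_tsv(record):
    """Render a tweet record as one TSV line: username, tweet, timestamp."""
    # Replace TABs and newlines inside the tweet text so that they
    # cannot break the column layout.
    text = record["tweet"].replace("\t", " ").replace("\n", " ")
    return "\t".join([record["username"], text, str(record["timestamp"])])


def run_mapper(lines, out):
    """Read JSON records line by line and write one TSV line per record."""
    for line in lines:
        line = line.strip()  # also removes the trailing TAB added by Avro
        if not line:
            continue
        out.write(to_tsv(json.loads(line)) + "\n")


# Demo with an in-memory stand-in for the mapper's standard input.
demo_in = io.StringIO(
    '{"username": "VoidRay", "tweet": "Prismatic core online!", "timestamp": 1366160000}\t\n'
)
demo_out = io.StringIO()
run_mapper(demo_in, demo_out)
print(demo_out.getvalue(), end="")
```

In an actual streaming script you would call run_mapper(sys.stdin, sys.stdout) and name the script in the -mapper option in place of IdentityMapper.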

Reading Avro, writing Avro

AvroTextOutputFormat (implies "bytes" schema)

To write the output in Avro format instead of plain-text, use the same general options as in the previous example but also add:

$ hadoop jar hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \
    [...]
    -outputformat org.apache.avro.mapred.AvroTextOutputFormat

AvroTextOutputFormat is the equivalent of TextOutputFormat. It writes Avro data files with a "bytes" schema.

Note that using IdentityMapper as a naive mapper as shown in the previous example will not result in the output file being identical to the input file. This is because AvroTextOutputFormat will escape (quote) the input data it receives. An illustration might be worth a thousand words:

# After having used IdentityMapper as in the previous example
$ hadoop fs -copyToLocal streaming/output/part-00000.avro .

$ java -jar avro-tools-1.7.7.jar tojson part-00000.avro  | head -4
"{\"username\": \"miguno\", \"tweet\": \"Rock: Nerf paper, scissors is fine.\", \"timestamp\": 1366150681}\t"
"{\"username\": \"BlizzardCS\", \"tweet\": \"Works as intended.  Terran is IMBA.\", \"timestamp\": 1366154481}\t"
"{\"username\": \"DarkTemplar\", \"tweet\": \"From the shadows I come!\", \"timestamp\": 1366154681}\t"
"{\"username\": \"VoidRay\", \"tweet\": \"Prismatic core online!\", \"timestamp\": 1366160000}\t"

Custom Avro output schema

This does not appear to be supported by stock Avro at the moment. A related JIRA ticket, AVRO-1067, created in April 2012, is still unresolved as of July 2013.

For a workaround take a look at the section Avro output for Hadoop Streaming at avro-utils, a third-party library for Avro.

Enabling compression of Avro output data (Snappy or Deflate)

If you want to enable compression for the Avro output data, you must add the following parameters to the streaming job:

# For compression with Snappy
-D mapred.output.compress=true -D avro.output.codec=snappy

# For compression with Deflate
-D mapred.output.compress=true -D avro.output.codec=deflate

Be aware that if you enable compression with mapred.output.compress but do NOT specify an Avro output format (such as AvroTextOutputFormat), your cluster's configured default compression codec will determine the final format of the output data. For instance, if mapred.output.compression.codec is set to com.hadoop.compression.lzo.LzopCodec then the job's output files would be compressed with LZO (e.g. you would see part-00000.lzo output files instead of uncompressed part-00000 files).

See also Compression and Avro in the CDH4 documentation.

Further readings on Hadoop Streaming

Hive

Preliminaries

Important: The examples below assume you have access to a running Hadoop cluster.

Examples

In this section we demonstrate how to create a Hive table backed by Avro data, followed by running a few simple Hive queries against that data.

Defining a Hive table backed by Avro data

Using avro.schema.url to point to a remote Avro schema file

The following CREATE TABLE statement creates an external Hive table named tweets for storing Twitter messages in a very basic data structure that consists of username, content of the message and a timestamp.

For Hive version 0.11+:

Starting with Hive version 0.11 you must use WITH SERDEPROPERTIES instead of TBLPROPERTIES to specify the Avro schema. If you mistakenly use TBLPROPERTIES, Hive will complain with an AvroSerdeException.

-- Use the following syntax for Hive 0.11+
--
CREATE EXTERNAL TABLE tweets
    COMMENT "A table backed by Avro data with the Avro schema stored in HDFS"
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    WITH SERDEPROPERTIES (
        'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
    )
    STORED AS
    INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/user/YOURUSER/examples/input/';

For Hive versions <= 0.10:

-- Use the following syntax for Hive versions <= 0.10
--
CREATE EXTERNAL TABLE tweets_deprecated
    COMMENT "A table backed by Avro data with the Avro schema stored in HDFS"
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS
    INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/user/YOURUSER/examples/input/'
    TBLPROPERTIES (
        'avro.schema.url' = 'hdfs:///user/YOURUSER/examples/schema/twitter.avsc'
    );

Important: Notice how WITH SERDEPROPERTIES is specified after SERDE and TBLPROPERTIES after LOCATION, respectively.

Note: You must replace YOURUSER with your actual username. See section Preparing the input data above.

The serde parameter avro.schema.url can use URI schemes such as hdfs://, http:// and file://. It is recommended to use HDFS locations though:

[If the avro.schema.url points] to a location on HDFS [...], the AvroSerde will then read the file from HDFS, which should provide resiliency against many reads at once [which can be a problem for HTTP locations]. Note that the serde will read this file from every mapper, so it is a good idea to turn the replication of the schema file to a high value to provide good locality for the readers. The schema file itself should be relatively small, so this does not add a significant amount of overhead to the process.

That said, if you host the schemas on a web server that is efficient at serving static files, such as nginx, then using HTTP locations for Avro schemas should not be a problem either.

If you need to point to a particular HDFS namespace you can include the hostname and port of the NameNode in avro.schema.url:

CREATE EXTERNAL TABLE [...]
    WITH SERDEPROPERTIES (
        'avro.schema.url'='hdfs://namenode01:8020/path/to/twitter.avsc'
    )
    [...]

Note: Remember to use TBLPROPERTIES (after LOCATION) instead of WITH SERDEPROPERTIES (after SERDE) for Hive versions <= 0.10.

Using avro.schema.literal to embed an Avro schema

An alternative to setting avro.schema.url and using an external Avro schema is to embed the schema directly within the CREATE TABLE statement:

CREATE EXTERNAL TABLE tweets
    COMMENT "A table backed by Avro data with the Avro schema embedded in the CREATE TABLE statement"
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    WITH SERDEPROPERTIES (
        'avro.schema.literal' = '{
            "type": "record",
            "name": "Tweet",
            "namespace": "com.miguno.avro",
            "fields": [
                { "name":"username",  "type":"string"},
                { "name":"tweet",     "type":"string"},
                { "name":"timestamp", "type":"long"}
            ]
        }'
    )
    STORED AS
    INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/user/YOURUSER/examples/input/';

Note: Remember to use TBLPROPERTIES (after LOCATION) instead of WITH SERDEPROPERTIES (after SERDE) for Hive versions <= 0.10.

Note: You must replace YOURUSER with your actual username. See section Preparing the input data above.

Hive can also use variable substitution to embed the required Avro schema at run-time of a Hive script:

CREATE EXTERNAL TABLE tweets [...]
    WITH SERDEPROPERTIES ('avro.schema.literal' = '${hiveconf:schema}');

Note: Remember to use TBLPROPERTIES (after LOCATION) instead of WITH SERDEPROPERTIES (after SERDE) for Hive versions <= 0.10.

To execute the Hive script you would then run:

# SCHEMA must be a properly escaped version of the Avro schema; i.e. carriage returns converted to \n, tabs to \t,
# quotes escaped, and so on.
$ export SCHEMA="..."
$ hive -hiveconf schema="${SCHEMA}" -f hive_script.hql
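One way to produce such a single-line value is to re-serialize the schema with a JSON library. The sketch below uses Python's standard library and the schema from the CREATE TABLE example above. It is an assumption-laden illustration, not a tool from this repository: it removes newlines and TABs by compacting the JSON and quotes the result for a POSIX shell. Whether Hive's variable substitution needs any further escaping in your Hive version has not been verified here.

```python
import json
import shlex

schema_text = """
{
  "type": "record",
  "name": "Tweet",
  "namespace": "com.miguno.avro",
  "fields": [
    {"name": "username",  "type": "string"},
    {"name": "tweet",     "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
"""

# Re-serialize the schema without newlines, TABs or optional spaces.
compact_schema = json.dumps(json.loads(schema_text), separators=(",", ":"))

# Quote the value so that a POSIX shell passes it on unchanged.
print("export SCHEMA=" + shlex.quote(compact_schema))
```

Parsing and re-serializing also acts as a cheap sanity check, since json.loads fails on a malformed schema file before Hive ever sees it.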

Switching from avro.schema.url to avro.schema.literal or vice versa

If for a given Hive table you want to change how the Avro schema is specified you need to use a workaround:

Hive does not provide an easy way to unset or remove a property. If you wish to switch from using url or schema to the other, set the to-be-ignored value to none and the AvroSerde will treat it as if it were not set.

Analyzing the data with Hive

After you have created the Hive table tweets with one of the CREATE TABLE statements above (no matter which), you can start analyzing the example data with Hive. We will demonstrate this via the interactive Hive shell, but you can also use a Hive script, of course.

First, start the Hive shell:

$ hive
hive>

Let us inspect how Hive interprets the Avro data with DESCRIBE. You can also use DESCRIBE EXTENDED to see even more details, including the Avro schema of the table.

hive> DESCRIBE tweets;
OK
username        string  from deserializer
tweet   string  from deserializer
timestamp       bigint  from deserializer
Time taken: 1.786 seconds

Now we can perform interactive analysis of our example data:

hive> SELECT * FROM tweets LIMIT 5;
OK
miguno        Rock: Nerf paper, scissors is fine.   1366150681
BlizzardCS    Works as intended.  Terran is IMBA.   1366154481
DarkTemplar   From the shadows I come!              1366154681
VoidRay       Prismatic core online!                1366160000
VoidRay       Fire at will, commander.              1366160010
Time taken: 0.126 seconds

The following query will launch a MapReduce job to compute the result:

hive> SELECT DISTINCT(username) FROM tweets;
Total MapReduce jobs = 1
Launching Job 1 out of 1
[...snip...]
MapReduce Total cumulative CPU time: 4 seconds 290 msec
Ended Job = job_201305070634_0187
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 4.29 sec   HDFS Read: 1887 HDFS Write: 47 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 290 msec
OK
BlizzardCS          <<< Query results start here
DarkTemplar
Immortal
VoidRay
miguno
Time taken: 16.782 seconds

As you can see, Hive makes working with Avro data completely transparent once you have defined the Hive table accordingly.

Enabling compression of Avro output data

To enable compression add the following statements to your Hive script or enter them into the Hive shell:

-- For compression with Snappy
SET hive.exec.compress.output=true;
SET avro.output.codec=snappy;

-- For compression with Deflate
SET hive.exec.compress.output=true;
SET avro.output.codec=deflate;

To disable compression again in the same Hive script/Hive shell:

SET hive.exec.compress.output=false;

Hive and Hue

There is good and bad news:

  1. Good: You can readily browse Avro-backed Hive tables in Hue via a table's "Sample" tab.
  2. Bad: You cannot (yet) inspect the table metadata -- e.g. column names and types -- for Avro-backed Hive tables via a table's "Columns" tab. Hue will also display "No data available".

Figure 1: Browsing data of Avro Hive tables works as expected.

Figure 2: Displaying metadata of Avro Hive tables does not work yet.

Further readings on Hive

Pig

Preliminaries

Important: The examples below assume you have access to a running Hadoop cluster.

Examples

Prerequisites

First we must register the required jar files to be able to work with Avro. In this example I am using the jar files shipped with CDH4. If you are not using CDH4 just adapt the paths to match your Hadoop distribution.

REGISTER /app/cloudera/parcels/CDH/lib/pig/piggybank.jar
REGISTER /app/cloudera/parcels/CDH/lib/pig/lib/avro-*.jar
REGISTER /app/cloudera/parcels/CDH/lib/pig/lib/jackson-core-asl-*.jar
REGISTER /app/cloudera/parcels/CDH/lib/pig/lib/jackson-mapper-asl-*.jar
REGISTER /app/cloudera/parcels/CDH/lib/pig/lib/json-simple-*.jar
REGISTER /app/cloudera/parcels/CDH/lib/pig/lib/snappy-java-*.jar

Note: If you want to work with Python UDFs in PiggyBank you must also register the Jython jar file:

REGISTER /app/cloudera/parcels/CDH/lib/pig/lib/jython-standalone-*.jar

Reading Avro

To read input data in Avro format you must use AvroStorage. The following statements show various ways to load Avro data.

-- Easiest case: when the input data contains an embedded Avro schema (our example input data does).
-- Note that all the files under the directory should have the same schema.
records = LOAD 'examples/input/' USING org.apache.pig.piggybank.storage.avro.AvroStorage();

--
-- Next commands show how to manually specify the data schema
--

-- Using external schema file (stored on HDFS), relative path
records = LOAD 'examples/input/'
          USING org.apache.pig.piggybank.storage.avro.AvroStorage('no_schema_check',
               'schema_file', 'examples/schema/twitter.avsc');

-- Using external schema file (stored on HDFS), absolute path
records = LOAD 'examples/input/'
          USING org.apache.pig.piggybank.storage.avro.AvroStorage(
            'no_schema_check',
            'schema_file', 'hdfs:///user/YOURUSERNAME/examples/schema/twitter.avsc');

-- Using external schema file (stored on HDFS), absolute path with explicit HDFS namespace
records = LOAD 'examples/input/'
          USING org.apache.pig.piggybank.storage.avro.AvroStorage(
            'no_schema_check',
            'schema_file', 'hdfs://namenode01:8020/user/YOURUSERNAME/examples/schema/twitter.avsc');

About "no_schema_check": AvroStorage assumes that all Avro files in sub-directories of an input directory share the same schema, and by default AvroStorage performs a schema check. This process may take some time (seconds) when the input directory contains many sub-directories and files. You can set the option "no_schema_check" to disable this schema check.

See AvroStorage and TestAvroStorage.java for further examples.

Analyzing the data with Pig

The records relation is already in perfectly usable format -- you do not need to manually define a (Pig) schema as you would usually do via LOAD ... AS (...schema follows...).

grunt> DESCRIBE records;
records: {username: chararray,tweet: chararray,timestamp: long}

Let us take a first look at the contents of our input data. Note that the output you will see will vary at each invocation due to how ILLUSTRATE works.

grunt> ILLUSTRATE records;
<snip>
--------------------------------------------------------------------------------------------
| records     | username:chararray      | tweet:chararray            | timestamp:long      |
--------------------------------------------------------------------------------------------
|             | DarkTemplar             | I strike from the shadows! | 1366184681          |
--------------------------------------------------------------------------------------------

Now we can perform interactive analysis of our example data:

grunt> first_five_records = LIMIT records 5;
grunt> DUMP first_five_records;   <<< this will trigger a MapReduce job
[...snip...]
(miguno,Rock: Nerf paper, scissors is fine.,1366150681)
(VoidRay,Prismatic core online!,1366160000)
(VoidRay,Fire at will, commander.,1366160010)
(BlizzardCS,Works as intended.  Terran is IMBA.,1366154481)
(DarkTemplar,From the shadows I come!,1366154681)

List the (unique) names of users that created tweets:

grunt> usernames = DISTINCT (FOREACH records GENERATE username);
grunt> DUMP usernames;            <<< this will trigger a MapReduce job
[...snip...]
(miguno)
(VoidRay)
(Immortal)
(BlizzardCS)
(DarkTemplar)

Writing Avro

To write output data in Avro format you must use AvroStorage -- just like for reading Avro data.

It is strongly recommended that you do specify an explicit output schema when writing Avro data. If you don't then Pig will try to infer the output Avro schema from the data's Pig schema -- and this may result in undesirable schemas due to discrepancies of Pig and Avro data models (or problems of Pig itself). See AvroStorage for details.

-- Use the same output schema as an existing directory of Avro files (files should have the same schema).
-- This is helpful, for instance, when doing simple processing such as filtering the input data without modifying
-- the resulting data layout.
STORE records INTO 'pig/output/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        'no_schema_check',
        'data', 'examples/input/');

-- Use the same output schema as an existing Avro file as opposed to a directory of such files
STORE records INTO 'pig/output/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        'no_schema_check',
        'data', 'examples/input/twitter.avro');

-- Manually define an Avro schema (here, we rename 'username' to 'user' and 'tweet' to 'message')
STORE records INTO 'pig/output/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        '{
            "schema": {
                "type": "record",
                "name": "Tweet",
                "namespace": "com.miguno.avro",
                "fields": [
                    {
                        "name": "user",
                        "type": "string"
                    },
                    {
                        "name": "message",
                        "type": "string"
                    },
                    {
                        "name": "timestamp",
                        "type": "long"
                    }
                ],
                "doc" : "A slightly modified schema for storing Twitter messages"
            }
        }');

If you need to store the data in two or more different ways (e.g. you want to rename fields) you must add the parameter "index" to the AvroStorage arguments. Pig uses this information as a workaround to distinguish schemas specified by different AvroStorage calls until Pig's StoreFunc provides access to Pig's output schema in the backend.

STORE records INTO 'pig/output-variant-A/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        '{
            "index": 1,
            "schema": { ... }
        }');

STORE records INTO 'pig/output-variant-B/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        '{
            "index": 2,
            "schema": { ... }
        }');

See AvroStorage and TestAvroStorage.java for further examples.

TODO: Show how to store the usernames relation

Note: This example is not working yet.

To store the usernames relation from the section Analyzing the data with Pig above:

-- TODO: WHY DOES THIS STATEMENT FAIL DURING MAPREDUCE RUNTIME WITH
--          java.io.IOException: org.apache.avro.file.DataFileWriter$AppendWriteException:
--              java.lang.RuntimeException: Unsupported type in record:class java.lang.String
--
STORE usernames INTO 'pig/output/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        '{
            "index": 1,
            "schema": {
                "type":"record",
                "name":"User",
                "namespace": "com.miguno.avro",
                "fields": [
                    {"name":"username","type":"string"}
                ]
            }
        }');


-- TODO: THIS STATEMENT FAILS, TOO, WITH THE SAME RUNTIME EXCEPTION
--
STORE usernames INTO 'pig/output/'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        '{
            "schema_file": "examples/schema/user.avsc",
            "field0": "def:username"
        }');

Enabling compression of Avro output data

To enable compression add the following statements to your Pig script or enter them into the Pig Grunt shell:

-- We also enable compression of map output (which should be enabled by default anyways) because some Pig jobs
-- skip the reduce phase;  this ensures that we always generate compressed job output.
SET mapred.compress.map.output true;
SET mapred.output.compress true;
SET mapred.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;
SET avro.output.codec snappy;

To disable compression again in the same Pig script/Pig Grunt shell:

SET mapred.output.compress false;
-- Optionally: disable compression of map output (normally you want to leave this enabled)
SET mapred.compress.map.output false;

Further readings on Pig

Twitter Bijection

Bijection is a very nifty library to convert between different kinds of data formats including Avro. The examples below use Scala but of course you can also use any other JVM language (including good old Java) to work with Bijection.

Examples

The following Scala example assumes that you have an Avro-backed instance of Tweet (see twitter.avsc) that you want to convert to, say, an array of bytes and back.

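import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs
import com.miguno.avro.Tweet // Your Avro-generated Java class, based on twitter.avsc

// Bring a binary Avro codec for Tweet into implicit scope; Injection looks it up below.
// (Assumes Bijection's SpecificAvroCodecs.toBinary; the name of the val is arbitrary.)
implicit val tweetInjection: Injection[Tweet, Array[Byte]] = SpecificAvroCodecs.toBinary[Tweet]

// The timestamp field is an Avro long, hence the L suffix
val tweet = new Tweet("miguno", "Terran is the cheese race.", 1366190000L)

// From POJO to byte array
val bytes = Injection[Tweet, Array[Byte]](tweet)

// From byte array back to POJO (the result is wrapped in a Try because decoding can fail)
val recovered_tweet = Injection.invert[Tweet, Array[Byte]](bytes)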

Related documentation

Contributing to avro-hadoop-starter

Code contributions, bug reports, feature requests etc. are all welcome.

If you are new to GitHub please read Contributing to a project for how to send patches and pull requests to avro-hadoop-starter.

License

Copyright © 2013-2014 Michael G. Noll

See LICENSE for licensing information.
