From 42506925289fc3b395f76b49b96b7965c01f3b9c Mon Sep 17 00:00:00 2001
From: Travis Dempsey
Date: Mon, 9 Apr 2012 11:06:16 -0500
Subject: [PATCH 01/12] updated and cleaned up tests

---
 test/test_dump.pig        | 25 +++++++++++--------------
 test/test_json_loader.pig | 25 +++++++++++--------------
 test/test_tsv_loader.pig  | 24 ++++++++++--------------
 3 files changed, 32 insertions(+), 42 deletions(-)

diff --git a/test/test_dump.pig b/test/test_dump.pig
index fd1acaa..38f55e6 100644
--- a/test/test_dump.pig
+++ b/test/test_dump.pig
@@ -1,22 +1,19 @@
 --
 -- This tests loading data from elasticsearch
 --
-register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar';
-register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar';
-register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar';
-register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar';
-register target/wonderdog-1.0-SNAPSHOT.jar;
-
-%default INDEX 'foo_test'
-%default OBJ 'foo'
+
+%default ES_JAR_DIR '/usr/local/Cellar/elasticsearch/0.18.7/libexec'
+%default ES_YAML '/usr/local/Cellar/elasticsearch/0.18.7/config/elasticsearch.yml'
+%default PLUGINS '/usr/local/Cellar/elasticsearch/0.18.7/plugins'
+
+%default INDEX 'foo_test'
+%default OBJ 'foo'
+
+register $ES_JAR_DIR/*.jar;
+register target/wonderdog*.jar;
 --
 -- Will load the data as (doc_id, contents) tuples where the contents is the original json source from elasticsearch
 --
-foo = LOAD 'es://foo_test/foo?q=character:c' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage() AS (doc_id:chararray, contents:chararray);
+foo = LOAD 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('$ES_YAML', '$PLUGINS') AS (doc_id:chararray, contents:chararray);
 
 DUMP foo;
diff --git a/test/test_json_loader.pig b/test/test_json_loader.pig
index 70f076c..0c9d628 100644
--- a/test/test_json_loader.pig
+++ b/test/test_json_loader.pig
@@ -1,19 +1,16 @@
 --
 -- This tests the json indexer. Run in local mode with 'pig -x local test/test_json_loader.pig'
 --
-register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar';
-register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar';
-register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar';
-register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar';
-register target/wonderdog-1.0-SNAPSHOT.jar;
-
-%default INDEX 'foo_test'
-%default OBJ 'foo'
+
+%default ES_JAR_DIR '/usr/local/Cellar/elasticsearch/0.18.7/libexec'
+%default ES_YAML '/usr/local/Cellar/elasticsearch/0.18.7/config/elasticsearch.yml'
+%default PLUGINS '/usr/local/Cellar/elasticsearch/0.18.7/plugins'
+
+%default INDEX 'foo_test'
+%default OBJ 'foo'
+
+register $ES_JAR_DIR/*.jar;
+register target/wonderdog*.jar;
 
 foo = LOAD 'test/foo.json' AS (data:chararray);
 
@@ -21,4 +18,4 @@ foo = LOAD 'test/foo.json' AS (data:chararray);
 --
 -- Query parameters let elasticsearch output format that we're storing json data and
 -- want to use a bulk request size of 1 record.
 --
-STORE foo INTO 'es://$INDEX/$OBJ?json=true&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
+STORE foo INTO 'es://$INDEX/$OBJ?json=true&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('$ES_YAML', '$PLUGINS');
diff --git a/test/test_tsv_loader.pig b/test/test_tsv_loader.pig
index 7685298..a91f3e1 100644
--- a/test/test_tsv_loader.pig
+++ b/test/test_tsv_loader.pig
@@ -1,20 +1,16 @@
 --
 -- This tests the tsv indexer. Run in local mode with 'pig -x local test/test_tsv_loader.pig'
 --
-register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar';
-register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar';
-register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar';
-register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar';
-register target/wonderdog-1.0-SNAPSHOT.jar;
-
-%default INDEX 'foo_test'
-%default OBJ 'foo'
+%default ES_JAR_DIR '/usr/local/Cellar/elasticsearch/0.18.7/libexec'
+%default ES_YAML '/usr/local/Cellar/elasticsearch/0.18.7/config/elasticsearch.yml'
+%default PLUGINS '/usr/local/Cellar/elasticsearch/0.18.7/plugins'
+
+%default INDEX 'foo_test'
+%default OBJ 'foo'
+
+register $ES_JAR_DIR/*.jar;
+register target/wonderdog*.jar;
 
 foo = LOAD 'test/foo.tsv' AS (character:chararray, value:int);
 
-STORE foo INTO 'es://$INDEX/$OBJ?json=false&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
+STORE foo INTO 'es://$INDEX/$OBJ?json=false&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('$ES_YAML', '$PLUGINS');

From ca553b9550b9071c461ca88a79a971c899eeff7c Mon Sep 17 00:00:00 2001
From: Travis Dempsey
Date: Mon, 9 Apr 2012 11:43:37 -0500
Subject: [PATCH 02/12] added pig logs to gitignore

---
 .gitignore | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.gitignore b/.gitignore
index 83feaa7..57eccc2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,8 @@ a.out
 .tasks-cache
 .yardoc
 
+*.log
+
 *private*
 /log/*
 /pkg/*

From 946219dac16c28b90f905c81a148b21e543efd2c Mon Sep 17 00:00:00 2001
From: Travis Dempsey
Date: Mon, 9 Apr 2012 11:44:12 -0500
Subject: [PATCH 03/12] cleaned up documentation, round 1

---
 CHANGELOG.md   |   5 ++
 README.md      | 175 ++++++++++++++++++++++++++++++++++++++
 README.textile | 225 -------------------------------------------------
 3 files changed, 180 insertions(+), 225 deletions(-)
 create mode 100644 CHANGELOG.md
 create mode 100644 README.md
 delete mode 100644 README.textile

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..f818640
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,5 @@
+Wonderdog has been updated to work with Elasticsearch v17 or greater. Previous code was moved into the branch v17_pre and will be deprecated. All changes going forward will be based upon compatibility with Elasticsearch v17 or greater and will live in the master branch.
+
+* estool has been redone completely for enhanced usability. Run ```bin/estool --help``` for examples.
+
+* Updated and cleaned up test directory.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a0aabd5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,175 @@
+# Wonderdog
+
+Wonderdog is a Hadoop interface to Elastic Search. While it is specifically intended for use with Apache Pig, it does include all the necessary Hadoop input and output formats for Elastic Search. That is, it's possible to skip Pig en
+tirely and write custom Hadoop jobs if you prefer.
+
+## Requirements
+
+## Usage
+
+### Using ElasticSearchStorage for Apache Pig
+
+The most up-to-date (and simplest) way to store data into elasticsearch with hadoop is to use the Pig Store Function. You can write both delimited and json data to elasticsearch as well as read data from elasticsearch.
+
+#### Storing tabular data:
+
+This allows you to store tabular data (eg. tsv, csv) into elasticsearch.
+
+```pig
+%default ES_JAR_DIR '/usr/local/share/elasticsearch/lib'
+%default INDEX 'ufo_sightings'
+%default OBJ 'sighting'
+
+register target/wonderdog*.jar;
+register $ES_JAR_DIR/*.jar;
+
+ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
+STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=false&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
+```
+
+Here the fields that you set in Pig (eg. 'sighted_at') are used as the field names when creating json records for elasticsearch.
+
+#### Storing json data:
+
+You can store json data just as easily.
+
+```pig
+ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_record:chararray);
+STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
+```
+
+#### Reading data:
+
+Easy too.
+
+```pig
+-- dump some of the ufo sightings index based on free text query
+alien_sightings = LOAD 'es://ufo_sightings/ufo_sightings?q=alien' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage() AS (doc_id:chararray, contents:chararray);
+DUMP alien_sightings;
+```
+
+#### ElasticSearchStorage Constructor
+
+The constructor to the UDF can take two arguments (in the following order):
+
+* ```esConfig``` - The full path to where elasticsearch.yml lives on the machine launching the hadoop job
+* ```esPlugins``` - The full path to where the elasticsearch plugins directory lives on the machine launching the hadoop job
+
+#### Query Parameters
+
+There are a few query paramaters available:
+
+* ```json``` - (STORE only) When 'true' indicates to the StoreFunc that pre-rendered json records are being indexed. Default is false.
+* ```size``` - When storing, this is used as the bulk request size (the number of records to stack up before indexing to elasticsearch). When loading, this is the number of records to fetch per request. Default 1000.
+* ```q``` - (LOAD only) A free text query determining which records to load. If empty, matches all documents in the index.
+* ```id``` - (STORE only) The name of the field to use as a document id. If blank (or -1) the documents are assumed to have no id and are assigned one by elasticsearch.
+* ```tasks``` - (LOAD only) The number of map tasks to launch. Default 100.
+
+Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop's distributed cache mechanism.
+
+### Native Hadoop TSV Loader
+
+**Note**: the tsv loader is deprecated. Instead, use the ElasticSearchOutputFormat coupled with either Apache Pig storefunc (ElasticSearchIndex or ElasticSearchJsonIndex).
+
+Once you've got a working set up you should be ready to launch your bulkload process. The best way to explain is with an example. Say you've got a tsv file of user records (name,login,email,description) and you want to index all the fields. Assuming you're going to write to an index called ```users``` with objects of type ```user``` (elasticsearch will create this object automatically the first time you upload one). The workflow is as follows:
+
+* Create the ```users``` index:
+
+```
+bin/estool create --index users
+```
+
+* Upload the data
+
+```
+# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
+bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users
+```
+
+Notice the output path. When the bulk indexing job runs it is possible for index requests to fail for various reasons (too much load, etc). In this case the documents that failed are simply written to the hdfs so they can be retried in a later job.
+
+* Refresh Index
+
+After the bulk load is finished you'll want to refresh the index so your documents will actually be searchable:
+
+```
+bin/estool refresh --index users
+```
+
+* Snapshot Index
+
+You'll definitely want to do this after the bulk load finishes so you don't lose any data in case of cluster failure:
+
+```
+bin/estool snapshot --index users
+```
+
+* Bump the replicas for the index up to at least one.
+
+```
+bin/estool set_replication --index users --replicas=1
+```
+
+This will take a while to finish and the cluster health will show yellow until it does.
+
+* Optimize the index
+
+```
+bin/estool optimize --index users -s 3
+```
+
+This will also take a while to finish.
+
+#### TSV loader command-line options
+
+* ```index_name``` - Index to write data to. It does not have to exist ahead of time
+* ```object_type``` - Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch.
+* ```field_names``` - A comma separated list of field names describing the tsv record input
+* ```id_field``` - Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field
+* ```bulk_size``` - Number of records per bulk request sent to elasticsearch cluster
+* ```es_home``` - Path to elasticsearch installation, read from the ES_HOME environment variable if it's set
+* ```es_config``` - Path to elasticsearch config file (@elasticsearch.yml@)
+* ```rm``` - Remove existing output? (true or leave blank)
+* ```hadoop_home``` - Path to hadoop installation, read from the HADOOP_HOME environment variable if it's set
+* ```min_split_size``` - Min split size for maps
+
+## Admin
+
+There are a number of convenience commands in ```bin/estool```. Most of the common rest api operations have be mapped. Enumerating a few:
+
+* Print status of all indices as a json hash to the terminal
+
+```
+# See everything (tmi)
+bin/estool -c status
+```
+
+* Check cluster health (red,green,yellow,relocated shards, etc)
+
+```
+bin/estool -c health
+```
+
+* Set replicas for an index
+
+```
+bin/estool set_replication -c --index --replicas
+```
+
+* Optimize an index
+
+```
+bin/estool optimize -c --index
+```
+
+* Snapshot an index
+
+```
+bin/estool snapshot -c --index
+```
+
+* Delete an index
+
+```
+bin/estool delete -c --index
+```
diff --git a/README.textile b/README.textile
deleted file mode 100644
index 7c35b84..0000000
--- a/README.textile
+++ /dev/null
@@ -1,225 +0,0 @@
-h1. Wonderdog
-
-Wonderdog is a Hadoop interface to Elastic Search. While it is specifically intended for use with Apache Pig, it does include all the necessary Hadoop input and output formats for Elastic Search. That is, it's possible to skip Pig entirely and write custom Hadoop jobs if you prefer.
-
-h2. Requirements
-
-h2. Usage
-
-h3. Using ElasticSearchStorage for Apache Pig
-
-The most up-to-date (and simplest) way to store data into elasticsearch with hadoop is to use the Pig Store Function. You can write both delimited and json data to elasticsearch as well as read data from elasticsearch.
-
-h4. Storing tabular data:
-
-This allows you to store tabular data (eg. tsv, csv) into elasticsearch.
-
-

-register 'target/wonderdog.jar';
-register '/usr/local/share/elasticsearch/lib/elasticsearch-0.18.7.jar';
-register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar';
-register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar';
-register '/usr/local/share/elasticsearch/lib/log4j-1.2.16.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.5.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-core-3.5.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.5.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-memory-3.5.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-queries-3.5.0.jar';
-
-%default INDEX 'ufo_sightings'
-%default OBJ   'ufo_sighting'        
-
-ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
-STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=false&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
-
- -Here the fields that you set in Pig (eg. 'sighted_at') are used as the field names when creating json records for elasticsearch. - -h4. Storing json data: - -You can store json data just as easily. - -

-ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_record:chararray);
-STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
-
- -h4. Reading data: - -Easy too. - -

--- dump some of the ufo sightings index based on free text query
-alien_sightings = LOAD 'es://ufo_sightings/ufo_sightings?q=alien' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage() AS (doc_id:chararray, contents:chararray);
-DUMP alien_sightings;
-
- -h4. ElasticSearchStorage Constructor - -The constructor to the UDF can take two arguments (in the following order): - -* @esConfig@ - The full path to where elasticsearch.yml lives on the machine launching the hadoop job -* @esPlugins@ - The full path to where the elasticsearch plugins directory lives on the machine launching the hadoop job - -h4. Query Parameters - -There are a few query paramaters available: - -* @json@ - (STORE only) When 'true' indicates to the StoreFunc that pre-rendered json records are being indexed. Default is false. -* @size@ - When storing, this is used as the bulk request size (the number of records to stack up before indexing to elasticsearch). When loading, this is the number of records to fetch per request. Default 1000. -* @q@ - (LOAD only) A free text query determining which records to load. If empty, matches all documents in the index. -* @id@ - (STORE only) The name of the field to use as a document id. If blank (or -1) the documents are assumed to have no id and are assigned one by elasticsearch. -* @tasks@ - (LOAD only) The number of map tasks to launch. Default 100. - -Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop's distributed cache mechanism. - -h3. Native Hadoop TSV Loader - -**Note**: the tsv loader is deprecated. Instead, use the ElasticSearchOutputFormat coupled with either Apache Pig storefunc (ElasticSearchIndex or ElasticSearchJsonIndex). - -Once you've got a working set up you should be ready to launch your bulkload process. The best way to explain is with an example. Say you've got a tsv file of user records (name,login,email,description) and you want to index all the fields. Assuming you're going to write to an index called @users@ with objects of type @user@ (elasticsearch will create this object automatically the first time you upload one). The workflow is as follows: - -* Create the @users@ index: - -

-bin/estool --host= --index_name=users create_index
-
- -* Upload the data - -

-# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
-bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users
-
- -Notice the output path. When the bulk indexing job runs it is possible for index requests to fail for various reasons (too much load, etc). In this case the documents that failed are simply written to the hdfs so they can be retried in a later job. - -* Refresh Index - -After the bulk load is finished you'll want to refresh the index so your documents will actually be searchable: - -

-bin/estool --host= --index_name=users refresh_index
-
- -* Snapshot Index - -You'll definitely want to do this after the bulk load finishes so you don't lose any data in case of cluster failure: - -

-bin/estool --host= --index_name=users snapshot_index
-
- - -* Bump the replicas for the index up to at least one. - -

-bin/estool --host= --index_name=users --replicas=1 set_replicas
-
- -This will take a while to finish and the cluster health will show yellow until it does. - -* Optimize the index - -

-bin/estool --host= --index_name=users optimize_index
-
- -This will also take a while to finish. - -h4. TSV loader command-line options - -* @index_name@ - Index to write data to. It does not have to exist ahead of time -* @object_type@ - Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch. -* @field_names@ - A comma separated list of field names describing the tsv record input -* @id_field@ - Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field -* @bulk_size@ - Number of records per bulk request sent to elasticsearch cluster -* @es_home@ - Path to elasticsearch installation, read from the ES_HOME environment variable if it's set -* @es_config@ - Path to elasticsearch config file (@elasticsearch.yml@) -* @rm@ - Remove existing output? (true or leave blank) -* @hadoop_home@ - Path to hadoop installation, read from the HADOOP_HOME environment variable if it's set -* @min_split_size@ - Min split size for maps - -h2. Admin - -There are a number of convenience commands in @bin/estool@. Enumerating a few: - -* Print status of all indices as a json hash to the terminal - -

-# See everything (tmi)
-bin/estool --host= status
-
-# Just see index level stuff on docs and replicas:
-bin/estool --host= status | grep -C10 number_of_replicas
-
- -* Check cluster health (red,green,yellow,relocated shards, etc) - -

-bin/estool --host= health
-
- -* Set replicas for an index - -

-bin/estool --host= --index_name= --replicas= set_replicas
-
- -* Optimize an index - -

-bin/estool --host= --index_name= optimize_index
-
- -* Snapshot an index - -

-bin/estool --host= --index_name= snapshot_index
-
- -* Delete an index - -You'd better be sure. - -

-bin/estool --host= --index_name= delete_index
-
- - -And so on. Most of the common rest api operations have be mapped into estool. - - - - - - - ------------------------------------ THIS NEEDS CLEANED UP AND TURNED INTO A COHERENT SECTION, RIGHT NOW IT IS INCOMPREHENSIBLE ---------------------------- -h3. How to choose shards, replicas and cluster size: Rules of Thumb. - -sh = shards -rf = replication factor. replicas = 0 implies rf = 1, or 1 replica of each shard. - -pm = running data_esnode processes per machine -N = number of machines - -n_cores = number of cpu cores per machine -n_disks = number of disks per machine - - - -* You must have at least as many data_esnodes as - Mandatory: (sh * rf) < (pm * N) - - - Shards: shard size < 10GB - - -More shards = more parallel writes - - - -curl -XPUT "`cat /etc/motd | grep IP | cuttab 2`:9200/tweet-2010q1/_settings" -d '{ "index":{ "number_of_replicas":1 } }' ------------------------------------------------------------------------------------------------------------------------------------------------------------ - From 4a55574aded225485e7eb169b12050dde7e6d97d Mon Sep 17 00:00:00 2001 From: Travis Dempsey Date: Mon, 9 Apr 2012 11:44:23 -0500 Subject: [PATCH 04/12] cleaned up documentation, round 1 --- notes/cluster_notes.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 notes/cluster_notes.md diff --git a/notes/cluster_notes.md b/notes/cluster_notes.md new file mode 100644 index 0000000..b3961f7 --- /dev/null +++ b/notes/cluster_notes.md @@ -0,0 +1,17 @@ +### How to choose shards, replicas and cluster size: Rules of Thumb. + +sh = shards +rf = replication factor. replicas = 0 implies rf = 1, or 1 replica of each shard. + +pm = running data_esnode processes per machine +N = number of machines + +n_cores = number of cpu cores per machine +n_disks = number of disks per machine + +* You must have at least as many data_esnodes as + Mandatory: (sh * rf) < (pm * N) + + Shards: shard size < 10GB + +More shards = more parallel writes \ No newline at end of file From 9db09cfd8ca776b72f722af787761374d8ee1a62 Mon Sep 17 00:00:00 2001 From: Travis Dempsey Date: Wed, 25 Apr 2012 13:09:54 -0500 Subject: [PATCH 05/12] added apache license --- LICENSE.md | 201 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 LICENSE.md diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..1b22bef --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
\ No newline at end of file From d061ee5fcbb366061f7ed0e48c971352b8bde644 Mon Sep 17 00:00:00 2001 From: Russell Jurney Date: Mon, 9 Jul 2012 09:50:41 -0700 Subject: [PATCH 06/12] Fixed Wonderdog for Pig 0.10 by checking the conf object if the call to the hadoop cache misses --- .../infochimps/elasticsearch/ElasticSearchOutputFormat.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index 10baf47..e11ac08 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -120,7 +120,8 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { this.objType = conf.get(ES_OBJECT_TYPE); // - // Fetches elasticsearch.yml and the plugins directory from the distributed cache + // Fetches elasticsearch.yml and the plugins directory from the distributed cache, or + // from the local config. // try { String taskConfigPath = HadoopUtils.fetchFileFromCache(ES_CONFIG_NAME, conf); @@ -130,7 +131,8 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { System.setProperty(ES_CONFIG, taskConfigPath); System.setProperty(ES_PLUGINS, taskPluginsPath+SLASH+ES_PLUGINS_NAME); } catch (Exception e) { - throw new RuntimeException(e); + System.setProperty(ES_CONFIG,conf.get(ES_CONFIG)); + System.setProperty(ES_PLUGINS,conf.get(ES_PLUGINS)); } start_embedded_client(); From ee3e465fd6415f341c7c444dd94c3ad57e982eb0 Mon Sep 17 00:00:00 2001 From: Jerome Gagnon Date: Mon, 4 Mar 2013 14:14:09 -0500 Subject: [PATCH 07/12] Added the possibility to use transport client instead of node. Main advantage of this is to avoid "too many open files" error on hadoop workers. 
--- .../ElasticSearchOutputFormat.java | 75 +++++++++++-------- .../pig/ElasticSearchStorage.java | 49 ++++++------ 2 files changed, 65 insertions(+), 59 deletions(-) diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index e11ac08..d260180 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -1,43 +1,30 @@ package com.infochimps.elasticsearch; -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.HashMap; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.Random; -import java.net.URI; - +import com.infochimps.elasticsearch.hadoop.util.HadoopUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.filecache.DistributedCache; - -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.action.bulk.BulkRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; -import com.infochimps.elasticsearch.hadoop.util.HadoopUtils; +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; /** @@ -50,6 +37,10 @@ public class ElasticSearchOutputFormat extends OutputFormat { private Node node; @@ -59,6 +50,7 @@ protected class ElasticSearchRecordWriter extends RecordWriterelasticsearch.bulk.size - The number of records to be accumulated into a bulk request before writing to elasticsearch.
  • elasticsearch.is_json - A boolean indicating whether the records to be indexed are json records. If false the records are assumed to be tsv, in which case elasticsearch.field.names must be set and contain a comma separated list of field names
  • elasticsearch.object.type - The type of objects being indexed
  • +
  • elasticsearch.client.type - The type of client to be used for indexation. (client or transport). Default to client.
  • elasticsearch.config - The full path the elasticsearch.yml. It is a local path and must exist on all machines in the hadoop cluster.
  • elasticsearch.plugins.dir - The full path the elasticsearch plugins directory. It is a local path and must exist on all machines in the hadoop cluster.
  • @@ -118,7 +112,12 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { LOG.info("Using field:["+idFieldName+"] for document ids"); } this.objType = conf.get(ES_OBJECT_TYPE); - + if("transport".equals(conf.get(ES_CLIENT_TYPE))) { + this.clientType = ClientType.TRANSPORT; + } else { + this.clientType = ClientType.CLIENT; + } + // // Fetches elasticsearch.yml and the plugins directory from the distributed cache, or // from the local config. @@ -255,8 +254,20 @@ private void initialize_index(String indexName) { // private void start_embedded_client() { LOG.info("Starting embedded elasticsearch client ..."); - this.node = NodeBuilder.nodeBuilder().client(true).node(); - this.client = node.client(); + switch (clientType) { + case TRANSPORT: + // We don't want to sniff to avoid Too many files open + // So it is going to take only the node in configuration + Settings settings = ImmutableSettings.settingsBuilder() + .put("client.transport.sniff", "false").build(); + this.client = new TransportClient(settings); + break; + case CLIENT: + default: + this.node = NodeBuilder.nodeBuilder().client(true).node(); + this.client = node.client(); + break; + } } } diff --git a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java index a2fac59..d714874 100644 --- a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java +++ b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java @@ -1,42 +1,31 @@ package com.infochimps.elasticsearch.pig; -import java.io.IOException; -import java.lang.InterruptedException; -import java.util.Properties; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.net.URI; -import java.net.URISyntaxException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.map.JsonMappingException; - +import com.infochimps.elasticsearch.ElasticSearchInputFormat; +import com.infochimps.elasticsearch.ElasticSearchOutputFormat; +import com.infochimps.elasticsearch.hadoop.util.HadoopUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.io.*; - +import org.apache.hadoop.mapreduce.*; import org.apache.pig.LoadFunc; -import org.apache.pig.StoreFuncInterface; import org.apache.pig.ResourceSchema; -import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; -import com.infochimps.elasticsearch.ElasticSearchOutputFormat; -import com.infochimps.elasticsearch.ElasticSearchInputFormat; -import com.infochimps.elasticsearch.hadoop.util.HadoopUtils; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.Properties; public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface { @@ -57,6 +46,7 @@ public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface private static final String ES_REQUEST_SIZE = "elasticsearch.request.size"; private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits"; private static final String ES_QUERY_STRING = "elasticsearch.query.string"; + private static final String ES_CLIENT_TYPE = "elasticsearch.client.type"; private static final String COMMA = ","; private static final String LOCAL_SCHEME = "file://"; @@ -247,6 +237,11 @@ private void elasticSearchSetup(String location, Job job) { if (queryString==null) queryString = "*"; job.getConfiguration().set(ES_QUERY_STRING, queryString); + // Set elasticsearch client type to be used in the Hadoop configuration + String clientType = query.get("client"); + if (clientType==null) clientType = "client"; + job.getConfiguration().set(ES_CLIENT_TYPE, clientType); + String numTasks = query.get("tasks"); if (numTasks==null) numTasks = "100"; job.getConfiguration().set(ES_NUM_SPLITS, numTasks); From 85025e5b332bba54ca25cce5500da5bef69ccf5d Mon Sep 17 00:00:00 2001 From: Jerome Gagnon Date: Mon, 4 Mar 2013 14:24:23 -0500 Subject: [PATCH 08/12] Change client type "client" to "node" for clarity. Updated documentation accordingly. --- README.md | 1 + .../elasticsearch/ElasticSearchOutputFormat.java | 14 +++++++------- .../elasticsearch/pig/ElasticSearchStorage.java | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index a0aabd5..ab064bd 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ There are a few query paramaters available: * ```q``` - (LOAD only) A free text query determining which records to load. If empty, matches all documents in the index. * ```id``` - (STORE only) The name of the field to use as a document id. If blank (or -1) the documents are assumed to have no id and are assigned one by elasticsearch. * ```tasks``` - (LOAD only) The number of map tasks to launch. Default 100. +* ```client``` - (STORE ony) The type of client to be used. This setting can be useful to avoid "too many open files" on hadoop workers, since "client.transport.sniff" is set to false which mean it will only takes the nodes specified in config file. take Default to Node. Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop's distributed cache mechanism. diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index d260180..12cee46 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -27,18 +27,18 @@ import java.util.concurrent.atomic.AtomicLong; /** - + Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) into Elasticsearch. Records are batched up and sent in a one-hop manner to the elastic search data nodes that will index them. 
- + */ public class ElasticSearchOutputFormat extends OutputFormat implements Configurable { - + static Log LOG = LogFactory.getLog(ElasticSearchOutputFormat.class); private Configuration conf = null; enum ClientType { - CLIENT, TRANSPORT + NODE, TRANSPORT } protected class ElasticSearchRecordWriter extends RecordWriter { @@ -88,7 +88,7 @@ protected class ElasticSearchRecordWriter extends RecordWriterelasticsearch.bulk.size - The number of records to be accumulated into a bulk request before writing to elasticsearch.
  • elasticsearch.is_json - A boolean indicating whether the records to be indexed are json records. If false the records are assumed to be tsv, in which case elasticsearch.field.names must be set and contain a comma separated list of field names
  • elasticsearch.object.type - The type of objects being indexed
  • -
  • elasticsearch.client.type - The type of client to be used for indexation. (client or transport). Default to client.
  • +
  • elasticsearch.client.type - The type of client to be used for indexation. (node or transport). Default to node.
  • elasticsearch.config - The full path the elasticsearch.yml. It is a local path and must exist on all machines in the hadoop cluster.
  • elasticsearch.plugins.dir - The full path the elasticsearch plugins directory. It is a local path and must exist on all machines in the hadoop cluster.
  • @@ -115,7 +115,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { if("transport".equals(conf.get(ES_CLIENT_TYPE))) { this.clientType = ClientType.TRANSPORT; } else { - this.clientType = ClientType.CLIENT; + this.clientType = ClientType.NODE; } // @@ -262,7 +262,7 @@ private void start_embedded_client() { .put("client.transport.sniff", "false").build(); this.client = new TransportClient(settings); break; - case CLIENT: + case NODE: default: this.node = NodeBuilder.nodeBuilder().client(true).node(); this.client = node.client(); diff --git a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java index d714874..d57d2f8 100644 --- a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java +++ b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java @@ -239,7 +239,7 @@ private void elasticSearchSetup(String location, Job job) { // Set elasticsearch client type to be used in the Hadoop configuration String clientType = query.get("client"); - if (clientType==null) clientType = "client"; + if (clientType==null) clientType = "node"; job.getConfiguration().set(ES_CLIENT_TYPE, clientType); String numTasks = query.get("tasks"); From 4ce56090d2a44b0d0d77f2b6c61253f7bc2b4c11 Mon Sep 17 00:00:00 2001 From: Jerome Gagnon Date: Mon, 4 Mar 2013 16:14:39 -0500 Subject: [PATCH 09/12] Changed version of ElasticSearch to 0.20.x to fix dependencies. Added addresses handling for transport client. --- pom.xml | 4 +- .../ElasticSearchOutputFormat.java | 48 +++++++++++++------ .../infochimps/elasticsearch/ElasticTest.java | 40 +++++----------- .../pig/ElasticSearchStorage.java | 9 ++++ 4 files changed, 56 insertions(+), 45 deletions(-) diff --git a/pom.xml b/pom.xml index 09fd866..d4750b7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.infochimps.elasticsearch wonderdog - 1.0-SNAPSHOT + 1.0.1-SNAPSHOT jar wonderdog @@ -18,7 +18,7 @@ org.elasticsearch elasticsearch - 0.18.7 + 0.20.1 diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index 12cee46..cc48bac 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -8,13 +8,14 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.client.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.IndexAlreadyExistsException; @@ -22,6 +23,7 @@ import org.elasticsearch.node.NodeBuilder; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicLong; @@ -51,12 +53,13 @@ protected class ElasticSearchRecordWriter extends RecordWriterelasticsearch.id.field.name - When 
elasticsearch.is_json is true, this is the name of a field in the json document that contains the document's id. If -1 is used then the document is assumed to have no id and one is assigned to it by elasticsearch.
  • elasticsearch.field.names - When elasticsearch.is_json is false, this is a comma separated list of field names.
  • elasticsearch.id.field - When elasticsearch.is_json is false, this is the numeric index of the field to use as the document id. If -1 is used the document is assumed to have no id and one is assigned to it by elasticsearch.
  • - + */ public ElasticSearchRecordWriter(TaskAttemptContext context) { Configuration conf = context.getConfiguration(); @@ -112,8 +116,15 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { LOG.info("Using field:["+idFieldName+"] for document ids"); } this.objType = conf.get(ES_OBJECT_TYPE); - if("transport".equals(conf.get(ES_CLIENT_TYPE))) { + if(ClientType.TRANSPORT.name().equalsIgnoreCase(conf.get(ES_CLIENT_TYPE))) { this.clientType = ClientType.TRANSPORT; + // Get the address + if(conf.get(ES_TRANS_ADDR) == null) { + LOG.error("Transport client must have nodes addresses."); + throw new RuntimeException("Transport client must have nodes addresses."); + } else { + this.transportAddr = conf.get(ES_TRANS_ADDR).split(","); + } } else { this.clientType = ClientType.NODE; } @@ -133,7 +144,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { System.setProperty(ES_CONFIG,conf.get(ES_CONFIG)); System.setProperty(ES_PLUGINS,conf.get(ES_PLUGINS)); } - + start_embedded_client(); initialize_index(indexName); currentRequest = client.prepareBulk(); @@ -143,7 +154,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { Closes the connection to elasticsearch. Any documents remaining in the bulkRequest object are indexed. */ public void close(TaskAttemptContext context) throws IOException { - if (currentRequest.numberOfActions() > 0) { + if (currentRequest.numberOfActions() > 0) { try { BulkResponse response = currentRequest.execute().actionGet(); } catch (Exception e) { @@ -174,7 +185,7 @@ public void write(NullWritable key, MapWritable fields) throws IOException { try { Text mapKey = new Text(idFieldName); String record_id = fields.get(mapKey).toString(); - currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder)); + currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder)); } catch (Exception e) { LOG.warn("Encountered malformed record"); } @@ -197,14 +208,14 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce } else if (value instanceof FloatWritable) { builder.value(((FloatWritable)value).get()); } else if (value instanceof BooleanWritable) { - builder.value(((BooleanWritable)value).get()); + builder.value(((BooleanWritable)value).get()); } else if (value instanceof MapWritable) { builder.startObject(); for (Map.Entry entry : ((MapWritable)value).entrySet()) { if (!(entry.getValue() instanceof NullWritable)) { builder.field(entry.getKey().toString()); buildContent(builder, entry.getValue()); - } + } } builder.endObject(); } else if (value instanceof ArrayWritable) { @@ -214,7 +225,7 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce buildContent(builder, arrayOfThings[i]); } builder.endArray(); - } + } } /** @@ -253,14 +264,23 @@ private void initialize_index(String indexName) { // Starts an embedded elasticsearch client (ie. 
data = false) // private void start_embedded_client() { - LOG.info("Starting embedded elasticsearch client ..."); + LOG.info("Starting embedded elasticsearch [" + clientType.toString() + "] client ..."); switch (clientType) { case TRANSPORT: // We don't want to sniff to avoid Too many files open // So it is going to take only the node in configuration Settings settings = ImmutableSettings.settingsBuilder() .put("client.transport.sniff", "false").build(); - this.client = new TransportClient(settings); + + TransportClient transport = new TransportClient(settings); + for(String addr : transportAddr) { + String[] sockAddr = addr.split(":"); + transport.addTransportAddress( + new InetSocketTransportAddress( + new InetSocketAddress(sockAddr[0], Integer.parseInt(sockAddr[1])))); + } + + this.client = transport; break; case NODE: default: diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticTest.java b/src/main/java/com/infochimps/elasticsearch/ElasticTest.java index 8fc9c50..f6ac1bf 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticTest.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticTest.java @@ -1,42 +1,24 @@ package com.infochimps.elasticsearch; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.Random; -import java.util.Map; -import java.util.HashMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.MapWritable; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.action.bulk.BulkRequestBuilder; -import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.ExceptionsHelper; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; // diff --git a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java index d57d2f8..316b30b 100644 --- a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java +++ b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java @@ -35,6 +35,7 @@ public class ElasticSearchStorage extends LoadFunc implements 
StoreFuncInterface protected ObjectMapper mapper = new ObjectMapper(); protected String esConfig; protected String esPlugins; + protected String transportAddresses; // For hadoop configuration private static final String ES_INDEX_NAME = "elasticsearch.index.name"; @@ -47,6 +48,7 @@ public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits"; private static final String ES_QUERY_STRING = "elasticsearch.query.string"; private static final String ES_CLIENT_TYPE = "elasticsearch.client.type"; + private static final String ES_TRANS_ADDR = "elasticsearch.client.transport.addresses"; private static final String COMMA = ","; private static final String LOCAL_SCHEME = "file://"; @@ -71,6 +73,11 @@ public ElasticSearchStorage(String esConfig, String esPlugins) { this.esPlugins = esPlugins; } + public ElasticSearchStorage(String esConfig, String esPlugins, String addresses) { + this(esConfig, esPlugins); + this.transportAddresses = addresses; + } + @Override public Tuple getNext() throws IOException { try { @@ -242,6 +249,8 @@ private void elasticSearchSetup(String location, Job job) { if (clientType==null) clientType = "node"; job.getConfiguration().set(ES_CLIENT_TYPE, clientType); + if(this.transportAddresses != null) job.getConfiguration().set(ES_TRANS_ADDR, transportAddresses); + String numTasks = query.get("tasks"); if (numTasks==null) numTasks = "100"; job.getConfiguration().set(ES_NUM_SPLITS, numTasks); From c2680c8aaf6ab92886acda158d3edfe195f8b97e Mon Sep 17 00:00:00 2001 From: Jerome Gagnon Date: Wed, 20 Mar 2013 13:47:00 -0400 Subject: [PATCH 10/12] Added logging to display connected address --- .../elasticsearch/ElasticSearchOutputFormat.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index cc48bac..7e8179b 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -264,7 +264,7 @@ private void initialize_index(String indexName) { // Starts an embedded elasticsearch client (ie. 
data = false)
 //
 private void start_embedded_client() {
-        LOG.info("Starting embedded elasticsearch [" + clientType.toString() + "] client ...");
+        LOG.info("Starting embedded elasticsearch [" + clientType.name() + "] client ...");
         switch (clientType) {
         case TRANSPORT:
             // We don't want to sniff to avoid Too many files open
@@ -275,9 +275,12 @@ private void start_embedded_client() {
             TransportClient transport = new TransportClient(settings);
             for(String addr : transportAddr) {
                 String[] sockAddr = addr.split(":");
-                transport.addTransportAddress(
+                LOG.info("Connecting to : " + addr);
+                if (sockAddr.length == 2) {
+                    transport.addTransportAddress(
                         new InetSocketTransportAddress(
-                                new InetSocketAddress(sockAddr[0], Integer.parseInt(sockAddr[1]))));
+                                new InetSocketAddress(sockAddr[0], Integer.parseInt(sockAddr[1]))));
+                }
             }
 
             this.client = transport;

From bea0ee81e4d3e21ed89adebbcdfaa6496a3556bd Mon Sep 17 00:00:00 2001
From: Jerome Gagnon
Date: Wed, 20 Mar 2013 13:54:01 -0400
Subject: [PATCH 11/12] Update documentation on how to use TransportClient

---
 README.md | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index ab064bd..5ebabe4 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,15 @@ ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_recor
 STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
 ```
 
+### Storing json data (using TransportClient) : ###
+
+```pig
+ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_record:chararray);
+STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000&client=transport'
+    USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('path/to/config', 'path/to/plugins', '10.0.0.1:9300,10.0.0.2:9300,10.0.0.3:9300');
+```
+
+
 #### Reading data:
 
 Easy too.
@@ -50,10 +59,11 @@ DUMP alien_sightings;
 
 #### ElasticSearchStorage Constructor
 
-The constructor to the UDF can take two arguments (in the following order):
+The constructor to the UDF can take two or three arguments (depending on the client type), in the following order:
 
 * ```esConfig``` - The full path to where elasticsearch.yml lives on the machine launching the hadoop job
 * ```esPlugins``` - The full path to where the elasticsearch plugins directory lives on the machine launching the hadoop job
+* ```addresses``` - A comma-separated list of entry-point nodes that the hadoop workers connect to when indexing data
 
 #### Query Parameters

From 74db476700f552dcf1c2fa9eada0f5e2846dece7 Mon Sep 17 00:00:00 2001
From: Jerome Gagnon
Date: Wed, 20 Mar 2013 14:55:52 -0300
Subject: [PATCH 12/12] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 5ebabe4..48ed9d3 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_recor
 STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
 ```
 
-### Storing json data (using TransportClient) : ###
+##### Storing json data (using TransportClient) :
 
 ```pig
 ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_record:chararray);