diff --git a/.gitignore b/.gitignore index 83feaa7..d3221d0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,16 +1,11 @@ -\#* -.\#* -*~ -.DS_Store -Icon? -REVISION -TAGS* -nohup.out -.bzr -.hg -.svn -a.out + + + + + + +*.log *.o *.pyc *.so @@ -18,30 +13,39 @@ a.out *.sw? *.tmproj *_flymake.* +*private* +*~ +.DS_Store +.\#* +.bzr +.hg .project .pydevproject .settings +.svn .tasks-cache .yardoc - -*private* -/log/* -/pkg/* -/tmp/* -/coverage - -/db/*.sqlite3 +/Gemfile.lock +/config/apeyeye.yaml /config/database.yml /config/private.yml +/config/routes.rb /config/settings.yml /config/sphinx.yml +/coverage +/db/*.sqlite3 +/log/* +/pkg/* /public/stylesheets/compiled/* - -/webrat.log +/target +/tmp/* /vendor/webrat/vendor - +/webrat.log +Gemfile.lock +Icon? +REVISION +TAGS* +\#* +a.out doc - -/config/apeyeye.yaml -/config/routes.rb -/target +nohup.out diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..f0fb813 --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format=progress +--color diff --git a/.yardopts b/.yardopts new file mode 100644 index 0000000..b6718f9 --- /dev/null +++ b/.yardopts @@ -0,0 +1,6 @@ +--readme README.md +--markup markdown +- +CHANGELOG.md +LICENSE.md +README.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f818640 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +Wonderdog has been updated to work with Elasticsearch v17 or greater. Previous code was moved into the branch v17_pre and will be deprecated. All changes going forward will be based upon compatibility with Elasticsearch v17 or greater and will live in the master branch. + +* estool has been redone completely for enhanced usability. Run ```bin/estool --help``` for examples. + +* Updated and cleaned up test directory. diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..96dc046 --- /dev/null +++ b/Gemfile @@ -0,0 +1,10 @@ +source :rubygems + +gemspec + +group :development do + gem 'rake', '~> 0.9' + gem 'rspec', '~> 2' + gem 'yard' + gem 'redcarpet' +end diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..1b22bef --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6cd065e --- /dev/null +++ b/README.md @@ -0,0 +1,289 @@ +# Wonderdog + +Wonderdog is a Hadoop interface to Elastic Search. While it is specifically intended for use with Apache Pig, it does include all the necessary Hadoop input and output formats for Elastic Search. That is, it's possible to skip Pig entirely and write custom Hadoop jobs if you prefer. + +## Requirements + +## Usage + +### Using ElasticSearchStorage for Apache Pig + +The most up-to-date (and simplest) way to store data into elasticsearch with hadoop is to use the Pig Store Function. You can write both delimited and json data to elasticsearch as well as read data from elasticsearch. + +#### Storing tabular data: + +This allows you to store tabular data (eg. tsv, csv) into elasticsearch. 
+
+```pig
+%default ES_JAR_DIR '/usr/local/share/elasticsearch/lib'
+%default INDEX 'ufo_sightings'
+%default OBJ 'sighting'
+
+register target/wonderdog*.jar;
+register $ES_JAR_DIR/*.jar;
+
+ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
+STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=false&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
+```
+
+Here the fields that you set in Pig (e.g. 'sighted_at') are used as the field names when creating json records for elasticsearch.
+
+#### Storing json data:
+
+You can store json data just as easily.
+
+```pig
+ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_record:chararray);
+STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
+```
+
+#### Reading data:
+
+Reading is easy too.
+
+```pig
+-- dump some of the ufo sightings index based on free text query
+alien_sightings = LOAD 'es://ufo_sightings/ufo_sightings?q=alien' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage() AS (doc_id:chararray, contents:chararray);
+DUMP alien_sightings;
+```
+
+#### ElasticSearchStorage Constructor
+
+The constructor to the UDF can take two arguments (in the following order):
+
+* ```esConfig``` - The full path to where elasticsearch.yml lives on the machine launching the hadoop job
+* ```esPlugins``` - The full path to where the elasticsearch plugins directory lives on the machine launching the hadoop job
+
+#### Query Parameters
+
+There are a few query parameters available:
+
+* ```json``` - (STORE only) When 'true', indicates to the StoreFunc that pre-rendered json records are being indexed. Default is false.
+* ```size``` - When storing, this is used as the bulk request size (the number of records to stack up before indexing to elasticsearch). When loading, this is the number of records to fetch per request. Default 1000.
+* ```q``` - (LOAD only) A free text query determining which records to load. If empty, matches all documents in the index.
+* ```id``` - (STORE only) The name of the field to use as a document id. If blank (or -1) the documents are assumed to have no id and are assigned one by elasticsearch.
+* ```tasks``` - (LOAD only) The number of map tasks to launch. Default 100.
+
+Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop's distributed cache mechanism.
+
+### Native Hadoop TSV Loader
+
+**Note**: the tsv loader is deprecated. Instead, use the ElasticSearchOutputFormat coupled with either of the Apache Pig storefuncs (ElasticSearchIndex or ElasticSearchJsonIndex).
+
+Once you've got a working setup you should be ready to launch your bulkload process. The best way to explain is with an example. Say you've got a tsv file of user records (name,login,email,description) and you want to index all the fields. Assume you're going to write to an index called ```users``` with objects of type ```user``` (elasticsearch will create this object automatically the first time you upload one). The workflow is as follows:
+
+* Create the ```users``` index:
+
+```
+bin/estool create --index users
+```
+
+* Upload the data
+
+```
+# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
+bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users
+```
+
+Notice the output path. When the bulk indexing job runs it is possible for index requests to fail for various reasons (too much load, etc). In this case the documents that failed are simply written to HDFS so they can be retried in a later job.
+
+* Refresh Index
+
+After the bulk load is finished you'll want to refresh the index so your documents will actually be searchable:
+
+```
+bin/estool refresh --index users
+```
+
+* Snapshot Index
+
+You'll definitely want to do this after the bulk load finishes so you don't lose any data in case of cluster failure:
+
+```
+bin/estool snapshot --index users
+```
+
+* Bump the replicas for the index up to at least one.
+
+```
+bin/estool set_replication --index users --replicas=1
+```
+
+This will take a while to finish and the cluster health will show yellow until it does.
+
+* Optimize the index
+
+```
+bin/estool optimize --index users -s 3
+```
+
+This will also take a while to finish.
+
+#### TSV loader command-line options
+
+* ```index_name``` - Index to write data to. It does not have to exist ahead of time
+* ```object_type``` - Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch.
+* ```field_names``` - A comma-separated list of field names describing the tsv record input
+* ```id_field``` - Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field
+* ```bulk_size``` - Number of records per bulk request sent to the elasticsearch cluster
+* ```es_home``` - Path to elasticsearch installation, read from the ES_HOME environment variable if it's set
+* ```es_config``` - Path to elasticsearch config file (```elasticsearch.yml```)
+* ```rm``` - Remove existing output? (true or leave blank)
+* ```hadoop_home``` - Path to hadoop installation, read from the HADOOP_HOME environment variable if it's set
+* ```min_split_size``` - Min split size for maps
+
+## Admin
+
+There are a number of convenience commands in ```bin/estool```. Most of the common REST API operations have been mapped. Enumerating a few:
+
+* Print status of all indices as a json hash to the terminal
+
+```
+# See everything (tmi)
+bin/estool -c status
+```
+
+* Check cluster health (red, green, yellow, relocated shards, etc.)
+
+```
+bin/estool -c health
+```
+
+* Set replicas for an index
+
+```
+bin/estool set_replication -c --index --replicas
+```
+
+* Optimize an index
+
+```
+bin/estool optimize -c --index
+```
+
+* Snapshot an index
+
+```
+bin/estool snapshot -c --index
+```
+
+* Delete an index
+
+```
+bin/estool delete -c --index
+```
+
+
+## Bulk Loading Tips for the Risk-seeking Dangermouse
+
+The file examples/bulkload_wp_pageviews.pig shows an example of bulk loading elasticsearch, including preparing the index.
+
+### Elasticsearch Setup
+
+Some tips for an industrial-strength cluster, assuming exclusive use of machines and no read load during the job:
+
+* Use multiple machines with a fair bit of RAM (7+GB). Heap doesn't help too much for loading though, so you don't have to go nuts: we do fine with Amazon m1.large's.
+* Allocate a sizeable heap, setting min and max equal, and
+    - turn `bootstrap.mlockall` on, and run `ulimit -l unlimited`.
+    - For example, for a 3GB heap: `-Xmx3000m -Xms3000m -Delasticsearch.bootstrap.mlockall=true`
+    - Never use a heap above 12GB or so; it's dangerous (stop-the-world GC compaction timeouts).
+    - You've succeeded if the full heap size is resident on startup: that is, in htop both the VMEM and RSS are 3000 MB or so.
+* Temporarily increase the `index_buffer_size` to, say, 40%.
+
+### Further reading
+
+* [Elasticsearch JVM Settings, explained](http://jprante.github.com/2012/11/28/Elasticsearch-Java-Virtual-Machine-settings-explained.html)
+
+### Example of creating an index and mapping
+
+Index:
+
+    curl -XPUT 'http://localhost:9200/pageviews' -d '{"settings": {
+      "index": { "number_of_shards": 12, "store.compress": { "stored": true, "tv": true } } }}'
+
+    $ curl -XPUT 'http://localhost:9200/ufo_sightings/_settings?pretty=true' -d '{"settings": {
+      "index": { "number_of_shards": 12, "store.compress": { "stored": true, "tv": true } } }}'
+
+Mapping (elasticsearch "type"):
+
+    # Wikipedia Pageviews
+    curl -XPUT 'http://localhost:9200/pageviews/pagehour/_mapping' -d '{
+      "pagehour": { "_source": { "enabled" : true }, "properties" : {
+        "page_id" : { "type": "long", "store": "yes" },
+        "namespace": { "type": "integer", "store": "yes" },
+        "title": { "type": "string", "store": "yes" },
+        "num_visitors": { "type": "long", "store": "yes" },
+        "date": { "type": "integer", "store": "yes" },
+        "time": { "type": "long", "store": "yes" },
+        "ts": { "type": "date", "store": "yes" },
+        "day_of_week": { "type": "integer", "store": "yes" } } }}'
+
+    $ curl -XPUT 'http://localhost:9200/ufo_sightings/sighting/_mapping' -d '{ "sighting": {
+      "_source": { "enabled" : true },
+      "properties" : {
+        "sighted_at": { "type": "date", "store": "yes" },
+        "reported_at": { "type": "date", "store": "yes" },
+        "shape": { "type": "string", "store": "yes" },
+        "duration": { "type": "string", "store": "yes" },
+        "description": { "type": "string", "store": "yes" },
+        "coordinates": { "type": "geo_point", "store": "yes" },
+        "location_str": { "type": "string", "store": "no" },
+        "location": { "type": "object", "dynamic": false, "properties": {
+          "place_id": { "type": "string", "store": "yes" },
+          "place_type": { "type": "string", "store": "yes" },
+          "city": { "type": "string", "store": "yes" },
+          "county": { "type": "string", "store": "yes" },
+          "state": { "type": "string", "store": "yes" },
+          "country": { "type": "string", "store": "yes" } } }
+      } } }'
+
+
+### Temporary Bulk-load settings for an index
+
+To prepare a database for bulk loading, the following settings may help. They are
+*EXTREMELY* aggressive, and include knocking the replication factor back to 1 (zero replicas). One
+false step and you've destroyed Tokyo.
+
+Actually, you know what? Never mind. Don't apply these, they're too crazy.
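+
+If all you want is a modest, low-risk speedup, the refresh interval is arguably the one knob worth touching on its own. A minimal sketch (reusing the example `pageviews` index and localhost endpoint from above; adjust both for your cluster):
+
+    # pause refreshes for the duration of the bulk load...
+    curl -XPUT 'http://localhost:9200/pageviews/_settings' -d '{"index": { "refresh_interval": -1 }}'
+    # ...and turn them back on once the load finishes
+    curl -XPUT 'http://localhost:9200/pageviews/_settings' -d '{"index": { "refresh_interval": "60s" }}'
+
+For the record, the full set of too-aggressive settings being warned about above looks like this:
+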
+ + curl -XPUT 'http://localhost:9200/pageviews/_settings?pretty=true' -d '{"index": { + "number_of_replicas": 0, "refresh_interval": -1, "gateway.snapshot_interval": -1, + "translog": { "flush_threshold_ops": 50000, "flush_threshold_size": "200mb", "flush_threshold_period": "300s" }, + "merge.policy": { "max_merge_at_once": 30, "segments_per_tier": 30, "floor_segment": "10mb" }, + "store.compress": { "stored": true, "tv": true } } }' + +To restore your settings, in case you didn't destroy Tokyo: + + curl -XPUT 'http://localhost:9200/pageviews/_settings?pretty=true' -d ' {"index": { + "number_of_replicas": 2, "refresh_interval": "60s", "gateway.snapshot_interval": "3600s", + "translog": { "flush_threshold_ops": 5000, "flush_threshold_size": "200mb", "flush_threshold_period": "300s" }, + "merge.policy": { "max_merge_at_once": 10, "segments_per_tier": 10, "floor_segment": "10mb" }, + "store.compress": { "stored": true, "tv": true } } }' + +If you did destroy your database, please send your resume to jobs@infochimps.com as you begin your +job hunt. It's the reformed sinner that makes the best missionary. + + +### Post-bulkrun maintenance + + es_index=pageviews ; ( for foo in _flush _refresh '_optimize?max_num_segments=6&refresh=true&flush=true&wait_for_merge=true' '_gateway/snapshot' ; do echo "======= $foo" ; time curl -XPOST "http://localhost:9200/$es_index/$foo" ; done ) & + +### Full dump of cluster health + + es_index=pageviews ; es_node="projectes-elasticsearch-4" + curl -XGET "http://localhost:9200/$es_index/_status?pretty=true" + curl -XGET "http://localhost:9200/_cluster/state?pretty=true" + curl -XGET "http://localhost:9200/$es_index/_stats?pretty=true&merge=true&refresh=true&flush=true&warmer=true" + curl -XGET "http://localhost:9200/_cluster/nodes/$es_node/stats?pretty=true&all=true" + curl -XGET "http://localhost:9200/_cluster/nodes/$es_node?pretty=true&all=true" + curl -XGET "http://localhost:9200/_cluster/health?pretty=true" + curl -XGET "http://localhost:9200/$es_index/_search?pretty=true&limit=3" + curl -XGET "http://localhost:9200/$es_index/_segments?pretty=true" | head -n 200 + +### Decommission nodes + +Run this, excluding the decommissionable nodes from the list: + + curl -XPUT http://localhost:9200/pageviews/_settings -d '{ + "index.routing.allocation.include.ironfan_name" : + "projectes-elasticsearch-0,projectes-elasticsearch-1,projectes-elasticsearch-2" }' diff --git a/README.textile b/README.textile deleted file mode 100644 index 2ff2df2..0000000 --- a/README.textile +++ /dev/null @@ -1,225 +0,0 @@ -h1. Wonderdog - -Wonderdog is a Hadoop interface to Elastic Search. While it is specifically intended for use with Apache Pig, it does include all the necessary Hadoop input and output formats for Elastic Search. That is, it's possible to skip Pig entirely and write custom Hadoop jobs if you prefer. - -h2. Requirements - -h2. Usage - -h3. Using ElasticSearchStorage for Apache Pig - -The most up-to-date (and simplest) way to store data into elasticsearch with hadoop is to use the Pig Store Function. You can write both delimited and json data to elasticsearch as well as read data from elasticsearch. - -h4. Storing tabular data: - -This allows you to store tabular data (eg. tsv, csv) into elasticsearch. - -

-register 'target/wonderdog.jar';
-register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar';
-register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar';
-register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar';
-register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar';
-register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar';
-
-%default INDEX 'ufo_sightings'
-%default OBJ   'ufo_sighting'        
-
-ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
-STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=false&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
-
- -Here the fields that you set in Pig (eg. 'sighted_at') are used as the field names when creating json records for elasticsearch. - -h4. Storing json data: - -You can store json data just as easily. - -

-ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_record:chararray);
-STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
-
- -h4. Reading data: - -Easy too. - -

--- dump some of the ufo sightings index based on free text query
-alien_sightings = LOAD 'es://ufo_sightings/ufo_sightings?q=alien' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage() AS (doc_id:chararray, contents:chararray);
-DUMP alien_sightings;
-
- -h4. ElasticSearchStorage Constructor - -The constructor to the UDF can take two arguments (in the following order): - -* @esConfig@ - The full path to where elasticsearch.yml lives on the machine launching the hadoop job -* @esPlugins@ - The full path to where the elasticsearch plugins directory lives on the machine launching the hadoop job - -h4. Query Parameters - -There are a few query paramaters available: - -* @json@ - (STORE only) When 'true' indicates to the StoreFunc that pre-rendered json records are being indexed. Default is false. -* @size@ - When storing, this is used as the bulk request size (the number of records to stack up before indexing to elasticsearch). When loading, this is the number of records to fetch per request. Default 1000. -* @q@ - (LOAD only) A free text query determining which records to load. If empty, matches all documents in the index. -* @id@ - (STORE only) The name of the field to use as a document id. If blank (or -1) the documents are assumed to have no id and are assigned one by elasticsearch. -* @tasks@ - (LOAD only) The number of map tasks to launch. Default 100. - -Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop's distributed cache mechanism. - -h3. Native Hadoop TSV Loader - -**Note**: the tsv loader is deprecated. Instead, use the ElasticSearchOutputFormat coupled with either Apache Pig storefunc (ElasticSearchIndex or ElasticSearchJsonIndex). - -Once you've got a working set up you should be ready to launch your bulkload process. The best way to explain is with an example. Say you've got a tsv file of user records (name,login,email,description) and you want to index all the fields. Assuming you're going to write to an index called @users@ with objects of type @user@ (elasticsearch will create this object automatically the first time you upload one). The workflow is as follows: - -* Create the @users@ index: - -

-bin/estool --host= --index_name=users create_index
-
- -* Upload the data - -

-# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
-bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users
-
- -Notice the output path. When the bulk indexing job runs it is possible for index requests to fail for various reasons (too much load, etc). In this case the documents that failed are simply written to the hdfs so they can be retried in a later job. - -* Refresh Index - -After the bulk load is finished you'll want to refresh the index so your documents will actually be searchable: - -

-bin/estool --host= --index_name=users refresh_index
-
- -* Snapshot Index - -You'll definitely want to do this after the bulk load finishes so you don't lose any data in case of cluster failure: - -

-bin/estool --host= --index_name=users snapshot_index
-
- - -* Bump the replicas for the index up to at least one. - -

-bin/estool --host= --index_name=users --replicas=1 set_replicas
-
- -This will take a while to finish and the cluster health will show yellow until it does. - -* Optimize the index - -

-bin/estool --host= --index_name=users optimize_index
-
- -This will also take a while to finish. - -h4. TSV loader command-line options - -* @index_name@ - Index to write data to. It does not have to exist ahead of time -* @object_type@ - Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch. -* @field_names@ - A comma separated list of field names describing the tsv record input -* @id_field@ - Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field -* @bulk_size@ - Number of records per bulk request sent to elasticsearch cluster -* @es_home@ - Path to elasticsearch installation, read from the ES_HOME environment variable if it's set -* @es_config@ - Path to elasticsearch config file (@elasticsearch.yml@) -* @rm@ - Remove existing output? (true or leave blank) -* @hadoop_home@ - Path to hadoop installation, read from the HADOOP_HOME environment variable if it's set -* @min_split_size@ - Min split size for maps - -h2. Admin - -There are a number of convenience commands in @bin/estool@. Enumerating a few: - -* Print status of all indices as a json hash to the terminal - -

-# See everything (tmi)
-bin/estool --host= status
-
-# Just see index level stuff on docs and replicas:
-bin/estool --host= status | grep -C10 number_of_replicas
-
- -* Check cluster health (red,green,yellow,relocated shards, etc) - -

-bin/estool --host= health
-
- -* Set replicas for an index - -

-bin/estool --host= --index_name= --replicas= set_replicas
-
- -* Optimize an index - -

-bin/estool --host= --index_name= optimize_index
-
- -* Snapshot an index - -

-bin/estool --host= --index_name= snapshot_index
-
- -* Delete an index - -You'd better be sure. - -

-bin/estool --host= --index_name= delete_index
-
- - -And so on. Most of the common rest api operations have be mapped into estool. - - - - - - - ------------------------------------ THIS NEEDS CLEANED UP AND TURNED INTO A COHERENT SECTION, RIGHT NOW IT IS INCOMPREHENSIBLE ---------------------------- -h3. How to choose shards, replicas and cluster size: Rules of Thumb. - -sh = shards -rf = replication factor. replicas = 0 implies rf = 1, or 1 replica of each shard. - -pm = running data_esnode processes per machine -N = number of machines - -n_cores = number of cpu cores per machine -n_disks = number of disks per machine - - - -* You must have at least as many data_esnodes as - Mandatory: (sh * rf) < (pm * N) - - - Shards: shard size < 10GB - - -More shards = more parallel writes - - - -curl -XPUT "`cat /etc/motd | grep IP | cuttab 2`:9200/tweet-2010q1/_settings" -d '{ "index":{ "number_of_replicas":1 } }' ------------------------------------------------------------------------------------------------------------------------------------------------------------ - diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..705c2ba --- /dev/null +++ b/Rakefile @@ -0,0 +1,10 @@ +require 'bundler' +Bundler::GemHelper.install_tasks + +require 'rspec/core/rake_task' +RSpec::Core::RakeTask.new(:specs) + +require 'yard' +YARD::Rake::YardocTask.new + +task :default => [:specs] diff --git a/examples/bulkload_wp_pageviews.pig b/examples/bulkload_wp_pageviews.pig new file mode 100644 index 0000000..adcc5b2 --- /dev/null +++ b/examples/bulkload_wp_pageviews.pig @@ -0,0 +1,70 @@ +SET mapred.map.tasks.speculative.execution false; + +-- path to wikipedia pageviews data +%default PAGEVIEWS 's3n://bigdata.chimpy.us/data/results/wikipedia/full/pageviews/2008/03' +-- the target elasticsearch index and mapping ("type"). Will be created, though you +-- should do it yourself first instead as shown below. +%default INDEX 'pageviews' +%default OBJ 'pagehour' +-- path to elasticsearch jars +%default ES_JAR_DIR '/usr/local/share/elasticsearch/lib' +-- Batch size for loading +%default BATCHSIZE '10000' + +-- Example of bulk loading. This will easily load more than a billion documents +-- into a large cluster. We recommend using Ironfan to set your junk up. +-- +-- Preparation: +-- +-- Create the index: +-- +-- curl -XPUT 'http://projectes-elasticsearch-0.test.chimpy.us:9200/pageviews' -d '{"settings": { "index": { +-- "number_of_shards": 12, "number_of_replicas": 0, "store.compress": { "stored": true, "tv": true } } }}' +-- +-- Define the elasticsearch mapping (type): +-- +-- curl -XPUT 'http://projectes-elasticsearch-0.test.chimpy.us:9200/pageviews/pagehour/_mapping' -d '{ +-- "pagehour": { +-- "_source": { "enabled" : true }, +-- "properties" : { +-- "page_id" : { "type": "long", "store": "yes" }, +-- "namespace": { "type": "integer", "store": "yes" }, +-- "title": { "type": "string", "store": "yes" }, +-- "num_visitors": { "type": "long", "store": "yes" }, +-- "date": { "type": "integer", "store": "yes" }, +-- "time": { "type": "long", "store": "yes" }, +-- "ts": { "type": "date", "store": "yes" }, +-- "day_of_week": { "type": "integer", "store": "yes" } } }}' +-- +-- For best results, see the 'Tips for Bulk Loading' in the README. 
+-- + +-- Always disable speculative execution when loading into a database +set mapred.map.tasks.speculative.execution false +-- Don't re-use JVM: logging gets angry +set mapred.job.reuse.jvm.num.tasks 1 +-- Use large file sizes; setup/teardown time for leaving the cluster is worse +-- than non-local map tasks +set mapred.min.split.size 3000MB +set pig.maxCombinedSplitSize 2000MB +set pig.splitCombination true + +register ./target/wonderdog*.jar; +register $ES_JAR_DIR/*.jar; + +pageviews = LOAD '$PAGEVIEWS' AS ( + page_id:long, namespace:int, title:chararray, + num_visitors:long, date:int, time:long, + epoch_time:long, day_of_week:int); +pageviews_fixed = FOREACH pageviews GENERATE + page_id, namespace, title, + num_visitors, date, time, + epoch_time * 1000L AS ts, day_of_week; + +STORE pageviews_fixed INTO 'es://$INDEX/$OBJ?json=false&size=$BATCHSIZE' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage(); + +-- -- To instead dump the JSON data to disk (needs Pig 0.10+) +-- set dfs.replication 2 +-- %default OUTDUMP '$PAGEVIEWS.json' +-- rmf $OUTDUMP +-- STORE pageviews_fixed INTO '$OUTDUMP' USING JsonStorage(); diff --git a/examples/no_wonderdog.rb b/examples/no_wonderdog.rb new file mode 100644 index 0000000..060a8a0 --- /dev/null +++ b/examples/no_wonderdog.rb @@ -0,0 +1,2 @@ +Wukong.dataflow(:mapper) { identity } +Wukong.dataflow(:reducer) { identity } diff --git a/examples/wonderdog.rb b/examples/wonderdog.rb new file mode 100644 index 0000000..7503345 --- /dev/null +++ b/examples/wonderdog.rb @@ -0,0 +1,4 @@ +require 'wonderdog' +Wukong.dataflow(:mapper) { identity } +Wukong.dataflow(:reducer) { identity } + diff --git a/lib/wonderdog.rb b/lib/wonderdog.rb new file mode 100644 index 0000000..c50ef4c --- /dev/null +++ b/lib/wonderdog.rb @@ -0,0 +1,42 @@ +require 'wukong-hadoop' + +module Wukong + + # Wonderdog provides Java code that couples Hadoop streaming to + # Wukong. This module adds some overrides which enables the + # wu-hadoop program to leverage this code. + module Elasticsearch + include Plugin + + # Configure the given `settings` to be able to work with + # Elasticsearch. 
+ # + # @param [Configliere::Param] settings + # @return [Configliere::Param] the newly configured settings + def self.configure settings, program + return unless program == 'wu-hadoop' + settings.define(:es_tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :wukong_hadoop => true) + settings.define(:es_lib_dir, :description => "Directory containing Elasticsearch, Wonderdog, and other support jars", :default => "/usr/lib/hadoop/lib", :wukong_hadoop => true) + settings.define(:es_config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :wukong_hadoop => true) + settings.define(:es_input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :wukong_hadoop => true) + settings.define(:es_request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :wukong_hadoop => true) + settings.define(:es_scroll_timeout, :description => "Amount of time to wait on a scroll", :wukong_hadoop => true) + settings.define(:es_index_field, :description => "Field to use from each record to override the default index", :wukong_hadoop => true) + settings.define(:es_mapping_field, :description => "Field to use from each record to override the default mapping", :wukong_hadoop => true) + settings.define(:es_id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :wukong_hadoop => true) + settings.define(:es_bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :wukong_hadoop => true) + settings.define(:es_query, :description => "Query to use when defining input splits for ElasticSearch input", :wukong_hadoop => true) + end + + # Boot Wonderdog with the given `settings` in the given `dir`. + # + # @param [Configliere::Param] settings + # @param [String] root + def self.boot settings, root + end + + end +end + +require 'wonderdog/hadoop_invocation_override' +require 'wonderdog/timestamp' diff --git a/lib/wonderdog/hadoop_invocation_override.rb b/lib/wonderdog/hadoop_invocation_override.rb new file mode 100644 index 0000000..6fcccc1 --- /dev/null +++ b/lib/wonderdog/hadoop_invocation_override.rb @@ -0,0 +1,168 @@ +require_relative("index_and_mapping") + +module Wukong + module Elasticsearch + + # This module overrides some methods defined in + # Wukong::Hadoop::HadoopInvocation. The overrides will only come + # into play if the job's input or output paths are URIs beginning + # with 'es://', implying reading or writing to/from Elasticsearch + # indices. + module HadoopInvocationOverride + + # The input format when reading from Elasticsearch as defined in + # the Java code accompanying Wonderdog. + # + # @param [String] + ES_STREAMING_INPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat" + + # The output format when writing to Elasticsearch as defined in + # the Java code accompanying Wonderdog. + # + # @param [String] + ES_STREAMING_OUTPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat" + + # Does this job read from Elasticsearch? + # + # @return [true, false] + def reads_from_elasticsearch? + IndexAndMapping.matches?(settings[:input]) + end + + # The input format to use for this job. 
+ # + # Will override the default value to ES_STREAMING_INPUT_FORMAT if + # reading from Elasticsearch. + # + # @return [String] + def input_format + reads_from_elasticsearch? ? ES_STREAMING_INPUT_FORMAT : super() + end + + # The input index to use. + # + # @return [IndexAndMapping] + def input_index + @input_index ||= IndexAndMapping.new(settings[:input]) + end + + # The input paths to use for this job. + # + # Will override the default value with a temporary HDFS path + # when reading from Elasticsearch. + # + # @return [String] + def input_paths + reads_from_elasticsearch? ? elasticsearch_hdfs_tmp_dir(input_index) : super() + end + + # Does this write to Elasticsearch? + # + # @return [true, false] + def writes_to_elasticsearch? + IndexAndMapping.matches?(settings[:output]) + end + + # The output format to use for this job. + # + # Will override the default value to ES_STREAMING_OUTPUT_FORMAT if + # writing to Elasticsearch. + # + # @return [String] + def output_format + writes_to_elasticsearch? ? ES_STREAMING_OUTPUT_FORMAT : super() + end + + # The output index to use. + # + # @return [IndexAndMapping] + def output_index + @output_index ||= IndexAndMapping.new(settings[:output]) + end + + # The output path to use for this job. + # + # Will override the default value with a temporary HDFS path + # when writing to Elasticsearch. + # + # @return [String] + def output_path + writes_to_elasticsearch? ? elasticsearch_hdfs_tmp_dir(output_index) : super() + end + + # Adds Java options required to interact with the input/output + # formats defined by the Java code accompanying Wonderdog. + # + # Will not change the default Hadoop jobconf options unless it + # has to. + # + # @return [Array] + def hadoop_jobconf_options + if reads_from_elasticsearch? || writes_to_elasticsearch? + settings[:map_speculative] = 'false' if settings[:map_speculative].nil? + settings[:reduce_speculative] = 'false' if settings[:reduce_speculative].nil? + end + + super() + [].tap do |o| + if (reads_from_elasticsearch? || writes_to_elasticsearch?) + o << java_opt('es.config', settings[:es_config]) + end + + if reads_from_elasticsearch? + o << java_opt('elasticsearch.input.index', input_index.index) + o << java_opt('elasticsearch.input.mapping', input_index.mapping) + o << java_opt('elasticsearch.input.splits', settings[:es_input_splits]) + o << java_opt('elasticsearch.input.query', settings[:es_query]) + o << java_opt('elasticsearch.input.request_size', settings[:es_request_size]) + o << java_opt('elasticsearch.input.scroll_timeout', settings[:es_scroll_timeout]) + end + + if writes_to_elasticsearch? + o << java_opt('elasticsearch.output.index', output_index.index) + o << java_opt('elasticsearch.output.mapping', output_index.mapping) + o << java_opt('elasticsearch.output.index.field', settings[:es_index_field]) + o << java_opt('elasticsearch.output.mapping.field', settings[:es_mapping_field]) + o << java_opt('elasticsearch.output.id.field', settings[:es_id_field]) + o << java_opt('elasticsearch.output.bulk_size', settings[:es_bulk_size]) + end + end.flatten.compact + end + + # :nodoc: + # + # Munge the settings object to add necessary jars if + # reading/writing to/from Elasticsearch, then call super(). + def hadoop_files + if reads_from_elasticsearch? || writes_to_elasticsearch? + settings[:jars] = elasticsearch_jars if settings[:jars].empty? 
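+          # (elasticsearch_jars, defined below, globs the Elasticsearch, Lucene, JNA, and Wonderdog jars out of settings[:es_lib_dir])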
+ end + super() + end + + # All Elasticsearch, Wonderdog, and other support jars needed to + # connect Hadoop streaming with the + # ElasticSearchStreamingInputFormat and + # ElasticSearchStreamingOutputFormat provided by the Wonderdog + # Java code. + # + # @return [Array] + def elasticsearch_jars + Dir[File.join(settings[:es_lib_dir] || '/usr/lib/hadoop/lib', '{elasticsearch,lucene,jna,wonderdog}*.jar')].compact.uniq + end + + # Returns a temporary path on the HDFS in which to store log + # data while the Hadoop job runs. + # + # @param [IndexAndMapping] io + # @return [String] + def elasticsearch_hdfs_tmp_dir io + cleaner = %r{[^\w/\.\-\+]+} + io_part = [io.index, io.mapping].compact.map { |s| s.gsub(cleaner, '') }.join('/') + File.join(settings[:es_tmp_dir] || '/', io_part || '', Time.now.strftime("%Y-%m-%d-%H-%M-%S")) + end + + end + end + + Hadoop::HadoopRunner.class_eval { include Elasticsearch::HadoopInvocationOverride } +end diff --git a/lib/wonderdog/index_and_mapping.rb b/lib/wonderdog/index_and_mapping.rb new file mode 100644 index 0000000..e05a368 --- /dev/null +++ b/lib/wonderdog/index_and_mapping.rb @@ -0,0 +1,67 @@ +module Wukong + module Elasticsearch + + # A convenient class for parsing Elasticsearch index and mapping URIs + # like + # + # - es://my_index + # - es://my_index/my_mapping + # - es://first_index,second_index,third_index + # - es://my_index/first_mapping,second_mapping,third_mapping + class IndexAndMapping + + # A regular expression that matches URIs describing an + # Elasticsearch index and/or mapping to read/write from/to. + # + # @param [Regexp] + ES_SCHEME_REGEXP = %r{^es://} + + # The Elasticsearch index. + # + # @param [String] + attr_reader :index + + # The Elasticsearch mapping. + # + # @param [String] + attr_reader :mapping + + # Does the given `string` look like a possible Elasticsearch + # /index/mapping specification? + # + # @param [String] string + # @return [true, false] + def self.matches? string + return false unless string + string =~ ES_SCHEME_REGEXP + end + + # Create a new index and mapping specification from the given + # +uri.. + # + # @param [String] uri + def initialize uri + self.uri = uri + end + + # Set the URI of this index and mapping specification, parsing it + # for an index and mapping. + # + # Will raise an error if the given URI is malformed. + # + # @param [String] uri + def uri= uri + raise Wukong::Error.new("'#{uri}' is not an ElasticSearch es://index/mapping specification") unless self.class.matches?(uri) + parts = uri.gsub(ES_SCHEME_REGEXP, '').gsub(/^\/+/,'').gsub(/\/+$/,'').split('/') + + raise Wukong::Error.new("'#{uri}' is not an ElasticSearch es://index/mapping specification") unless parts.size.between?(1,2) + + @index = parts[0] + @mapping = parts[1] + end + end + end +end + + + diff --git a/lib/wonderdog/timestamp.rb b/lib/wonderdog/timestamp.rb new file mode 100644 index 0000000..d24b899 --- /dev/null +++ b/lib/wonderdog/timestamp.rb @@ -0,0 +1,43 @@ +module Wukong + module Elasticsearch + + # A class that makes Ruby's Time class serialize the way + # Elasticsearch expects. + # + # Elasticsearch's date parsing engine [expects to + # receive](http://www.elasticsearch.org/guide/reference/mapping/date-format.html) + # a date formatted according to the Java library + # [Joda's](http://joda-time.sourceforge.net/) + # [ISODateTimeFormat.dateOptionalTimeParser](http://joda-time.sourceforge.net/api-release/org/joda/time/format/ISODateTimeFormat.html#dateOptionalTimeParser()) + # class. 
+ # + # This format looks like this: `2012-11-30T01:15:23`. + # + # @see http://www.elasticsearch.org/guide/reference/mapping/date-format.html The Elasticsearch guide's Date Format entry + # @see http://joda-time.sourceforge.net/api-release/org/joda/time/format/ISODateTimeFormat.html#dateOptionalTimeParser() The Joda class's API documentation + class Timestamp < Time + + # Parses the given `string` into a Timestamp instance. + # + # @param [String] string + # @return [Timestamp] + def self.receive string + return if string.nil? || string.empty? + begin + t = Time.parse(string) + rescue ArgumentError => e + return + end + new(t.year, t.month, t.day, t.hour, t.min, t.sec, t.utc_offset) + end + + # Formats the Timestamp according to ISO 8601 rules. + # + # @param [Hash] options + # @return [String] + def to_wire(options={}) + utc.iso8601 + end + end + end +end diff --git a/lib/wonderdog/version.rb b/lib/wonderdog/version.rb new file mode 100644 index 0000000..34a5b1a --- /dev/null +++ b/lib/wonderdog/version.rb @@ -0,0 +1,3 @@ +module Wonderdog + VERSION = '0.1.1' +end diff --git a/notes/cluster_notes.md b/notes/cluster_notes.md new file mode 100644 index 0000000..b3961f7 --- /dev/null +++ b/notes/cluster_notes.md @@ -0,0 +1,17 @@ +### How to choose shards, replicas and cluster size: Rules of Thumb. + +sh = shards +rf = replication factor. replicas = 0 implies rf = 1, or 1 replica of each shard. + +pm = running data_esnode processes per machine +N = number of machines + +n_cores = number of cpu cores per machine +n_disks = number of disks per machine + +* You must have at least as many data_esnodes as + Mandatory: (sh * rf) < (pm * N) + + Shards: shard size < 10GB + +More shards = more parallel writes \ No newline at end of file diff --git a/pom.xml b/pom.xml index 412ed42..63384a8 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ org.elasticsearch elasticsearch - 0.16.0 + 0.19.8 @@ -30,7 +30,7 @@ org.apache.pig pig - 0.8.0 + 0.11.0 diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..58e42e3 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,29 @@ +require 'wonderdog' +require 'wukong/spec_helpers' + +RSpec.configure do |config| + + config.before(:each) do + Wukong::Log.level = Log4r::OFF + @orig_reg = Wukong.registry.show + end + + config.after(:each) do + Wukong.registry.clear! + Wukong.registry.merge!(@orig_reg) + end + + include Wukong::SpecHelpers + + def root + @root ||= Pathname.new(File.expand_path('../..', __FILE__)) + end + + def hadoop_runner *args, &block + runner(Wukong::Hadoop::HadoopRunner, 'wu-hadoop', *args) do + stub!(:execute_command!) + instance_eval(&block) if block_given? 
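+      # execute_command! is stubbed so specs can inspect the generated command line without running Hadoop;
+      # any block passed by the caller is evaluated in the same context to layer on further stubs or settings.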
+ end + end + +end diff --git a/spec/support/.gitkeep b/spec/support/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/spec/wonderdog/hadoop_invocation_override_spec.rb b/spec/wonderdog/hadoop_invocation_override_spec.rb new file mode 100644 index 0000000..28ee566 --- /dev/null +++ b/spec/wonderdog/hadoop_invocation_override_spec.rb @@ -0,0 +1,142 @@ +require 'spec_helper' + +describe Wukong::Elasticsearch::HadoopInvocationOverride do + + let(:no_es) { hadoop_runner('regexp', 'count', input: '/tmp/input_file', output: '/tmp/output_file') } + let(:es_reader) { hadoop_runner('regexp', 'count', input: 'es://the_index/the_map', output: '/tmp/output_file') } + let(:es_writer) { hadoop_runner('regexp', 'count', input: '/tmp/input_file', output: 'es:///the_index/the_map') } + let(:es_complex) { hadoop_runner('regexp', 'count', input: 'es://the_index/the_map', output: 'es:///the_index/the_map', es_query: '{"hi": "there"}', es_request_size: 1000, es_index_field: 'ID', map_speculative: true, reduce_speculative: true) } + + context "passing necessary jars to Hadoop streaming" do + before { Dir.stub!(:[]).and_return(["/lib/dir/elasticsearch.jar"], ["/lib/dir/wonderdog.jar"]) } + context "when not given explicit jars" do + context "and not interacting with Elasticsearch" do + it "doesn't add jars" do + no_es.hadoop_commandline.should_not match('-libjars') + end + end + context "and reading from Elasticsearch" do + it "adds default jars it finds on the local filesystem" do + es_reader.hadoop_commandline.should match('-libjars.*elasticsearch') + end + end + context "and writing to Elasticsearch" do + it "adds default jars it finds on the local filesystem" do + es_writer.hadoop_commandline.should match('-libjars.*elasticsearch') + end + end + context "and reading and writing to Elasticsearch" do + it "adds default jars it finds on the local filesystem" do + es_complex.hadoop_commandline.should match('-libjars.*elasticsearch') + end + end + end + end + + context "setting speculative execution" do + context "when not given speculative options" do + context "and not interacting with Elasticsearch" do + it "doesn't add any speculative options" do + no_es.hadoop_commandline.should_not match('speculative') + end + end + context "and reading from Elasticsearch" do + it "disables speculative execution in the mapper" do + es_reader.hadoop_commandline.should match(/-D mapred.map.tasks.speculative.execution.*false/) + end + it "disables speculative execution in the reducer" do + es_reader.hadoop_commandline.should match(/-D mapred.reduce.tasks.speculative.execution.*false/) + end + end + context "and reading from Elasticsearch" do + it "disables speculative execution in the mapper" do + es_writer.hadoop_commandline.should match(/-D mapred.map.tasks.speculative.execution.*false/) + end + it "disables speculative execution in the reducer" do + es_writer.hadoop_commandline.should match(/-D mapred.reduce.tasks.speculative.execution.*false/) + end + end + end + context "when given speculative options" do + it "does not change them" do + es_complex.hadoop_commandline.should match(/-D mapred.map.tasks.speculative.execution.*true/) + es_complex.hadoop_commandline.should match(/-D mapred.reduce.tasks.speculative.execution.*true/) + end + end + end + + context "handling input and output paths, formats, and options when" do + + context "not interacting with Elasticsearch" do + subject { no_es } + # input + its(:input_paths) { should == '/tmp/input_file' } + its(:hadoop_commandline) { should 
match(%r{-input.*/tmp/input_file}i) } + + # output + its(:output_path) { should == '/tmp/output_file' } + its(:hadoop_commandline) { should match(%r{-output.*/tmp/output_file}i) } + + # no elasticsearch anything + its(:hadoop_commandline) { should_not match(/elasticsearch/i) } + end + + context "reading from Elasticsearch" do + subject { es_reader } + + # input + its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) } + its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) } + its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) } + + # output + its(:output_path) { should == '/tmp/output_file' } + its(:hadoop_commandline) { should_not match(/-outputformat/i) } + its(:hadoop_commandline) { should match(%r{-output.*/tmp/output_file}i) } + its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.output/i) } + end + + context "writing to Elasticsearch" do + subject { es_writer } + + # input + its(:input_paths) { should == '/tmp/input_file' } + its(:hadoop_commandline) { should_not match(/-inputformat/i) } + its(:hadoop_commandline) { should match(%r{-input.*/tmp/input_file}i) } + its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.input/i) } + + # output + its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) } + its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) } + its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) } + end + + context "reading and writing with many options" do + subject { es_complex } + + # input + its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) } + its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) } + its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) } + + # output + its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) } + its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) } + its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) } + + # options + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.query.*hi.*there/i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.request_size.*1000/i) } + its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index\.field.*ID/i) } + end + end + +end diff --git a/spec/wonderdog/index_and_type_spec.rb b/spec/wonderdog/index_and_type_spec.rb new file mode 100644 index 0000000..731d082 --- /dev/null +++ b/spec/wonderdog/index_and_type_spec.rb @@ -0,0 +1,73 @@ +require 'spec_helper' + +describe Wukong::Elasticsearch::IndexAndMapping do + + subject { Wukong::Elasticsearch::IndexAndMapping } + + let(:filesystem_path) { '/some/path' } + let(:filesystem_paths) 
{ '/some/path,/some/other/path' } + + let(:hdfs_path) { 'hdfs://some/hdfs/path' } + let(:hdfs_paths) { 'hdfs://some/hdfs/path,hdfs://some/other/hdfs/path' } + + let(:es_index_and_mapping) { 'es://index/mapping' } + let(:es_indices_and_mapping) { 'es://index1,index2/mapping' } + let(:es_index_and_mappings) { 'es://index/mapping1,mapping2' } + let(:es_indices_and_mappings) { 'es://index1,index2/mapping1,mapping2' } + + fails = %w[filesystem_path filesystem_paths hdfs_path hdfs_paths] + passes = %w[es_index_and_mapping es_indices_and_mapping es_index_and_mappings es_indices_and_mappings] + + context 'recognizing possible es://index/mapping specifications' do + fails.each do |name| + it "doesn't recognize a #{name}" do + subject.matches?(self.send(name)).should be_false + end + end + passes.each do |name| + it "recognizes a #{name}" do + subject.matches?(self.send(name)).should be_true + end + end + end + + context "parsing es://index/mapping specifications" do + fails.each do |name| + it "raises an error on a #{name}" do + lambda { subject.new(self.send(name)) }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/mapping/i) + end + end + + it "raises an error on a specification with too many parts" do + lambda { subject.new('es://index/mapping/extra') }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/mapping/i) + end + + it "raises an error on a specification with too few parts" do + lambda { subject.new('es://') }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/mapping/i) + end + + context "on an index and mapping" do + subject { Wukong::Elasticsearch::IndexAndMapping.new(es_index_and_mapping) } + its(:index) { should == 'index' } + its(:mapping) { should == 'mapping' } + end + context "on indices and a mapping" do + subject { Wukong::Elasticsearch::IndexAndMapping.new(es_indices_and_mapping) } + its(:index) { should == 'index1,index2' } + its(:mapping) { should == 'mapping' } + end + context "on an index and mappings" do + subject { Wukong::Elasticsearch::IndexAndMapping.new(es_index_and_mappings) } + its(:index) { should == 'index' } + its(:mapping) { should == 'mapping1,mapping2' } + end + context "on indices and mappings" do + subject { Wukong::Elasticsearch::IndexAndMapping.new(es_indices_and_mappings) } + its(:index) { should == 'index1,index2' } + its(:mapping) { should == 'mapping1,mapping2' } + end + + + end + +end diff --git a/spec/wonderdog/wu-hadoop_spec.rb b/spec/wonderdog/wu-hadoop_spec.rb new file mode 100644 index 0000000..5bed127 --- /dev/null +++ b/spec/wonderdog/wu-hadoop_spec.rb @@ -0,0 +1,18 @@ +require 'spec_helper' + +describe 'wu-hadoop' do + + context "when wonderdog hasn't been required" do + let(:script) { examples_dir('no_wonderdog.rb') } + it "doesn't recognize Elasticsearch URIs" do + command('wu-hadoop', script, '--input=es://foo/bar', '--output=/some/path', '--dry_run').should_not have_stdout('elasticsearch') + end + end + + context "when wonderdog hasn't been required" do + let(:script) { examples_dir('wonderdog.rb') } + it "recognizes Elasticsearch URIs" do + command('wu-hadoop', script, '--input=es://foo/bar', '--output=/some/path', '--dry_run').should have_stdout('elasticsearch') + end + end +end diff --git a/spec/wonderdog_spec.rb b/spec/wonderdog_spec.rb new file mode 100644 index 0000000..3af1f59 --- /dev/null +++ b/spec/wonderdog_spec.rb @@ -0,0 +1,5 @@ +require 'spec_helper' + +describe Wukong::Elasticsearch do + it_behaves_like 'a plugin' +end diff --git 
a/src/main/java/com/infochimps/elasticsearch/ElasticSearchInputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchInputFormat.java index 6806281..64e585a 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchInputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchInputFormat.java @@ -27,8 +27,8 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.index.query.xcontent.FilterBuilders.*; -import org.elasticsearch.index.query.xcontent.QueryBuilders; +import org.elasticsearch.index.query.FilterBuilders.*; +import org.elasticsearch.index.query.QueryBuilders; /** diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index 10baf47..908ecfb 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -32,7 +32,7 @@ import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.client.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.ExceptionsHelper; @@ -40,13 +40,13 @@ import com.infochimps.elasticsearch.hadoop.util.HadoopUtils; /** - + Hadoop OutputFormat for writing arbitrary MapWritables (essentially HashMaps) into Elasticsearch. Records are batched up and sent in a one-hop manner to the elastic search data nodes that will index them. - + */ public class ElasticSearchOutputFormat extends OutputFormat implements Configurable { - + static Log LOG = LogFactory.getLog(ElasticSearchOutputFormat.class); private Configuration conf = null; @@ -60,12 +60,13 @@ protected class ElasticSearchRecordWriter extends RecordWriterelasticsearch.id.field.name - When elasticsearch.is_json is true, this is the name of a field in the json document that contains the document's id. If -1 is used then the document is assumed to have no id and one is assigned to it by elasticsearch.
  • elasticsearch.field.names - When elasticsearch.is_json is false, this is a comma-separated list of field names.
  • elasticsearch.id.field - When elasticsearch.is_json is false, this is the numeric index of the field to use as the document id. If -1 is used, the document is assumed to have no id and one is assigned to it by elasticsearch.
  • - + */ public ElasticSearchRecordWriter(TaskAttemptContext context) { Configuration conf = context.getConfiguration(); @@ -118,9 +119,10 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { LOG.info("Using field:["+idFieldName+"] for document ids"); } this.objType = conf.get(ES_OBJECT_TYPE); - + // - // Fetches elasticsearch.yml and the plugins directory from the distributed cache + // Fetches elasticsearch.yml and the plugins directory from the distributed cache, or + // from the local config. // try { String taskConfigPath = HadoopUtils.fetchFileFromCache(ES_CONFIG_NAME, conf); @@ -130,9 +132,10 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { System.setProperty(ES_CONFIG, taskConfigPath); System.setProperty(ES_PLUGINS, taskPluginsPath+SLASH+ES_PLUGINS_NAME); } catch (Exception e) { - throw new RuntimeException(e); + System.setProperty(ES_CONFIG,conf.get(ES_CONFIG)); + System.setProperty(ES_PLUGINS,conf.get(ES_PLUGINS)); } - + start_embedded_client(); initialize_index(indexName); currentRequest = client.prepareBulk(); @@ -142,7 +145,7 @@ public ElasticSearchRecordWriter(TaskAttemptContext context) { Closes the connection to elasticsearch. Any documents remaining in the bulkRequest object are indexed. */ public void close(TaskAttemptContext context) throws IOException { - if (currentRequest.numberOfActions() > 0) { + if (currentRequest.numberOfActions() > 0) { try { BulkResponse response = currentRequest.execute().actionGet(); } catch (Exception e) { @@ -173,7 +176,7 @@ public void write(NullWritable key, MapWritable fields) throws IOException { try { Text mapKey = new Text(idFieldName); String record_id = fields.get(mapKey).toString(); - currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder)); + currentRequest.add(Requests.indexRequest(indexName).id(record_id).type(objType).create(false).source(builder)); } catch (Exception e) { LOG.warn("Encountered malformed record"); } @@ -196,14 +199,14 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce } else if (value instanceof FloatWritable) { builder.value(((FloatWritable)value).get()); } else if (value instanceof BooleanWritable) { - builder.value(((BooleanWritable)value).get()); + builder.value(((BooleanWritable)value).get()); } else if (value instanceof MapWritable) { builder.startObject(); for (Map.Entry entry : ((MapWritable)value).entrySet()) { if (!(entry.getValue() instanceof NullWritable)) { builder.field(entry.getKey().toString()); buildContent(builder, entry.getValue()); - } + } } builder.endObject(); } else if (value instanceof ArrayWritable) { @@ -213,7 +216,7 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce buildContent(builder, arrayOfThings[i]); } builder.endArray(); - } + } } /** @@ -222,12 +225,21 @@ private void buildContent(XContentBuilder builder, Writable value) throws IOExce private void processBulkIfNeeded() { totalBulkItems.incrementAndGet(); if (currentRequest.numberOfActions() >= bulkSize) { - try { + boolean loggable = (System.currentTimeMillis() - lastLogTime >= 10000); + + try { long startTime = System.currentTimeMillis(); + if (loggable){ LOG.info("Sending [" + (currentRequest.numberOfActions()) + "]items"); } BulkResponse response = currentRequest.execute().actionGet(); totalBulkTime.addAndGet(System.currentTimeMillis() - startTime); - if (randgen.nextDouble() < 0.1) { - LOG.info("Indexed [" + totalBulkItems.get() + "] in [" + (totalBulkTime.get()/1000) + "s] 
of indexing"+"[" + ((System.currentTimeMillis() - runStartTime)/1000) + "s] of wall clock"+" for ["+ (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "rec/s]"); + if (loggable) { + LOG.info("Indexed [" + (currentRequest.numberOfActions()) + "]items " + + "in [" + ((System.currentTimeMillis() - startTime)/1000) + "]s; " + + "avg [" + (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "]rec/s" + + "(total [" + totalBulkItems.get() + "]items " + + "indexed in [" + (totalBulkTime.get()/1000) + "]s, " + + "wall clock [" + ((System.currentTimeMillis() - runStartTime)/1000) + "]s)"); + lastLogTime = System.currentTimeMillis(); } } catch (Exception e) { LOG.warn("Bulk request failed: " + e.getMessage()); diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java new file mode 100644 index 0000000..2761be6 --- /dev/null +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingInputFormat.java @@ -0,0 +1,231 @@ +package com.infochimps.elasticsearch; + +import java.io.IOException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; + +import org.elasticsearch.common.settings.loader.YamlSettingsLoader; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.action.search.SearchRequestBuilder; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.index.query.FilterBuilders.*; +import org.elasticsearch.cluster.ClusterName; + +public class ElasticSearchStreamingInputFormat implements InputFormat { + + static Log LOG = LogFactory.getLog(ElasticSearchStreamingInputFormat.class); + + // Job settings we need to control directly from Java options. + private static final String ES_INDEX_OPT = "elasticsearch.input.index"; + private static final String ES_DEFAULT_INDEX = "hadoop"; + private String indexName; + + private static final String ES_MAPPING_OPT = "elasticsearch.input.mapping"; + private static final String ES_DEFAULT_MAPPING = "streaming_record"; + private String mappingName; + + private static final String ES_NUM_SPLITS_OPT = "elasticsearch.input.splits"; + private static final String ES_NUM_SPLITS = "1"; + private Integer numSplits; + + private static final String ES_QUERY_OPT = "elasticsearch.input.query"; + private static final String ES_QUERY = "{\"match_all\": {}}"; + private String queryJSON; + + // Calculated after the first query. + private long numHits; + private Integer recordsPerSplit; + + // Elasticsearch internal settings required to make a client + // connection. 
+ private static final String ES_CONFIG_OPT = "es.config"; + private static final String ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml"; + + private static final String ES_PLUGINS_OPT = "es.path.plugins"; + private static final String ES_PLUGINS = "/usr/local/share/elasticsearch/plugins"; + + private static final String ES_UNICAST_HOSTS_NAME = "discovery.zen.ping.unicast.hosts"; + + private TransportClient client; + + public RecordReader getRecordReader(InputSplit split, JobConf conf, Reporter reporter) { + setLocalElasticSearchInstallation(conf); + return (RecordReader) new ElasticSearchStreamingRecordReader(split, conf); + } + + public InputSplit[] getSplits(JobConf conf, int requestedNumSplits) { + this.numSplits = requestedNumSplits; + + setLocalElasticSearchInstallation(conf); + parseInput(conf); + + startTransportClient(conf); + findNumHits(); + stopTransportClient(); + + return createSplits(); + } + + + // + // == Setup == + // + + public void setLocalElasticSearchInstallation(JobConf conf) { + String esConfigPath = conf.get(ES_CONFIG_OPT, ES_CONFIG); + String esPluginsPath = conf.get(ES_PLUGINS_OPT, ES_PLUGINS); + System.setProperty(ES_CONFIG_OPT, esConfigPath); + System.setProperty(ES_PLUGINS_OPT,esPluginsPath); + LOG.info("Using Elasticsearch configuration file at "+esConfigPath+" and plugin directory "+esPluginsPath); + } + + private void parseInput(JobConf conf) { + this.indexName = conf.get(ES_INDEX_OPT, ES_DEFAULT_INDEX); + this.mappingName = conf.get(ES_MAPPING_OPT, ES_DEFAULT_MAPPING); + // this.numSplits = Integer.parseInt(conf.get(ES_NUM_SPLITS_OPT, ES_NUM_SPLITS)); + this.queryJSON = conf.get(ES_QUERY_OPT, ES_QUERY); + String message = "Using input /"+indexName; + if (mappingName != null && mappingName.length() > 0) { + message += "/"+mappingName; + } + if (queryJSON != null && queryJSON.length() > 0) { + message += " with query: "+queryJSON; + } + LOG.info(message); + } + + // + // == Connecting to Elasticsearch and Querying == + // + + private void startTransportClient(JobConf conf) { + this.client = new TransportClient(); + Map settings = parsedSettings(conf); + String host = hostname(settings); + if (host.toString().length() == 0) { + System.exit(1); + } + LOG.info("Attempting to connect to Elasticsearch node at " + host + ":9300"); + this.client = new TransportClient().addTransportAddress(new InetSocketTransportAddress(host, 9300)); + LOG.info("Connected to Elasticsearch cluster"); + } + + private Map parsedSettings(JobConf conf) { + String esConfigPath = conf.get(ES_CONFIG_OPT, ES_CONFIG); + String esPluginsPath = conf.get(ES_PLUGINS_OPT, ES_PLUGINS); + + try { + BufferedReader reader = new BufferedReader( new FileReader(esConfigPath)); + String line = null; + StringBuilder stringBuilder = new StringBuilder(); + String ls = System.getProperty("line.separator"); + while( ( line = reader.readLine() ) != null ) { + stringBuilder.append( line ); + stringBuilder.append( ls ); + } + return new YamlSettingsLoader().load(stringBuilder.toString()); + } catch (IOException e) { + LOG.error("Could not find or read the configuration file " + esConfigPath + "."); + return new HashMap(); + } + } + + private String hostname(Map settings) { + String hostsString = settings.get(ES_UNICAST_HOSTS_NAME); + if (hostsString.toString().length() == 0) { + LOG.error("Could not find hosts. 
Did you set the '" + ES_UNICAST_HOSTS_NAME + "' key?"); + return ""; + } + + String[] hosts = hostsString.split(","); + if (hosts.length > 0) { + String host = hosts[0]; + if (host.toString().length() == 0) { + LOG.error("Could not parse hosts from '" + ES_UNICAST_HOSTS_NAME + "' key."); + return ""; + } else { + return host; + } + } else { + LOG.error("Could not find any hosts in the '" + ES_UNICAST_HOSTS_NAME + "' key."); + return ""; + } + } + + private void stopTransportClient() { + if (client != null) client.close(); + LOG.info("Disconnected from Elasticsearch cluster"); + } + + private void findNumHits() { + SearchRequestBuilder request = client.prepareSearch(indexName); + if (mappingName != null && mappingName.length() > 0) { + request.setTypes(mappingName); + } + request.setSearchType(SearchType.COUNT); + if (queryJSON != null && queryJSON.length() > 0) { + request.setQuery(queryJSON); + } + SearchResponse response = request.execute().actionGet(); + this.numHits = response.hits().totalHits(); + + LOG.info("Ran query: "+String.valueOf(numHits)+" hits"); + } + + // + // == Setting splits == + // + + private void readjustSplitsByHits() { + + } + + private InputSplit[] createSplits() { + // Say that + // + // numHits = 7 + // numSplits = 2 + if((long) numSplits > numHits) { + numSplits = (int) numHits; + } + + this.recordsPerSplit = (int) (numHits/((long)numSplits)); // == 3 records/split + + List splits = new ArrayList(numSplits); + + // i == 0, 1 + for(int i = 0; i < numSplits; i++) { + Integer from = i * recordsPerSplit; + splits.add(new ElasticSearchStreamingSplit(indexName, mappingName, numSplits, queryJSON, numHits, from, recordsPerSplit)); + } + // 7 is > (2 * 3) == 6 + if (numHits > ((long) (numSplits * recordsPerSplit))) { + Integer from = numSplits * recordsPerSplit; + Integer size = (int) (numHits - ((long) from)); + splits.add(new ElasticSearchStreamingSplit(indexName, mappingName, numSplits, queryJSON, numHits, from, size)); + } + + LOG.info("Splitting "+String.valueOf(numHits)+" hits across "+String.valueOf(splits.size())+" splits ("+String.valueOf(recordsPerSplit)+" hits/split)"); + + return splits.toArray(new InputSplit[splits.size()]); + } + +} diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputCommitter.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputCommitter.java new file mode 100644 index 0000000..ed7c29e --- /dev/null +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputCommitter.java @@ -0,0 +1,37 @@ +package com.infochimps.elasticsearch; + +import java.io.IOException; + +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.TaskAttemptContext; + +public class ElasticSearchStreamingOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext context) throws IOException { + + } + + @Override + public void cleanupJob(JobContext context) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + } + +} diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java 
b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java new file mode 100644 index 0000000..3120806 --- /dev/null +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingOutputFormat.java @@ -0,0 +1,88 @@ +package com.infochimps.elasticsearch; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.util.*; +import com.infochimps.elasticsearch.hadoop.util.HadoopUtils; + +/** + + Hadoop OutputFormat for writing arbitrary MapWritables (essentially + HashMaps) into Elasticsearch. Records are batched up and sent in a + one-hop manner to the elastic search data nodes that will index + them. + +*/ +public class ElasticSearchStreamingOutputFormat implements OutputFormat { + + static Log LOG = LogFactory.getLog(ElasticSearchStreamingOutputFormat.class); + + // Job settings we need to control directly from Java options. + private static final String ES_INDEX_OPT = "elasticsearch.output.index"; + private static final String ES_DEFAULT_INDEX = "hadoop"; + private String defaultIndexName; + + private static final String ES_MAPPING_OPT = "elasticsearch.output.mapping"; + private static final String ES_DEFAULT_MAPPING = "streaming_record"; + private String defaultMappingName; + + private static final String ES_INDEX_FIELD_OPT = "elasticsearch.output.index.field"; + private static final String ES_INDEX_FIELD = "_index"; + private String indexFieldName; + + private static final String ES_MAPPING_FIELD_OPT = "elasticsearch.output.mapping.field"; + private static final String ES_MAPPING_FIELD = "_mapping"; + private String mappingFieldName; + + private static final String ES_ID_FIELD_OPT = "elasticsearch.output.id.field"; + private static final String ES_ID_FIELD = "_id"; + private String idFieldName; + + private static final String ES_BULK_SIZE_OPT = "elasticsearch.output.bulk_size"; + private static final String ES_BULK_SIZE = "1000"; + private int bulkSize; + + + // Elasticsearch internal settings required to make a client + // connection. 
+ private static final String ES_CONFIG_OPT = "es.config"; + private static final String ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml"; + + private static final String ES_PLUGINS_OPT = "es.path.plugins"; + private static final String ES_PLUGINS = "/usr/local/share/elasticsearch/plugins"; + + public RecordWriter getRecordWriter(FileSystem ignored, JobConf conf, String name, Progressable progress) throws IOException { + setLocalElasticSearchInstallation(conf); + String defaultIndexName = conf.get(ES_INDEX_OPT, ES_DEFAULT_INDEX); + String defaultMappingName = conf.get(ES_MAPPING_OPT, ES_DEFAULT_MAPPING); + String indexFieldName = conf.get(ES_INDEX_FIELD_OPT, ES_INDEX_FIELD); + String mappingFieldName = conf.get(ES_MAPPING_FIELD_OPT, ES_MAPPING_FIELD); + String idFieldName = conf.get(ES_ID_FIELD_OPT, ES_ID_FIELD); + Integer bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE_OPT, ES_BULK_SIZE)); + return (RecordWriter) new ElasticSearchStreamingRecordWriter(defaultIndexName, defaultMappingName, indexFieldName, mappingFieldName, idFieldName, bulkSize); + } + + public void setLocalElasticSearchInstallation(JobConf conf) { + String esConfigPath = conf.get(ES_CONFIG_OPT, ES_CONFIG); + String esPluginsPath = conf.get(ES_PLUGINS_OPT, ES_PLUGINS); + System.setProperty(ES_CONFIG_OPT,esConfigPath); + System.setProperty(ES_PLUGINS_OPT,esPluginsPath); + LOG.info("Using Elasticsearch configuration file at "+esConfigPath+" and plugin directory "+esPluginsPath); + } + + public ElasticSearchStreamingOutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + return new ElasticSearchStreamingOutputCommitter(); + } + + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + } +} diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java new file mode 100644 index 0000000..6fc4a62 --- /dev/null +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordReader.java @@ -0,0 +1,176 @@ +package com.infochimps.elasticsearch; + +import java.io.IOException; + +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.io.*; + +import org.elasticsearch.common.unit.TimeValue; + +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.ClusterName; + +class ElasticSearchStreamingRecordReader implements RecordReader { + + static Log LOG = LogFactory.getLog(ElasticSearchStreamingRecordReader.class); + + private static final String ES_REQUEST_SIZE_OPT = "elasticsearch.input.request_size"; + private static final String ES_REQUEST_SIZE = "100"; + private Integer requestSize; + + private static final String ES_SCROLL_TIMEOUT_OPT = "elasticsearch.input.scroll_timeout"; + private static final String ES_SCROLL_TIMEOUT = "5m"; + private String scrollTimeout; + private static final TimeValue defaultScrollTimeout = new TimeValue((long) 
300000); // 5 minutes + private Scroll scroll; + + private Node node; + private Client client; + private ElasticSearchStreamingSplit split; + + private String scrollId; + private Integer recordsRead; + private Iterator hitsItr = null; + + public ElasticSearchStreamingRecordReader(InputSplit split, JobConf conf) { + this.split = (ElasticSearchStreamingSplit) split; + this.recordsRead = 0; + this.requestSize = Integer.parseInt(conf.get(ES_REQUEST_SIZE_OPT, ES_REQUEST_SIZE)); + this.scrollTimeout = conf.get(ES_SCROLL_TIMEOUT_OPT, ES_SCROLL_TIMEOUT); + this.scroll = new Scroll(TimeValue.parseTimeValue(this.scrollTimeout, defaultScrollTimeout)); + + LOG.info("Initializing "+this.split.getSummary()); + startEmbeddedClient(); + fetchNextHits(); + } + + private void fetchNextHits() { + if (scrollId == null) { + LOG.info("Running initial scroll with timeout "+scrollTimeout); + SearchRequestBuilder request = split.initialScrollRequest(client, scroll, requestSize); + SearchResponse response = request.execute().actionGet(); + this.scrollId = response.scrollId(); + LOG.info("Got scroll ID "+scrollId); + // Do we need to call fetchNextHits() again here? Or does + // the initial request also itself contain the first set + // of hits for the scroll? + // + // fetchNextHits(); + } else { + // LOG.info("Running query for scroll ID "+scrollId+" with timeout "+scrollTimeout); + SearchScrollRequestBuilder request = split.scrollRequest(client, scroll, scrollId); + SearchResponse response = request.execute().actionGet(); + this.scrollId = response.scrollId(); + // LOG.info("Got scroll ID "+scrollId); + this.hitsItr = response.hits().iterator(); + } + } + + @Override + public boolean next(K key, V value) throws IOException { + if (shouldReadAnotherRecord()) { + // We should read more records because we haven't read as + // many as we know to be in this split yet. + if (hasAnotherRecord()) { + // We already have records stacked up ready to read. + readRecord(key, value); + return true; + } else { + // We don't have records stacked up so we might need + // to fetch some more hits. + fetchNextHits(); + if (hasAnotherRecord()) { + // Now if we have records we read one + readRecord(key, value); + return true; + } else { + // But if no records are here this time, it's + // because we know we're done reading the input. + return false; + } + } + } else { + // Return false as we're done with this split. 
+ return false; + } + } + + private boolean shouldReadAnotherRecord() { + return recordsRead < split.getSize(); + } + + private boolean hasAnotherRecord() { + return hitsItr != null && hitsItr.hasNext(); + } + + private void readRecord(K key, V value) { + SearchHit hit = hitsItr.next(); + if (hit != null) { + Text keyText = (Text) key; + Text valueText = (Text) value; + keyText.set(hit.sourceAsString()); + valueText.set(hit.sourceAsString()); + recordsRead += 1; + } + } + + @Override + public K createKey() { + return (K) new Text(); + } + + @Override + public V createValue() { + return (V) new Text(); + } + + @Override + public long getPos() throws IOException { + return recordsRead; + } + + @Override + public float getProgress() throws IOException { + return ((float) recordsRead) / ((float) split.getSize()); + } + + @Override + public void close() throws IOException { + stopEmbeddedClient(); + } + + // + // == Connecting to Elasticsearch == + // + + private void startEmbeddedClient() { + LOG.info("Starting embedded Elasticsearch client (non-datanode)..."); + this.node = NodeBuilder.nodeBuilder().client(true).node(); + this.client = node.client(); + LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"'); + } + + private void stopEmbeddedClient() { + LOG.info("Stopping embedded Elasticsearch client..."); + if (client != null) client.close(); + if (node != null) node.close(); + LOG.info("Left Elasticsearch cluster"); + } + + +} diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java new file mode 100644 index 0000000..9214867 --- /dev/null +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingRecordWriter.java @@ -0,0 +1,171 @@ +package com.infochimps.elasticsearch; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.*; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.ExceptionsHelper; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.JsonParseException; + +class ElasticSearchStreamingRecordWriter implements RecordWriter { + + static Log LOG = LogFactory.getLog(ElasticSearchStreamingRecordWriter.class); + + private String defaultIndexName; + private String defaultMappingName; + private String indexFieldName; + private String mappingFieldName; + private String idFieldName; + private Integer bulkSize; + + // Bookkeeping + private AtomicLong totalBulkTime = new AtomicLong(); + private AtomicLong totalBulkItems = new AtomicLong(); + private Random randgen = new Random(); + private long runStartTime = System.currentTimeMillis(); + + // Elasticsearch indexing + private Node node; + private Client client; + private volatile BulkRequestBuilder currentRequest; + + // JSON parsing + private ObjectMapper 
mapper; + + // + // == Lifecycle == + // + + public ElasticSearchStreamingRecordWriter(String defaultIndexName, String defaultMappingName, String indexFieldName, String mappingFieldName, String idFieldName, Integer bulkSize) { + this.defaultIndexName = defaultIndexName; + this.defaultMappingName = defaultMappingName; + this.indexFieldName = indexFieldName; + this.mappingFieldName = mappingFieldName; + this.idFieldName = idFieldName; + this.bulkSize = bulkSize; + + LOG.info("Writing "+Integer.toString(bulkSize)+" records per batch"); + LOG.info("Using default target /"+defaultIndexName+"/"+defaultMappingName); + LOG.info("Records override default target with index field '"+indexFieldName+"', mapping field '"+mappingFieldName+"', and ID field '"+idFieldName); + + startEmbeddedClient(); + this.currentRequest = client.prepareBulk(); + this.mapper = new ObjectMapper(); + } + + /** + Start an embedded Elasticsearch client. The client will not be + a data node and will not store data locally. + + The client will connect to the target Elasticsearch cluster as + a client node, enabling one-hop writes for all data. See + http://www.elasticsearch.org/guide/reference/java-api/client.html + */ + private void startEmbeddedClient() { + LOG.info("Starting embedded Elasticsearch client (non-datanode)..."); + this.node = NodeBuilder.nodeBuilder().client(true).node(); + this.client = node.client(); + LOG.info("Successfully joined Elasticsearch cluster '"+ClusterName.clusterNameFromSettings(node.settings())+'"'); + } + + + /** + Close the Elasticsearch client, sending out one last bulk write + if necessary. + */ + public void close(Reporter reporter) throws IOException { + sendBulkRequestIfMoreThan(0); + LOG.info("Shutting down Elasticsearch client..."); + if (client != null) client.close(); + if (node != null) node.close(); + LOG.info("Successfully shut down Elasticsearch client"); + } + + // + // == Writing records == + // + + public void write(K key, V value) throws IOException { + String json = ((Text) key).toString(); + try { + index(json); + sendBulkRequestIfBigEnough(); + } catch(Exception e) { + if (ExceptionsHelper.unwrapCause(e) instanceof JsonParseException) { + LOG.debug("Bad record: "+json); + return; + } else { + LOG.error("Could not write record: "+json, e); + } + } + } + + private void index(String json) throws IOException { + Map record = mapper.readValue(json, Map.class); + if (record.containsKey(idFieldName)) { + Object idValue = record.get(idFieldName); + currentRequest.add(Requests.indexRequest(indexNameForRecord(record)).id(String.valueOf(idValue)).type(mappingNameForRecord(record)).create(false).source(json)); + } else { + currentRequest.add(Requests.indexRequest(indexNameForRecord(record)).type(mappingNameForRecord(record)).source(json)); + } + } + + private String indexNameForRecord(Map record) { + if (record.containsKey(indexFieldName)) { + Object indexValue = record.get(indexFieldName); + return String.valueOf(indexValue); + } else { + return defaultIndexName; + } + } + + private String mappingNameForRecord(Map record) { + if (record.containsKey(mappingFieldName)) { + Object mappingValue = record.get(mappingFieldName); + return String.valueOf(mappingValue); + } else { + return defaultMappingName; + } + } + + // + // == Bulk request handling == + // + + private void sendBulkRequestIfBigEnough() { + sendBulkRequestIfMoreThan(bulkSize); + } + + private void sendBulkRequestIfMoreThan(int size) { + totalBulkItems.incrementAndGet(); + if (currentRequest.numberOfActions() > size) { + long 
startTime = System.currentTimeMillis(); + BulkResponse response = currentRequest.execute().actionGet(); + totalBulkTime.addAndGet(System.currentTimeMillis() - startTime); + if (randgen.nextDouble() < 0.1) { + LOG.info("Indexed [" + totalBulkItems.get() + "] in [" + (totalBulkTime.get()/1000) + "s] of indexing"+"[" + ((System.currentTimeMillis() - runStartTime)/1000) + "s] of wall clock"+" for ["+ (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "rec/s]"); + } + currentRequest = client.prepareBulk(); + } + } +} diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingSplit.java b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingSplit.java new file mode 100644 index 0000000..0871385 --- /dev/null +++ b/src/main/java/com/infochimps/elasticsearch/ElasticSearchStreamingSplit.java @@ -0,0 +1,102 @@ +package com.infochimps.elasticsearch; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputSplit; + +import org.elasticsearch.search.Scroll; + +import org.elasticsearch.client.Client; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; +import org.elasticsearch.action.search.SearchType; + +public class ElasticSearchStreamingSplit implements InputSplit, Writable { + + private String indexName; + private String mappingName; + private Integer numSplits; + private String queryJSON; + private Long numHits; + private Integer from; + private Integer size; + + public ElasticSearchStreamingSplit() { + } + + public ElasticSearchStreamingSplit(String indexName , String mappingName, Integer numSplits, String queryJSON, Long numHits, Integer from, Integer size) { + this.indexName = indexName; + this.mappingName = mappingName; + this.numSplits = numSplits; + this.queryJSON = queryJSON; + this.numHits = numHits; + this.from = from; + this.size = size; + } + + public String getSummary() { + Integer thisSplitNum = (int) (((long) from) / (numHits / ((long) numSplits))); + return "ElasticSearch input split "+String.valueOf(thisSplitNum + 1)+"/"+String.valueOf(numSplits)+" with "+String.valueOf(size)+" records from /"+indexName+"/"+mappingName; + } + + public Integer getSize() { + return size; + } + + public boolean hasQuery() { + return queryJSON != null && queryJSON.length() > 0; + } + + public SearchRequestBuilder initialScrollRequest(Client client, Scroll scroll, Integer requestSize) { + SearchRequestBuilder request = client.prepareSearch(indexName).setSearchType(SearchType.SCAN).setScroll(scroll); + if (mappingName != null && mappingName.length() > 0) { + request.setTypes(mappingName); + } + request.setFrom((int) from); + request.setSize(requestSize); + if (hasQuery()) { + request.setQuery(queryJSON); + } + return request; + } + + public SearchScrollRequestBuilder scrollRequest(Client client, Scroll scroll, String scrollId) { + return client.prepareSearchScroll(scrollId).setScroll(scroll); + } + + @Override + public String[] getLocations() { + return new String[] {}; + } + + @Override + public long getLength() { + return 0; + } + + @Override + public void readFields(DataInput in) throws IOException { + this.indexName = Text.readString(in); + this.mappingName = Text.readString(in); + this.numSplits = in.readInt(); + this.queryJSON = Text.readString(in); + this.numHits = in.readLong(); + this.from = in.readInt(); + this.size = 
in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, indexName); + Text.writeString(out, mappingName); + out.writeInt(numSplits); + Text.writeString(out, queryJSON); + out.writeLong(numHits); + out.writeInt(from); + out.writeInt(size); + } +} diff --git a/src/main/java/com/infochimps/elasticsearch/ElasticTest.java b/src/main/java/com/infochimps/elasticsearch/ElasticTest.java index 8fc9c50..b9febc2 100644 --- a/src/main/java/com/infochimps/elasticsearch/ElasticTest.java +++ b/src/main/java/com/infochimps/elasticsearch/ElasticTest.java @@ -33,7 +33,7 @@ import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.client.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.ExceptionsHelper; diff --git a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java index a2fac59..b0547d4 100644 --- a/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java +++ b/src/main/java/com/infochimps/elasticsearch/pig/ElasticSearchStorage.java @@ -53,6 +53,8 @@ public class ElasticSearchStorage extends LoadFunc implements StoreFuncInterface private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name"; private static final String ES_OBJECT_TYPE = "elasticsearch.object.type"; private static final String ES_IS_JSON = "elasticsearch.is_json"; + private static final String ES_IS_AVRO = "elasticsearch.is_avro"; + private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names"; private static final String ES_REQUEST_SIZE = "elasticsearch.request.size"; private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits"; @@ -166,24 +168,24 @@ public void putNext(Tuple t) throws IOException { MapWritable record = new MapWritable(); String isJson = property.getProperty(ES_IS_JSON); - // Handle delimited records (ie. isJson == false) - if (isJson != null && isJson.equals("false")) { - String[] fieldNames = property.getProperty(PIG_ES_FIELD_NAMES).split(COMMA); - for (int i = 0; i < t.size(); i++) { - if (i < fieldNames.length) { - try { - record.put(new Text(fieldNames[i]), new Text(t.get(i).toString())); - } catch (NullPointerException e) { - //LOG.info("Increment null field counter."); - } - } - } - } else { + String isAvro = property.getProperty(ES_IS_AVRO); + + // Handle avro records (ie. isAvro == true) + if (isAvro != null && isAvro.equals("true")) { + String[] fieldNames = property.getProperty(PIG_ES_FIELD_NAMES) + .split(COMMA); + record = tupleToWritable(t, fieldNames); + + } else + // Format JSON + // Handle json records (ie. isJson == true) + if (isJson != null && isJson.equals("true")) { if (!t.isNull(0)) { String jsonData = t.get(0).toString(); // parse json data and put into mapwritable record try { - HashMap data = mapper.readValue(jsonData, HashMap.class); + HashMap data = mapper.readValue(jsonData, + HashMap.class); record = (MapWritable)toWritable(data); } catch (JsonParseException e) { e.printStackTrace(); @@ -191,6 +193,21 @@ record = (MapWritable)toWritable(data); e.printStackTrace(); } } + + } else { + // Handle delimited records (ie. 
isJson == false, isAvro == false) + String[] fieldNames = property.getProperty(PIG_ES_FIELD_NAMES) + .split(COMMA); + for (int i = 0; i < t.size(); i++) { + if (i < fieldNames.length) { + try { + record.put(new Text(fieldNames[i]), new Text(t.get(i) + .toString())); + } catch (NullPointerException e) { + // LOG.info("Increment null field counter."); + } + } + } } try { @@ -273,7 +290,18 @@ private void elasticSearchSetup(String location, Job job) { if (isJson==null || isJson.equals("false")) { // We're dealing with delimited records UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(ResourceSchema.class); + Properties property = context + .getUDFProperties(ResourceSchema.class); + property.setProperty(ES_IS_JSON, "false"); + } + + String isAvro = query.get("avro"); + if (isAvro != null || isJson.equals("true")) { + // We're dealing with avro records + UDFContext context = UDFContext.getUDFContext(); + Properties property = context + .getUDFProperties(ResourceSchema.class); + property.setProperty(ES_IS_AVRO, "true"); property.setProperty(ES_IS_JSON, "false"); } @@ -314,8 +342,23 @@ private HashMap parseURIQuery(String query) { } /** - Recursively converts an arbitrary object into the appropriate writable. Please enlighten me if there is an existing - method for doing this. + * Recursively converts an arbitrary object into the appropriate writable. + * Please enlighten me if there is an existing method for doing this. + */ + private MapWritable tupleToWritable(Tuple tuple, String[] fieldNames) { + List attributes = tuple.getAll(); + MapWritable listOfThings = new MapWritable(); + for (int i = 0; i < fieldNames.length; i++) { + listOfThings.put(toWritable(fieldNames[i]), + toWritable(((List) attributes).get(i))); + } + return listOfThings; + + } + + /** + * Recursively converts an arbitrary object into the appropriate writable. + * Please enlighten me if there is an existing method for doing this. 
*/ private Writable toWritable(Object thing) { if (thing instanceof String) { @@ -352,4 +395,8 @@ private Writable toWritable(Object thing) { @Override public void cleanupOnFailure(String location, Job job) throws IOException { } + + @Override + public void cleanupOnSuccess(String location, Job job) throws IOException { + } } diff --git a/test/test_dump.pig b/test/test_dump.pig index fd1acaa..38f55e6 100644 --- a/test/test_dump.pig +++ b/test/test_dump.pig @@ -1,22 +1,19 @@ -- -- This tests loading data from elasticsearch -- -register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar'; -register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar'; -register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar'; -register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar'; -register target/wonderdog-1.0-SNAPSHOT.jar; - -%default INDEX 'foo_test' -%default OBJ 'foo' + +%default ES_JAR_DIR '/usr/local/Cellar/elasticsearch/0.18.7/libexec' +%default ES_YAML '/usr/local/Cellar/elasticsearch/0.18.7/config/elasticsearch.yml' +%default PLUGINS '/usr/local/Cellar/elasticsearch/0.18.7/plugins' + +%default INDEX 'foo_test' +%default OBJ 'foo' + +register $ES_JAR_DIR/*.jar; +register target/wonderdog*.jar; -- -- Will load the data as (doc_id, contents) tuples where the contents is the original json source from elasticsearch -- -foo = LOAD 'es://foo_test/foo?q=character:c' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage() AS (doc_id:chararray, contents:chararray); +foo = LOAD 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('$ES_YAML', '$PLUGINS') AS (doc_id:chararray, contents:chararray); DUMP foo; diff --git a/test/test_json_loader.pig b/test/test_json_loader.pig index 70f076c..0c9d628 100644 --- a/test/test_json_loader.pig +++ b/test/test_json_loader.pig @@ -1,19 +1,16 @@ -- -- This tests the json indexer. 
Run in local mode with 'pig -x local test/test_json_loader.pig' -- -register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar'; -register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar'; -register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar'; -register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar'; -register target/wonderdog-1.0-SNAPSHOT.jar; - -%default INDEX 'foo_test' -%default OBJ 'foo' + +%default ES_JAR_DIR '/usr/local/Cellar/elasticsearch/0.18.7/libexec' +%default ES_YAML '/usr/local/Cellar/elasticsearch/0.18.7/config/elasticsearch.yml' +%default PLUGINS '/usr/local/Cellar/elasticsearch/0.18.7/plugins' + +%default INDEX 'foo_test' +%default OBJ 'foo' + +register $ES_JAR_DIR/*.jar; +register target/wonderdog*.jar; foo = LOAD 'test/foo.json' AS (data:chararray); @@ -21,4 +18,4 @@ foo = LOAD 'test/foo.json' AS (data:chararray); -- Query parameters let elasticsearch output format that we're storing json data and -- want to use a bulk request size of 1 record. -- -STORE foo INTO 'es://$INDEX/$OBJ?json=true&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage(); +STORE foo INTO 'es://$INDEX/$OBJ?json=true&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('$ES_YAML', '$PLUGINS'); diff --git a/test/test_tsv_loader.pig b/test/test_tsv_loader.pig index 7685298..a91f3e1 100644 --- a/test/test_tsv_loader.pig +++ b/test/test_tsv_loader.pig @@ -1,20 +1,16 @@ -- -- This tests the tsv indexer. 
Run in local mode with 'pig -x local test/test_tsv_loader.pig' -- -register '/usr/local/share/elasticsearch/lib/elasticsearch-0.16.0.jar'; -register '/usr/local/share/elasticsearch/lib/jline-0.9.94.jar'; -register '/usr/local/share/elasticsearch/lib/jna-3.2.7.jar'; -register '/usr/local/share/elasticsearch/lib/log4j-1.2.15.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-analyzers-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-core-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-highlighter-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-memory-3.1.0.jar'; -register '/usr/local/share/elasticsearch/lib/lucene-queries-3.1.0.jar'; -register target/wonderdog-1.0-SNAPSHOT.jar; - -%default INDEX 'foo_test' -%default OBJ 'foo' +%default ES_JAR_DIR '/usr/local/Cellar/elasticsearch/0.18.7/libexec' +%default ES_YAML '/usr/local/Cellar/elasticsearch/0.18.7/config/elasticsearch.yml' +%default PLUGINS '/usr/local/Cellar/elasticsearch/0.18.7/plugins' + +%default INDEX 'foo_test' +%default OBJ 'foo' + +register $ES_JAR_DIR/*.jar; +register target/wonderdog*.jar; foo = LOAD 'test/foo.tsv' AS (character:chararray, value:int); -STORE foo INTO 'es://$INDEX/$OBJ?json=false&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage(); +STORE foo INTO 'es://$INDEX/$OBJ?json=false&size=1' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage('$ES_YAML', '$PLUGINS'); diff --git a/wonderdog.gemspec b/wonderdog.gemspec new file mode 100644 index 0000000..6b0c2a9 --- /dev/null +++ b/wonderdog.gemspec @@ -0,0 +1,32 @@ +# -*- encoding: utf-8 -*- +require File.expand_path('../lib/wonderdog/version', __FILE__) + +Gem::Specification.new do |gem| + gem.name = 'wonderdog' + gem.homepage = 'https://github.com/infochimps-labs/wonderdog' + gem.licenses = ["Apache 2.0"] + gem.email = 'coders@infochimps.com' + gem.authors = ['Infochimps', 'Philip (flip) Kromer', 'Jacob Perkins', 'Travis Dempsey', 'Dhruv Bansal'] + gem.version = Wonderdog::VERSION + + gem.summary = 'Make Hadoop and ElasticSearch play together nicely.' + gem.description = <<-EOF + Wonderdog provides code in both Ruby and Java to make Elasticsearch + a more fully-fledged member of both the Hadoop and Wukong + ecosystems. + + For the Java side, Wonderdog provides InputFormat and OutputFormat + classes for use with Hadoop (esp. Hadoop Streaming) and Pig. + + For the Ruby side, Wonderdog provides extensions for wu-hadoop to + make running Hadoop Streaming jobs written in Wukong against + ElasticSearch easier. +EOF + + gem.files = `git ls-files`.split("\n") + gem.executables = [] + gem.test_files = gem.files.grep(/^spec/) + gem.require_paths = ['lib'] + + gem.add_dependency('wukong-hadoop', '0.1.1') +end