Hadoopの各種方式でWordCountを作ってみた。
性能(実行速度)の測定に関しては、VMware Playerを使った仮想環境で実行しているので、あくまで参考程度。
CPUのコア数や物理メモリーは足りていると思うが、一番肝心なディスクは1台を共有しているし。
- 物理マシン(ホスト):Core i7 3.4GHz(8スレッド)、メモリー16GB、Windows7(64bit)
- 仮想マシン(ゲスト):CentOS5.7(64bit)
- 開発機(単独環境):メモリー2GB
- NameNode:メモリー2GB
- DataNode(3台):メモリー1.5GB
方式 |
データパターンA 160MB(単語種類数4) |
データパターンB 160MB(1単語1個) | ||
---|---|---|---|---|
単独環境 | 仮想分散 | 単独環境 | 仮想分散 | |
Java | 0:21~0:22 | 0:23~0:26 | 1:00~1:01 | 1:02~1:09 |
C言語 | 0:26~0:29 | 0:24~0:28 | 1:26~1:33 | 1:09~1:15 |
C言語(Combinerなし) | 0:44~0:46 | 0:50~0:54 | 0:55~0:56 | 1:06~1:10 |
Pig | 1:44~1:50 | 0:59~1:05 | 5:37~5:38 | 3:19~3:29 |
Hive | 0:41~0:42 | 0:56~1:02 | 2:43~2:49 | 2:13~2:16 |
Asakusa Framework | 0:43~0:45 | 0:47~0:48 | 2:03~2:27 | 3:32~4:10 |
Cascading | 0:22~0:22 | 0:27~0:27 | 4:27~4:29 | 3:12~3:37 |
Awk | 0:37~0:42 | 0:33~0:35 | 2:36~2:36 | 1:59~2:01 |
Awk(R連想配列) | 0:39~0:44 | 0:30~0:34 | 途中で止めた | ジョブ失敗 |
Awk(MR連想配列) | 0:13~0:14 | 0:19~0:22 | 測定せず | 測定せず |
Java(HashMap) | 0:06~0:08 | 0:19~0:20 | 1:31~1:34 | ヒープ不足 |
Java(HashMap上限付き) | 0:07~0:08 | 0:18~0:20 | 1:07~1:07 | 1:05~1:17 |
Huahin Framework | 29:12~35:34 | 22:06~24:32 | 測定せず | 測定せず |
AZAREA-Cluster Framework | 0:25~0:33 | 0:33~0:35 | 1:56〜2:23 | 1:52〜2:12 |
パターンAは、単語の種類が少ない(出力結果が少数レコードに集約される)データ。
パターンBは、1つの単語が1回ずつしか出てこない、つまり出力結果が大量になるデータ。(WordCount的には最悪のケース)
単語数を集計するのに連想配列を使う方式だと問題が出るだろうと思われるパターン。
入力データの160MBというのは、64MB×2+32MB、つまりHDFSの3ブロック(Mapタスク3個)分というサイズ。
なお、今回の環境はレプリケーション数は2(通常のデフォルトは3)。
分散環境では、各プログラムはNameNode上で起動している。
Pig・Hive・AsakusaFWの環境構築はNameNodeのみに行っている。
CascadingはNameNodeの$CASCADING_HOMEにインストールし、DataNodeの$HADOOP_HOME/libにjarファイルを入れている。
shディレクトリーをNameNodeや単独環境にコピーすれば、その中の各シェルを実行するだけで各プログラムが起動する(はず)。
(Hiveはスクリプトファイル内のパスを変更する必要あり)
単独環境(単体テストを行う環境)と仮想分散環境では、同じ方式(言語)でも当然速度が異なるが、異なり方は方式によって違う。
さらにデータサイズを変えると異なり方も変わってくると思う。
また、方式間の順位(どれが速いか)については、単独環境と仮想分散環境でもほぼ同じ順位になっているようだ。
ちなみに、例えばJava版では単独環境と仮想分散環境であまり速度に差が無い(仮想分散環境の方が遅い)が、
これは仮想分散環境ではHadoop起動にオーバーヘッドがかかっている為と思われる。
試しに640MBのデータパターンAを実行したら、単独環境は1:13~17、仮想分散環境は0:58~1:03で仮想分散環境の方が速くなった。
一番基本的なHadoop(CDH3u2)のJava API(Map/Reduce)を使ったプログラム。
WordCountのようなシンプルなプログラムでは、やはりこれが一番速い。
Hadoop StreamingとC言語の組み合わせ。
C言語で書いてコンパイルしてネイティブな実行ファイルになっているので、Java版と同じくらいの速度が出ている。
(ただし、バッファサイズは固定だし、文字コードも特に考慮していないので、Javaの方が安全性・機能性は上)
データパターンAではCombinerの有無で速度が2倍くらい違っている。
データパターンBはCombinerで全く集計されないパターンなので、Combinerが無駄な処理となっていて、Combiner有りは無しより若干遅い。
StreamingはCombinerの有無をシェルの書き換えだけで切り替えられる(Javaだとリコンパイルが必要)ので、こういうのを試すのは便利。
Pigは、ソースのステップ数では今回の中で一番少ないし、一番早く作れた。
個人的には関数型言語に似ていてHiveより良いと思うのだが、残念ながら速度面ではHiveの方が上なようだ。
Hiveは実行環境を作るのが面倒だった。
というのは、スクリプト内にファイルのパスを直書きすることしか出来ず、しかもCREATE EXTERNAL TABLEが相対パスを認識しないようなので、単独環境で試したスクリプトを分散環境に持っていったら書き換える必要がある為。
ちなみにHiveと似たような位置付けであるPigはスクリプト外部からパラメーターを渡せるようになっているしファイルの相対パスも普通に解釈されるので、こういう問題は無い。
AsakusaFWのDirect I/O版で作り直した。 (最初は0.2.3 batchappで作っていた。テキストファイルに対応していなくてInputFormatを自作したりしたが、 Direct I/Oでは自動生成してくれるので、ファイル名とパスくらいのコーディングで済む)
データパターンAの実行速度はPigやHiveより速く、Java版より約20秒遅い程度。
AsakusaFWではepilogue.fileioという後処理を行っており、これが10秒くらいなので、処理本体の実行時間はJava版に近付いている。
一方、データパターンBではepilogue.fileioが1分30秒以上かかっているので、出力件数次第で余計な時間がかかってしまうという事のようだ。
日本ではあまり注目されていないCascadingだが、せっかくなので試してみた。
データパターンAでは、Java版・C言語版に近い速度が出ている。
最初にCascadingを勉強した頃はまだPigを知らなかったので気付かなかったが、CascadingとPigは意外と似ている気がする。
Pigの考え方・記述方法をJavaのクラスで作ったら、Cascadingのようになるんじゃないかと思う。
ただ、演算した結果のフィールド名がどうなるのかは、Pigだとdescribeで都度確認できるのに対し、Cascadingは簡単に確認する方法が無い…。
Hadoop StreamingとAwkの組み合わせ。
Awkはインタープリター言語(実行時に命令を解釈する)なので、C言語版よりは実行速度が遅くなっている。
Hadoop Streamingをウェブで検索して出てくるサンプルでは、単語の集計部分(Reducer)で連想配列(ディクショナリー・マップ)を使っているものが多いので、同様の構造も試してみた。
データパターンBではメモリー不足で落ちるかな?と思っていたが、Awkの場合は仮想メモリーを使うようで、Awk自体は落ちなかった。
しかしマシンの負荷は高まっていたようで、JobTrackerと通信できなくなり(各タスクがタイムアウトになった)、ジョブ失敗で終了した。
失敗になるまで30分以上かかった。
単独環境でも30分放っておいても終わらなかったので、強制終了した。
ちなみに、「メモリーに収まる範囲なら連想配列を使う方式でも良い」とすると、ReducerだけでなくMapperでも連想配列を使って自前で集計すればいいじゃん。
という事で試してみたら、Java版よりも速くなった!
Awkでの連想配列使用版と同様に、JavaのMapperでjava.util.HashMapを使って自前で計算するバージョンを作ってみた。
(Reducerはそもそもキー毎にメソッドが呼ばれるので、自前でHashMapを使って件数を管理する必要は無い)
データパターンAでは、単独環境ではぶっちぎり、分散環境でも最速となった。
データパターンBではOutOfMemoryErrorが発生するかと思っていたのだが、単独環境では発生しなかった。
考えてみればデータ量は160MBなので、充分メモリーに収まるサイズだ…。
だったら分散環境でもブロックサイズは64MBなので、ブロック単位でMapperは呼ばれるから落ちないだろう…と思ったら、落ちた(苦笑)
例外のメッセージは「Exception in thread "communication thread" java.lang.OutOfMemoryError: Java heap space
」。
コミュニケーションスレッドというのが何だかは知らないが、データ処理以外の部分でメモリーが足りなくなったらしい。
一応、HashMapのキー数が一定サイズを超えたらHashMapをクリアするようなロジックを入れたバージョンも作ってみたところ、OutOfMemoryErrorは発生しなくなったし、速度もほとんど変わらなかった。
むしろ単独環境のデータパターンBでは速くなった。無制限だと1つのHashMapをずっと拡張していくことになるので、サイズ拡張時にコピーが発生するしGCも起こりやすくなるだろうから、やはり効率は良くないようだ。
Huahin Frameworkが出たので、試してみた。
アプリケーションのコーディングはしやすいのだが、速度が全然出ない…。
Combinerが使われていないのは想定内だが、C言語のCombiner無し版と比べても遅すぎる。
Mapperの出力バイト数やShuffleのバイト数(約3GB)がC言語のCombiner無し版(約250MB)の10倍以上あるので、そのせいか?
自分のアプリケーションの作り方が悪くてこうなってしまったのだろうか。
AZAREA-Cluster Frameworkが公表されたので、評価版を試してみた。
AZAREAはフローを描くとJavaソースを生成してくれるので、WordCountくらいシンプルだと確かに便利。ほとんどコーディングする場所は無かった。
実行速度もかなり速い部類に入る。