Hadoopに入門してみた - セットアップからHadoop Streaming まで -
大規模データを処理する必要が出て来たので、Hadoopを導入してみることになりました。
以下、導入メモです。
セットアップ
以下のような構成で試してみます。環境はCentOSです。
マスター(host001) ━┳ スレーブ(host002) ┣ スレーブ(host003) ┣ スレーブ(host004) ┗ スレーブ(host005)
まずは各マシンにJavaをインストール。JDK1.6を落として来てrpmでインストールするか、yum install java-1.6.0*などとたたけばOKです。(rpmでインストールする場合は http://java.sun.com/javase/ja/6/download.html から jdk-6u18-linux-i586-rpm.binをダウンロードして、実行権限を与えてルートで実行すればインストールできます。)
続いてマスターノードにHadoop本体をダウンロードします。 ここら辺から適当に落としてきます。
tarballを落として来たら適当な場所で展開し、以下の設定ファイルをいじります。
conf/hadoop-env.sh
JAVA_HOMEは設定必須です。javaをyumでインストールした場合は export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/ とか。rpmで入れた場合は export /usr/java/default にしておけばよいでしょう。
core-site.xml
マスターノードのホスト名を書き入れます。
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>fs.default.name</name> <value>hdfs://host001:9000</value> <final>true</final> </property> </configuration>
hdfs-site.xml
今回は試しなのでレプリケーションを1としました。
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
mapred-site.xml
job trackerにはマスターノードのホストを指定します。mapperとreducerは各々3個に指定してみました。
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>mapred.job.tracker</name> <value>host001:9001</value> </property> <property> <name>mapred.tasktracker.map.tasks.maximum</name> <value>3</value> </property> <property> <name>mapred.tasktracker.reduce.tasks.maximum</name> <value>3</value> </property> </configuration>
以上の設定ファイルを書いたら、hadoopディレクトリごと全サーバにrsyncします。
Hadoop本体の設置は以上で終わり。意外と簡単ですね!
解析対象となるファイルを設置する
Hadoopで解析させたいファイルをhdfs上に設置します。
hdfsへのファイルのコピーはマスターノード上で bin/hadoop fs コマンドを使って行ないます。ここでは/data/logs/以下にapacheのアクセスログが1ヶ月分置いてあると仮定しましょう。
その場合のコマンドはこんな感じになります。
[miki@host001 hadoop]$ bin/hadoop fs -put /data/logs/* input
上記のコマンドを叩くことで分散ファイルシステム上へのコピーが始まります。
デフォルト設定だと各スレーブの /tmp/hadoop-ユーザ名/dfs の下に64Mごとのチャンクに分割されたファイルが格納されていきます。
ためしにコンソールを複数立ち上げて、host002〜host005 で各々 watch -n 0 "du -s /tmp/hadoop-ユーザ名/dfs" などとすると、1台づつ順繰りにデータが渡されて来る様子を確認できます。
ここら辺、もっと一気に並列でどばっと撒いてくれると短時間で済むと思うんですが、どうやらそうなってはいないようです。Hadoopの全行程において、この「分散ファイルシステムへのコピー」は相当時間のかかる行為なので、イライラします。
なお後からわかったんですが、実は今回試した環境が100Mbpsしかスループットが出ないswitchに収容されていたため、600Gのデータを撒くのに、12時間もかかってしまいました。まずはギガビット出るようにしておかないとお話にならないようですね。
さてさて、なにはともあれhdfsへの転送が終了したので、確認してみます。
/user/ユーザ名/input/ というhdfs上のディレクトリにアクセスログが格納された様子がわかると思います。あとはこのhdfs上のデータに対してmap-reduceの処理を実行させていくことになります。
Hadoop Streaming を使ってmapperとreducerを書く
さて、ここまでくれば後は map-reduceの処理を走らせるだけなんですが、実は hadoop-streaming というAPIを使えば、javaでなくとも好きな言語でmapperとreducerを書くことができます。その際のルールは「標準入力からデータをもらって標準出力に書き出す」というきわめて単純なものとなっています。ありがたい!
というわけで、perlでmapperとreducer書いてみました。
まずはmap.pl。
#!/usr/local/bin/perl use strict; use warnings; my $counter; while(<STDIN>){ chomp $_; my @f = split(" ", $_); $counter->{$f[5]}++; } while(my ($key, $value) = each %$counter){ print $_,"\t", $counter->{$_},"\n"; }
標準入力からログを1行づつ読み込んで、適当にsplitなりなんなりして、データを数え上げます。
ここでは自前で$counterを設けていますが、どうせreducerに渡される前後でも同じようなことは処理されるので、これはなくてもいいです。メモリの無駄かも。その場合は単純に print $_,"\t1\n"; としておけばいいでしょう。
なお、きわめてシンプルなスクリプトを例示してしまいましたが、実際にはクロールやDB問い合わせ、エンコード/デコードなどなど、イロイロな処理をすることになるでしょう。
また現実的にはシンプルなkey => valueではなくて、多段のハッシュ構造などでデータを渡したいことが多いでしょう。その場合は key の部分に適当な文字列をセパレータにして複数のデータ項目を格納してしまえばよいと思います。reducerで分解すればよいので。
というわけで、おつぎはreduce.pl
#!/usr/local/bin/perl use strict; use warnings; my $counter; while(<STDIN>){ chomp $_; my ($key ,$value) = split "\t"; next if !($key && $value); $counter->{$key} += $value; } for( sort { $counter->{$a} <=> $counter->{$b} } keys %$counter ){ print $_,"\t", $counter->{$_}, "\n"; }
見てわかる通り、reducerもシンプルです。もう好きなように書けばいいです。
ちなみに、naoyaさんが2年くらい前にhadoop streamingの記事を書いています。
その中でイテレータつかったより効率的なフレームワークを作られているので、本格的に導入する際にはこれを使うといいでしょう。
mapperとreducerを準備したので実際に処理を走らせてみます。
マスターノードで以下のコマンドを叩きます。
[miki@host001 hadoop]$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input /user/miki/input -output output -mapper /home/miki/hadoop/perl_script/map.pl -reducer /home/miki/hadoop/perl_script/reduce.pl
hadoop-streamingを実行してくれるjarファイルはcontribの下にあるので、bin/hadoop jar で呼び出します。あとはhdfs上の -input と -outputの指定、それと -mapperと -reducerを指定します。
なお、mapperとreducerを絶対パスで直接指定していますが、そうする場合には事前に各スレーブ(host002からhost005まで)にperl_scriptをrsync しておく必要あります。またそんな面倒なことをしなくても、-file というオプションを指定することで任意のファイルを実行時にスレーブに転送することもできるようです。
map-reduceの処理が始まるとコンソール上に進捗状況が表示されます。
おわったらhdfs上の/user/ユーザ名/output/part-00000というファイルに結果が出力されるはずなので、 それをlinuxのファイルシステムに持ってきます。
[miki@host001 hadoop]$ bin/hadoop/fs get /user/ユーザ名/output/part-00000 /home/ユーザ名/output.txt
これで結果ファイルを取得できました。お疲れ〜。
まとめ
Hadoopの導入は意外と簡単でした。今回試してみた環境には古いサーバ(RedhatEL4)も混ざっていたんですが、Javaの導入も問題なかったしHadoopもちゃんと動いてくれました。
「map-reduceのメリットは最低でも20台くらいないとわからない」なんて言いますが、まずは4〜5台程度で試してみることをオススメします。
というのは、map-reduceのカッコいい側面だけではなく、まずは小さい規模でシステムを組んでみて、どこらへんにどれくらい時間や負荷がかかるのか、総合的なコスト感というものを把握すべきかと思います。
その上で本当にイケルと確信したならば数十台規模のシステムを構築すべきかな、と感じました。
規模が小さくて簡単な処理だったら、実は普通に集計スクリプトを書いて複数プロセスで動かした方が速い場合もあるしね。(少なくともhdfsへの大量データコピーはかなり時間がかかります!)
とはいえ、Hadoop面白かったです。そのうちHiveを試してみます。