MapReduceは処理対象のデータをキー/バリュー形式に分解するマップフェーズ、
分解されたキー/バリューのデータを取りまとめるリデュースフェーズから構成されます。
処理対象のデータはHadoopにより自動的に分割されます。
分割されたデータは複数のコンピュータに振り分けられ同時にマップフェーズの
処理が行われます。
マップフェーズ終了後、複数のコンピューターに存在する処理結果の
キー/バリューのデータはキーでまとめられ、キー/バリューリストとして
複数のコンピュータに振り分けられ同時にリデュースフェーズの処理が行われます。
MapReduceのロジックを複数存在する英文テキストファイルのワードカウンタ処理のソースをベースにして説明します。
ロジックの中心となるマップフェーズはMapperクラスから派生させたマップクラス、
リデュースフェーズはReducerから派生させたクラスリデュースクラスとして作成します。
ワードカウンタ処理を行うクラスWordCountを通常のJavaのクラスとして定義します。
public class WordCount {
このサンプルではWordCountクラスのインナークラスとしてマップクラスを定義しています。
Hadoopは処理対象のデータを引数valueにセットして、マップクラスのmapメソッドを呼び出します。
(この例ではテキストファイルの1行毎にmapメソッドが呼び出されます)
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 引数で渡された1行分のデータ
String line = value.toString();
// 文章を単語に分解する
String[] tests =line.split("\\s");
// 分割された全単語をマップクラスの結果として出力する
// 出力されるデータのキーは単語、バリューは1(単語が1つの意)です
for (String text :tests) {
word.set(text);
context.write(word, one);
}
}
}
このサンプルではWordCountクラスのインナークラスとしてリデュースクラスを定義しています。
Hadoopは引数keyに単語、引数valuesに値のリストをセットして、
リデュースクラスのreduceメソッドを呼び出します。
同一のキー(単語)を引数としてreduceメソッドが複数回呼ばれることはないので
値のリストの集計値がその単語の出現数となります。
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 格納されている値を全て集計します
for (IntWritable value : values) {
sum += value.get();
}
// 単語と集計値をリデュースクラスの結果として出力します
context.write(key, new IntWritable(sum));
}
}
プログラム実行用のmainメソッドです。
第一パラメータは処理対象の複数の英文テキストファイルが格納されているフォルダー、
第二パラメータは処理結果のファイルを出力するフォルダーとします。
ここでは、パラメータチェック及びHadoopのJobオブジェクトを作成し、Jobオブジェクトに
各種設定を行い、Jobオブジェクトを実行します。
public static void main(String[] args) throws Exception {
// パラメータチェック
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: hadoop ");
System.exit(2);
}
// Job作成
Job job = new Job(conf, "word count");
// Jobオブジェクトに自クラス、マップクラス、リデュースクラスを設定
job.setJarByClass(WordCount.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// Jobオブジェクトに入力ファイル、出力ファイル共にテキストファイルと設定
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// Jobオブジェクトにキーは文字列、バリューは数値(int)と設定
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Jobオブジェクトにパラメータで指定された入力ファイル格納フォルダー、
// 出力ファイル格納フォルダーを設定
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Jobオブジェクトの実行を開始して完了するまで待つ
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
起動パラメータの二番目で指定されたフォルダーに以下の様な名称のファイルが
Hadoopにより作成されます。
・part-r-0000