いまさらやらないMapReduce

いまさらやらないMapReduce

2015.07.23

Hadoop システムの活用においてエコシステムの進化により、MapReduce プログラム直接作る人はいなくなっていると言えますが、その仕組みを理解していることは重要です。そんなあなたへのコラムです。

前置き

IT業界のトレンドである BigData ですが、多くは Hadoop とエコシステムを組み合わせて使用する事が多いようです。そのため、Hadoop の処理基盤である MapReduce プログラムを直接書くエンジニアは余り必要ありません。Hadoop 2系以降は、MapReduce 以外の仕組みも登場してきています。

しかし、Hadoop の仕組みを深く理解するためには、MapReduce が実際の所どのような処理を行っているのかを理解しなければいけません。主にパファーマンスのチューニングを行う場合に、どこの部分にボトルネックが発生しやすいのかを理解する事で、処理の組合せやデータの配置に注意する事ができます。

MapReuce プログラミングの入門で使用される WordCount (Apache Hadoop 2.7.0 : MapReduce Tutorial)プログラムを詳細に見てみます。

WordCount プログラム

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  public static class TokenizerMapper extends Mapper{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException,
    InterruptedException {       
     StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
         context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends Reducer {
   private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values, Context context) 
    throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      
      result.set(sum);
      context.write(key, result);
    } 
  }
 
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

初めにプログラムの構造を確認します。

WordCount クラス(ワードをカウントするM/Rプログラム)には、2つの static インナークラスと main(String args[]) メソッドが定義されています。

TokenizerMapper クラス

Map 処理を行うクラスです。map() メソッドが定義されています。

IntSumReducer クラス

Reducer 処理を行うクラスです。reduce() メソッドが定義されています。

public static void main(String[] args) throws Exception メソッド

Java プログラムのスタートメソッドで、MapReduce のジョブを組み立てるコードを記述します

STEP 1:main メソッドの開始行より確認します

44行目 Configuration conf = new Configuration();

MapReduce の Config オブジェクト、追加の設定等を設定できる。
デフォルトは core-default.xml , core-site.xml を読み込む。

45行目 Job job = Job.getInstance(conf, “word count”);

44行目で作成した Config オブジェクトの設定をベースに word count ジョブ名(M/R) を組み立てます。

46行目 job.setJarByClass(WordCount.class);

M/R の実装クラスである WordCount クラスを指定します。
分散モードで、リモートホストへ送信する Java クラスを指定します。
(スタンドアローンモードだけならば指定しなくても良い)
このメソッドの働きにより、分散ノードに予め Java クラスを配置する必要がなくなります。

47行目 job.setMapperClass(TokenizerMapper.class);

ジョブの Mapper クラスを設定します。Mapper クラスの処理詳細は後ほど記述します。

48行目 job.setCombinerClass(IntSumReducer.class);

ジョブの Combiner クラスを設定します。
Combiner は Map 処理から Reducer 処理へ渡す前に、Map 処理でノード上のメモリが不足した時に、Reducer と同様の処理を1ノード内で行い処理データを纏めてレコード数を削減します。
(中間処理のため、常に実行されるとは限らないその為、必須ではない)

49行目 job.setReducerClass(IntSumReducer.class);

ジョブの Mapper クラスを設定します。Mapper クラスの処理詳細は後ほど記述します。

50行目 job.setOutputKeyClass(Text.class);

MapReduce (Key/Value) の出力キーの型を指定します。
指定するのは Hadoop の Writable を実装するクラス。(Writable は Hadoop のシリアライズ型)

51行目 job.setOutputValueClass(IntWritable.class);

MapReduce (Key/Value) の出力バリューの型を指定します。
指定するのは Hadoop の Writable を実装するクラス。(Writable は Hadoop のシリアライズ型)

52行目 FileInputFormat.addInputPath(job, new Path(args[0]));

HDFS 上の読み込むファイルパスを指定します。
FileInputFormat はキーとバリューに分けて、Mapper へ送り込みます。

53行目 FileOutputFormat.setOutputPath(job, new Path(args[1]));

HDFS 上へ出力するファイルパスを指定します。

54行目 System.exit(job.waitForCompletion(true) ? 0 : 1);

M/R ジョブの終了を待ち合わせします。
M/R ジョブが成功すると 0 失敗すると 1 が System.exit() メソッドの引数に与えられます。

STEP 2:TokenizerMapperクラス

org.apache.hadoop.mapreduce.Mapper クラスを継承して作成する。

Map 処理を行う、void map(Object key, Text value, Context context) メソッドを実装します。

[引数]
Object key -> WordCount の場合はオブジェクトキー各行のオブジェクトキー。
Text value -> WordCount の場合はテキストファイルの切り取られた1行を表す。
Context context -> 出力用の write(Key,Value) メソッドを持つコンテキストオブジェクト。
map() メソッドの処理

18行目 public void map(Object key, Text value, Context context)

map() メソッドの第2引数には、各ノードに分割されたテキストファイルから1行分の列を切り取り、Text 型の value として渡されます。

20行目 StringTokenizer itr = new StringTokenizer(value.toString());

StringTokenizer クラスを使い文字列を分解します。(デフォルトはスペース区切り)
value は Text 型のため、value.toString() を使い String 型へ変換が必要です。

21行目 while (itr.hasMoreTokens()) {…}

StringTokenizer で分解した文字列回数分、22−23行目を繰り返します

22行目 word.set(itr.nextToken());

itr.nextToken() メソッドで分解した文字列の1つを取り出して、String 型 から Text 型へ変換します。(※ Hadoop では出力に String 型での出力はできません)

23行目 context.write(word, one);

出力するキー(分解された文字列)とバリュー(1)を Map 処理の結果として書き出します。バリューの1は事前に定義された15行目の IntWritable 型の値です。Reduce 処理により最終的に数値(カウント)が纏められます。

STEP 3:IntSumReducerクラス

org.apache.hadoop.mapreduce.Reducer クラスを継承して作成する。

[引数]
Text key -> WordCount の場合は Map 処理で出力されるキー。
Iterable values -> WordCount の場合は Map 処理で出力されたキーに対するバリュのセット。(同じキーで纏められた値のセット)
Context context -> 出力用の write(Key,Value) メソッドを持つコンテキストオブジェクト。
reduce() メソッド

31行目 public void reduce(Text key, Iterable values, Context context)

reduce() メソッドの第1引数には、Map 処理で出力されたキー。
第2引数にはキーに対するバリューセット(キーが複数個ある場合値は複数)が渡されます。

33行目 int sum = 0;

キーに対するバリューの集計用に変数の準備

34行目 for (IntWritable val : values) {…}

35行目をバリューの数分繰り返します。(同じキー)

35行目 sum += val.get();

33 行目で準備した変数にカウントアップする。

38行目 result.set(sum);

35行目の集計結果を出力の型にセットする。

39行目 context.write(key, result);

集計した結果と集計に使用したキーを最終的に書き出します。

※Reducer の数は、Job クラスの setNumReduceTasks(int tasks) により数を指定する事ができます。Map からの振り分けルールは Partitioner で定義します。(デフォルトではキーのハッシュ値による振り分け)

MapReduce 処理ですが、解説した処理が複数のノードで実行される可能性がある事を考慮しなければいけません。Map 処理はスプリットされたファイルに対して処理が行われるため、スプリットされるファイルのサイズにより処理効率が変わります。スプリットサイズは、xml や API で指定ができます。

Reduce 処理では、解説の図にあるように Map 処理をした結果がネットワークを通じて1つ、または複数のノードへ転送されるためボトルネックになりやすい場所です。つまり Reduce 系の処理を多く行う場合はパフォーマンスとのトレードオフになります。パフォーマンスを考慮する場合は、Map 処理と Reduce 処理の組み合わせを考慮する必要があります。

以上 MapReduce 処理の紹介です。

記事は、予告なく変更または削除される場合があります。
記載された情報は、執筆・公開された時点のものであり、予告なく変更されている場合があります。
また、社名、製品名、サービス名などは、各社の商標または登録商標の場合があります。