Tech Sketch Bucket of Technical Chips by TIS Inc.

Hadoop MapReduce、1ジョブで全体ソートする方法を考えた

Pocket

Hadoop MapReduceは、プログラマが何もしなくても処理結果をkey順にソートしてくれます。
ところが!処理結果全体はkey順になりません。

と、訳が分からないことを言いました。がんばって説明します。

Hadoop MapReduceは並列分散処理フレームワークなので、処理はふつう複数のタスクとして実行されます。処理結果はタスクごとに出力され、それぞれの中はkey順なのですが、タスクに割り当てられるkeyがマチマチなので全タスクの出力をつなぐとkeyが前後してしまいます。このように:

map-reduce.JPG

Hadoop MapReduceではタスクごとのソートを「部分ソート」、処理結果全体のソートを「全体ソート」といいますが、全体ソートの実現には一手間かかります。どんなやり方がいいか考えてみました。


方法1 reduceタスク数を1にする

全てのkeyが1タスクに処理され、その出力はkey順になります。(ちなみに今のところreduceタスク数のデフォルトは1です)
しかしこれは並列処理じゃない(Hadoopを使う意味がない!)。当然データ量に比例して時間がかかってしまいます。

方法2 PigやHiveを使う

Pigのorder byはスケーラブルに全体ソートを行えるようです。これが最適解かもしれません。
HiveのSORT BY、ORDER BYで全体ソートするには方法1と同じくreduceタスクを1にするしかないようです。

方法3 Partitionerを自作する

初期状態のHadoopでは org.apache.hadoop.mapred.lib.HashPartitioner.getPartition(key, value, numReduceTasks) が、中間データのkeyをそのハッシュ値に応じてreduceタスクに振り分けます。
getPartition メソッドを持つ Partitioner を実装すれば振り分けルールを自由に定義できます。
たとえば、このようなルールを実装したら

(reduceタスク数が3の場合)key~10000はpartition1、~20000はpartition2、残りがpartition3に振り分けられた結果、全体でもkey順になります。
しかし、入力データによっては各partitionのサイズがひどく偏るかもしれません(ほとんどのkeyが20000より大きいなど)。

方法4 TotalOrderPartitionerを使う

方法3の欠点を克服できる TotalOrderPartitioner がHadoopにもとからあり、入力データからkeyをサンプリングしてパート境界値を決めてくれます。
Pigでなければこれが定番と思いますが、入力データがSequenceFile限定なのがネックです。

SequenceFileはHadoop固有のファイル形式で、keyのメタ情報(型や長さ)をファイル内に含むもの(keyをサンプリングする以上、メタ情報が必要なわけです)。
しかし入力データが最初からSequenceFileで作られているケースをのぞいて、「入力データからSequenceFileを作るジョブ」と「SequenceFileをソートするジョブ」の2ジョブが必要で、その分時間がかかりそう。

方法5 入力データに合わせたInputFormatとRecordReaderを自作する

そこでこの方法を考えました(やっと本題)。

方法4のkeyサンプルによる動的パーティショニングを活かしつつ、1ジョブで全体ソートする方法です。

keyのサンプルは、 InputFormat.getRecordReader().next(key, value) によって取得されます。つまり Mapper.map(key, value) に渡されるものと同じ。
入力データがテキストファイルの場合、デフォルトのRecordReaderであるLineRecordReader が返すkeyは各レコードの開始位置(ファイル先頭からのバイト数)です。
このクラスに代えて、ソートしたいキーを入力データから取り出すRecordReaderと、それを返すInputFormatを実装します。

あとは

とJobConfに指定する以外は『Hadoop』のTotalOrderPartitionerを使った全体ソート同様。
CSVや固定長データ用に、キー位置を指定できる汎用InputFormatを作ってもいいかもしれません。
この方法のネックは

  • 全体ソート・キーとは異なる値を、Mapper.map(key, value)で使いたい。
  • 全体ソート・キーをinputファイルから得るのが難しい(MapReduce処理内で複雑な計算の結果導出される等)。
  • Reporterを使えない

でしょうか。

それってTeraSort、、、

作ってから気づいたのですが、上のコードはHadoopのexamplesに含まれるTeraSortと同じ発想でした。

(http://hadoop.apache.org/common/docs/r1.0.0/api/index.html)

Terasortは標準的なMapReduceによるソートですが、 Partitionerは独特で、N-1個のソートされたサンプル・キーリストを使って各reduceが処理すべきキー範囲を決定します。 具体的には、sample[i-1] <= key < sample[i] であるkeyはreduce i に送られます。これによって、reduce i の出力はすべて reduce i+1 のより小さいことが保証されます。

TeraSortではさらに

高速にパーティショニングするために、Partitionerは2層のトライ木を構築しており、keyの先頭2バイトによって速やかにサンプル・キーと対応させられるようにしています。

なるほど、、、。(トライ木 )

まとめ

長々と書きましたが

(『Hadoop』Tom White)
全体としてソートされたファイルを生成する簡単な方法はありません。多くのアプリケーションにとって、それは問題になりません。

だそうで実際役に立つことは少ないのかもしれませんが、ソートひとつとっても、このように色々な方法があり、データ量や内容、マシン・リソース、性能要求、開発コストなどに応じて選択できるのは、Hadoopの めんどくさ 面白さだなぁと思いました。

お礼

この記事はDevLOVEによる 黄色い象使いが、獄長に出会うまで。~象、邂逅編~ をきっかけに書かれました。
講師の n3104 さん、DevLOVE、ありがとうございました。

Hadoop, Hadoop MapReduce, Pig, Hive are trademarks of The Apache Software Foundation.

エンジニア採用中!私たちと一緒に働いてみませんか?