Tech Sketch Bucket of Technical Chips by TIS Inc.

Infinispanの紹介(3) MapReduce

Pocket

3回にわたるInfinispanの紹介シリーズの最終回です。 初回 はInfinispanのキャッシュ機能の使い方を、 前回 はInfinispanをクラスタ構成で稼働させる方法を紹介しました。最後にInfinispanの特徴となるMapReduceでのアプリケーション実行の機能を紹介します。


MapReduceで実現できること

Infinispanは単なるキャッシュサーバの機能だけではなく、キャッシュされたデータに対するMapReduceアプリケーションの実行基盤として使用できます。MapReduceで何が実現できるかをInfinispanに付属するサンプルアプリケーションWordCountをクラスタ環境で実行させることで紹介します。
なお、Infinispanのバイナリモジュールおよびソースコードは ここから 入手します。今回はバージョン5.2.1.Finalを使用します。

WordCountの構成

WordCountでは、指定されたファイルをInfinispanのキャッシュに格納し、それに対してMap処理で単語分割を行い、Reduce処理で単語の出現数をカウントし、その結果の上位15件を出力します。
実行は、クラスタ内の1台のノードがMaster、他がSlaveの位置づけになり、MasterがMapReduce処理を制御します。

infini_3_mapred.jpg

このクラスタの構築方法の考えを示します。

  • WordCountで扱うファイルのサイズが小さければMaster1台だけを稼働させ(Slaveは稼働させない)、全てのMapReduce処理をMasterだけで行います。
  • ファイルのサイズが大きく、Master1台だけでは処理遅延・OutOfMemoryエラーの発生が懸念される場合は、Slaveを利用します。3台構成のクラスタの場合はファイルを3分割し、Master1台・Slave2台で分割したファイルに対するMap処理を行い、その結果をMasterに集約してReduce処理を行います。この際、Masterは稼働しているSlaveを自動で検知し、停止しているSlaveはMap処理実行の対象外となりますので、全Slaveを起動してから最後にMasterを起動します。

Masterノードの動作

Masterノードの起動コマンドを示します。オプションnodeTypeでMasterノードであることを指定します。

起動すると前処理として、オプションtextFileで指定されたファイルを対象に、ファイルを10kバイトずつ読み込み、その内容をInfinispanのキャッシュに(key:ファイル名+連番、value:読み込んだ内容)の形式で格納します。なお、InfinispanのクラスタはDistributedモードで、各データは1台のサーバだけで保持されますが、ファイルを読み込んだノードに必ず存在するのではなく、稼働ノードが増えるとクラスタ内でバランスを取るように再配置が行われます。
その後、稼働しているSlaveノードに対してMap処理をキックすると共に、自身が実際に保持しているデータを対象にMap処理(データを単語分割します)を実行します。Map処理が終了すると全てのMap処理の結果をマージの上Reduce処理(単語毎の出現数を集計)を実行します。その後Collator処理で単語の出現数順にソートし上位15件を抽出します。

Slaveノードの動作

Slaveノードの起動コマンドを示します。

起動すると前処理としてMasterノードと同様に、オプションtextFileで指定されたファイルの内容をInfinispanのキャッシュに格納します。
その後Slaveノードは常駐し、MasterノードがMapReduceの処理をキックするたびに、自身で実際に保持しているデータを対象にMap処理を実行し、その結果をMasterノードに返します。

WordCountの処理コードの確認

Infinispanのソースコード中のディレクトリ"demos/distexec/src/main/java/org/infinispan/demo/mapreduce"がWordCountのサンプルアプリケーションになります。この中のコードを見ていきます。

キャッシュの作成

Masterノード/Slaveノード共にInfinispanのキャッシュを作成します(WordCountDemo.javaのスーパークラスのdemos/distexec/src/main/java/org/infinispan/demo/Demo.javaのメソッドstartCacheです)。

ここでは環境設定がハードコーディングされており、キャッシュマネージャはDistributedモード(CacheMode.DIST_SYNC)、データの保持ノード数numOwnersは1と指定されています。

ファイルのキャッシュへの格納

Masterノード/Slaveノード共に起動オプションで指定されたファイルを読み込みキャッシュに格納します(WordCountDemo.javaのメソッドloadDataです)。

ここではファイルを10kバイトずつ読み込み、(key:ファイル名+連番、value:読み込んだ内容) の形式でキャッシュにデータを格納します。なお単純なサンプルですので10kの境界で単語が分断されることは無視しています。

Slaveノードの常駐化

Slaveノードはキャッシュの生成後LockSupport.parkにより常駐し、MasterノードからMapReduceの処理がキックされるのを待ちます。
MasterノードがMapReduceをキックすると、Infinispanの機能により、キャッシュのデータで自身が実際に保持しているデータだけを対象にMap処理を実行しその結果をMasterノードに返すことが、自動で行われます(コーディングは一切不要です)。

MasterノードでのMapReduceのキック

Masterノードは、MapReduceの処理を次のようにキックします(ソースはWordCountDemo.javaのメソッドrunになります)。

MapReduceTaskに対して、mappedWithの引数にMap処理のクラス、reducedWithにReduce処理のクラス、executeにCollator処理のクラスを指定することでMapReduceの処理が実行されます。
これによりSlaveノードにMap処理をキックするとともに自身でもMap処理を行い、その結果を受けてReduce,Collator処理が自動で実行されます。

注:MapReduceTaskのコンストラクタの引数がキャッシュの1つだけのため、Reduce処理を分散実行するかのフラグがFalseに設定されます。この引数に",true,false"を追加し3つにすると、Reduce処理もSlaveノードで分散実行されます。

Map処理の概要

Map処理のソースはWordCountMapper.javaです。コードを抜粋します(HadoopのMapReduceと同様です)。

valueにキャッシュデータの10k毎のファイルの内容が格納されていますので、スペース区切りで単語を抽出し、その内容をemitします。

Reduce処理の概要

Reduce処理のソースはWordCountReducer.javaです。コードを抜粋します(HadoopのMapReduceと同様です)。

keyに単語、iterに出現数が格納されていますので、出現数を集計します。

Collator処理の概要

Reduce結果の後処理(本例はソート処理および上位の抽出)が必要な場合に用います。Collator処理のソースはWordCountCollator.javaです。コードを抜粋します。

Reduceされた結果がMapで引き渡されますので、ソート処理および上位の抽出を行い、結果をListで返します。

MapReduceの内部実装

InfinispanのMapReduce処理をハンドリングするソースコードMapReduceTask.javaの内容を簡単に紹介します。

Masterノードの動き

WordCountのサンプルでMasterノードがMapReduceTaskのメソッドexecuteを実行した際のInfinispanの内部の動作を示します。

1)Map処理の実行
MapReduceTask.javaの321行目で"if(distributeReducePhase())"により、Reduce処理を分散実行するかを判断し、フラグがFalseのため350行目のメソッドexecuteMapPhaseWithLocalReductionを呼び出します。
この中では446行目の"rpc.getMembers()"によりクラスター内の全ノードの一覧を取得し、それぞれに対し454行目の"part.execute()"でMap処理を実行し、"futures.add(part)"でMap処理の結果を受け取ります。
このMap処理の実体はMapReduceManagerImpl.javaの157行目のメソッドmapです。この中ではまず167行目のメソッドfilterLocalPrimaryOwnerにより、キャッシュ中の全てのkeyの中からそのデータの実体を自ノードが保持しているものだけを抽出して処理対象とします。次にその処理対象のkeyに対するvalueを取得し、183行目の"mapper.map(key, value, collector)"で、実行したいMap処理のユーザロジック(本例ではWordCountMapper.javaのメソッドmap)を呼び出します。

2)Reduce処理の実行
Map処理の終了後にMapReduceTask.javaの477行目の"mergeResponse(mapPhasesResult, mapTaskPart.get())"で全てのMap結果をマージした上、491行目の"reducer.reduce(e.getKey(), e.getValue().iterator())"で、実行したいReduce処理のユーザロジック(本例ではWordCountReducer.javaのメソッドreduce)を呼び出します。

3)Collator処理の実行
メソッドexecuteMapPhaseWithLocalReductionの呼び出し後に、MapReduceTask.javaの620行目の"collator.collate(execute)"で、実行したいCollator処理のユーザロジック(本例ではWordCountCollator.javaのメソッドcollate)を呼び出します。

Slaveノードの動き

MasterノードからMapReduceの処理要求を受信すると、MapCombineCommand.javaの91行目のメソッドperformが呼び出されます。この中では、MasterノードのMap処理と同じMapReduceManagerImpl.javaの157行目のメソッドmapが呼ばれ、その処理結果をMasterノードに返します。

最後に

3回にわたり、Infinispanのキャッシュ機能、クラスタ構成、MapReduceでのアプリケーション実行を紹介してきました。
特徴としてはInfinispanのキャッシュ機能はCacheStore機能を用いることで、Javaアプリケーションで大量データを取り扱う場合でも、頻繁に使われるデータだけをメモリに保持することが可能となり、ヒープの使用削減、ひいてはOutOfMemoryエラーの防止が実現できます。
またMapReduce機能により単なるキャッシュではなく大量データの処理基盤としての可能性を持ちます。同一データを色んな観点で複数回処理する場合、Infinispanではキャッシュにデータが乗った状態でMapReduce処理を繰り返し実行できますので、Hadoopの場合のように毎回HDFSからデータを読み込むのに対し、より効率的に動作すると考えます。
是非いろんな局面での採用を考えてみてください。

エンジニア採用中!私たちと一緒に働いてみませんか?