Tech Sketch Bucket of Technical Chips by TIS Inc.

リアルタイム分散処理Stormの耐障害性は?

Pocket

techsketch-banner-OSS+startingblock(700x65).jpg

リアルタイム分散処理とは

「ビッグデータ」処理のためにHadoopを用いますと、「複数のマシンに大量データ処理を分散して飛躍的に性能を向上する」ことが容易に可能となります。
ところがHadoopの弱点としまして、ビッグデータをいったん蓄積し、バッチで一括処理する形態で処理が行われますので、処理データが発生してからそれに対する処理結果が得られるまで必ずタイムラグが発生します。このため、クレジットカードの不正アクセス検知、センサーデータなどでの異常値検出のようなリアルタイムなレスポンス(低レイテンシー)が要求されるビッグデータ分野へのHadoopの適用は向いておりません。
このような随時発生する大量データ(ストリーミングデータ)を、蓄積せずにリアルタイムに処理する「リアルタイム分散処理」が求められています。
今回は、リアルタイム分散処理のソリューションとしてTwitter社より公開された Storm に着目し、サンプルアプリケーションでは実装されていないデータが正常に処理されたかを判断する機能等の、Stormの耐障害性を中心に紹介していきます。

アーキテクチャ

Stormで実行されるアプリケーションの構成、およびクラスタ環境で実行させる際のプロセスの構成を示します。
blog_storm_00.jpg

アプリケーションの構成(Topology)

Stormで実行されるアプリケーションは Topology という構成をとり、次の要素を持ちます。

  • Tuple :Storm内で処理されるストリーミングデータで、アプリケーションが授受するデータはTuple内にList形式で格納されます(List内の各項目( フィールド と呼びます)にはその内容を表す名称をつけます)。
  • Spout :ストリーミングデータの送信元で、メソッドnextTupleでTupleを一件ずつ送信します。
  • Bolt :Spout,他のBoltから送信されたTupleをメソッドexecuteで受信し、処理します。処理結果は必要に応じて後続のBoltにTupleとして送信します。
  • Topology :Tupleが処理され流れていくSpout,Boltのグラフ構造です。このTopologyの定義は、プログラム内でTupleを授受する前後関係でハードコーディングします。Boltの定義例を示します。

builder.setBolt(" count ", new WordCount (), 12 ). fieldsGrouping (" split ", new Fields(" word "));

... WordCountというBoltが12スレッド稼働し、"split"と名付けられたTupleを受信します。この際に、Tuple内の"word"と名称をつけたフィールドの値が同じものは同じスレッドに引き渡されます(fieldsGroupingの指定により)。処理結果は"count"と名付けたTupleで送信します。

プロセス構成(distributed mode)

Topologyを実行する環境は、単一のJavaアプリケーションとして実行する local mode と、 クラスタ環境で並列分散処理する distributed mode が存在します。
distributed modeでは全体をマスターノードの Nimbus で、個々のワーカーノードを Supervisor で管理します(この連携は Zookeeper を介して行われます)。Nimbusに対してアプリケーションを起動しますと、Supervisorがアプリケーションを実行する Worker を起動します。Workerの起動数はアプリケーション内で定義でき、Topologyの定義で指定したスレッド数がWorkerに分配されて Task として実行されます。
アプリケーションの稼働状況はマスターノードで UI を稼働させることで、ブラウザーからモニタリングできます。

アプリケーションの実例と稼動

Stormのサンプルアプリケーション storm-starter の"src/jvm/storm/starter/WordCountTopology.java"をもとに、アプリケーションの実例(Java言語あるいはClojure言語で記述)と稼働方法の概略を示します。

Topologyの概略

WordCountのTopologyの概略を示します。RandomSentenceSpoutというSpoutで文章をランダムに送信し(shuffleの指定で)、SplitSentenceという1つ目のBoltで単語分割して単語毎に送信先を決めて送信し(fieldの指定で)、WordCountという2つ目のBoltで単語毎に出現数をカウントします。
blog_storm_01.jpg

特徴は次の様になります。

  • RandomSentenceSpoutのメソッドnextTupleは何らかのイベントにより起動されるのではなく、繰り返しでずっと呼ばれ続けます。そのため、負荷を高めない目的でSleep処理が入っています。一般的には、必ずしも送信データが存在する訳ではありませんので、送信するデータが無い時はこのメソッド中でデータの発生を待つのではなく、Sleep処理を行った後に何もせず抜け出すようにします。
  • SplitSentenceのBoltはJava言語からPython言語のスクリプトを呼び出すことで実行されます。これは、Boltのクラスをbacktype.storm.task.ShellBoltを継承して作成することにより、外部のシェルスクリプトに処理を委ねることで実現されます。他にRuby言語の呼び出しが可能です。
  • WordCountはTaskが12稼働し、1つのTaskには複数の種類の単語が入力されますが、同じ単語は必ず同じTaskに入力されます。よって入力されたTupleの単語は、HashMapで(単語、単語数)の形態で保持すれば、その単語毎の総数が正しく把握できます。

local modeでの稼動

アプリケーションをコンパイルし、local modeで稼働させるには次の手順を踏みます。

1. アプリケーションのコンパイルにはClojure言語の開発環境の leiningen が必要です。wgetコマンドでleinコマンド(Bashのスクリプトです)を入手し、環境変数PATHに通し、leinコマンドでleiningenの環境の初期化を行います。

wget https://raw.github.com/technomancy/leiningen/stable/bin/lein
lein self-install

2. gitコマンドでサンプルアプリケーションstorm-starterを入手します。

git clone https://github.com/nathanmarz/storm-starter.git

3. leinコマンドでstorm-starterをコンパイルし、storm-starterのJARファイルを作成します。

lein deps
lein compile

4. Topologyを実行します。

java -cp `lein classpath` storm.starter.WordCountTopology

distributed modeでの稼働

1. distributed mode で稼働させるための環境準備(ライブラリのインストール)はここでは省略します。その手順は マニュアル を参照ください。また、 Zookeeper の稼働も必要です。

2. Stormのバイナリーディストリビューション より最新のバージョン(執筆時はstorm-0.7.1.zip)を入手し、解凍します。

3. 環境定義ファイル"conf/storm.yaml"を作成します(ホームディレクトリ配下の"~/.storm/storm.yaml"にも同じものが必要です)。設定する主な項目は次の通りです。

java.library.path: ライブラリのインストールディレクトリ
storm.zookeeper.servers:
- ZookeeperのIPアドレス(稼働させる台数分指定する)
storm.zookeeper.port: Zookeeperのポート番号
nimbus.host: NimbusのIPアドレス
nimbus.thrift.port: Nimbusのポート番号
supervisor.slots.ports:
- Workerのポート番号(稼働させる台数分指定する)
ui.port: UIのポート番号

補足: YAML というデータ形式で記述する為、"稼働させる台数分指定する"箇所は、1行ずつ先頭に"-"(ハイフン)を付けて羅列します。

4. マスターノードでNimbus, UIを起動します。psコマンドで見ると"java backtype.storm.daemon.nimbus", "java backtype.storm.ui.core"が稼働しています。

bin/storm nimbus
bin/storm ui

5. 各ワーカーノードでSupervisorを起動します。psコマンドで見ると"java backtype.storm.daemon.supervisor"が稼働しています。

bin/storm supervisor

6. Nimbusに対してアプリケーションを実行します(Jobの稼働)。

bin/storm jar "storm-starterのJARファイル" storm.starter.WordCountTopology "Job名称"

NimbusはSupervisorにアプリケーションを実行するWorkerの起動を指示します。各ワーカーノードでpsコマンドで見ると"java backtype.storm.daemon.worker"が稼働し、その上でアプリケーションが実行されます。

Stormが実現する障害対策

データが正常に処理されたかの判断

Topologyが実行される中では、Boltの処理中にDBアクセスエラー等で処理が継続できないといった障害が起こりえますので、TupleがTopologyを全て流れたかを判断することが必要となります。Stormの紹介記事では"Guarantees no data loss"という言葉が使われ、一見Storm自身がTopologyに流したTupleを確実に処理することを保証する(アプリケーションでは何も気にしなくてよい)ように受け取れますが、Stormで実現される機能はTopology内の処理がすべて正常に終了したかそうでないかを検出することです。このためには、アプリケーション内で、

  • SpoutでTuple送信(emit)時にユーニークに識別できるIdを付与する
  • Boltが後続のBoltにTupleをemitする際に受信Tupleを引数に付けることで、Tuple間の関連づけ( anchoring )を行う

という実装を行い、Topoloyの中をTupleが流れていく状況(Spoutから見たツリー構造)をStormが監視できる様にし、さらに

  • Boltは処理結果に応じて、正常終了時は Ack , 障害発生時は Fail の送信を行う

という実装を行います。
これにより、Stormがツリー構造でAck,Failが返された状況をモニターリングし(Stormの内部で"__acker"というBoltが自動で稼働してこれを処理します)、全てAckであればSpoutのメソッドackがemit時に付与したIdを引数に呼び出され、Failの発生or一定時間内に処理が完結しなければSpoutのメソッドfailがemit時に付与したIdを引数に呼び出され、TupleがTopologyで全て正常に処理されたか否かを検出できます。

blog_storm_02.jpg

なお、Spoutのメソッドfailが呼ばれた場合にメッセージを再送信するかはアプリケーションの要件次第で決めます。これには、送信したTupleをどのように保持するか、および、どのBoltで処理が正常に実行されたか未処理であったかの情報がStormとしては保持していないためTupleが二重実行されても大丈夫かの判断が必要です。

補足:Ackの状況を保持するため、メモリの使用量は増加します。処理に確実な信頼性が求められない場合は、SpoutのTuple送信時にIdを付与しない(コーディング上の対策)、あるいは デフォルト設定

を0にすることによりStormが持っているAckの機能を無効にする(上述のコーディングが行われていても)ことを検討します。

Workerプロセスダウン時のリカバリー

Stormの可用性向上対策の一例として、Workerプロセスがダウンした時のリカバリーの振る舞いの実例を示します。この際に用いたアプリケーションのTopologyを、前述のUIの機能により示します。
blog_storm_04.jpg

これより、このTopologyは、

  • Tupleを送信する"symbol"というSpoutは、worker:1のtask:4として稼働
  • Tupleを受信する"received"というBolt(2スレッド稼働で定義)は、worker:1のtask:2と,worker:2のtask:3として稼働
  • 上述のAck機能を実現する為の"__acker"というBoltは、worker:2のtask:1として稼働

という構成で稼働していることが把握できます。

では、このTopology実行中にworker:2を強制終了させた場合に、SupervisorがWorkerをどのように復旧するか、その間のTupleがどう処理されるか、を示します。
blog_storm_05.jpg

1. Spoutが稼働していないWorkerプロセスを強制終了させます。
2. Workerプロセスダウン後30秒経過すると、Supervisorのログに

と出力され、SupervisorとWorkerとのハートビートが失われたことが検知されます。なお、この30秒は デフォルト設定

によります。

3. SupervisorではWorkerプロセスを念のためkillする処理が動きます。

4. SupervisorはWorkerの再起動を行います。

再起動中に、強制終了時にWorkerで受信できていたがメソッドexecuteで未処理であったTuple、および未送信のAck、の処理が実行されます。

5. Worker停止時のSpoutの振る舞い
・task:4→task:3の処理
Workerが停止している間も、そこで稼働していたはずのBoltに対するTupleの送信処理は継続されますが(Tupleのemit時のリターンコードがどのTaskに送信したかのTaskのId値を返すことから判断できます)、Tupleの受信処理が無くAckが返されることも無いので、タイムアウトの30秒後にSpoutのメソッドfailが呼ばれ、Tupleが正常処理されなかったことを検出できます。
・task:4→task:2の処理
稼働しているWorker内で完結する処理はアプリケーションのロジックとしては正常終了ですが、Ackを処理するTaskが無いためこちらもタイムアウト後にSpoutのメソッドfailが呼ばれることになります。
なお、この30秒は デフォルト設定

によります。

補足1:本ケースはTupleを1秒間隔で送信した場合の振る舞いです。2秒間隔にすると、強制終了のタイミングで正常に稼働しているWorkerもSupervisorがダウンと判断して再起動が行われ、挙動が異なる結果が得られています。
補足2:Ackを処理する"__acker"のTask数は デフォルト設定 で、"topology.ackers: 1"となっています。Ackの発生数、耐障害性を踏まえたTask数のチューニングが必要です。
補足3:ワーカーノードがダウンした場合は、Nimbusがそれを検知し、他に使用できるワーカーノードが存在すればそこでWorkerを再起動させます。またNimbus, Supervisorsプロセスダウン時は、Workerプロセスの処理はそのまま継続でき、ダウンしたプロセスは単純に手作業で再起動します。

トランザクション機能

Stormのバージョン0.7.0より、 トランザクション という機能が新たに導入されました。これは上述の"データが正常に処理されたかの判断"の機能において、Spoutのメソッドfailが呼ばれたらStorm内で自動的にTupleの再送信を行い、かつBoltの処理で二重実行が行われないことを保証しようとするものです。このためにはアプリケーションのBoltのメソッドfinishBatchで、Stormで採番されたトランザクションIdを意識してTupleを処理するかの判断の作り込みが必要です。
blog_storm_03.jpg

現在、トランザクションは開発途上であり、対応したSpoutはサンプルで1つ提供されるだけで、送信する全てのTupleをSpoutのコンストラクターに与えるバッチ処理的なデータ入力の場合のみ適用可能です。

最後に

StormはHadoopと比較しますと、データをオンメモリーベースで1件ずつ処理するため、低レイテンシーでの処理が期待できます。また、耐障害性の仕掛けも用意されていますので、ミッションクリティカルな分野への適用も可能です。リアルタイムな処理が要求される場合はStormの使用をぜひ検討してみてください。

エンジニア採用中!私たちと一緒に働いてみませんか?