Tech Sketch Bucket of Technical Chips by TIS Inc.

RubyからAmazon Kinesisを操作する

Pocket

昨年11月、米ラスベガスで開催されたカンファレンス「AWS re:Invent 2013」で、ストリーミングデータをリアルタイムに処理するサービスである「Amazon Kinesis」が発表されました。ストリーミングデータをリアルタイムに処理...と、なにやら難しそうな感じがしますが、API、SDKが公開されているので簡単に試すことができます。今回はRuby向けAWS SDKを利用し、Amazon Kinesisを操作してみます。


Amazon Kinesisとは

Amazon Kinesisは、Amazon Web Servicesが提供するストリーミングデータをリアルタイムに処理する為のサービスです。近年ビッグデータの処理や分析が注目されてきましたが、バッチ処理が主流でした。Amazon Kinesisを利用すると、リアルタイムにストリーミングデータの処理や分析が可能です。また、Amazon S3やAmazon DynamoDB、Amazon Redshiftなどのサービスとの連携が容易な点も特徴として挙げられます。

詳細は以下のページをご参照ください。

本記事では、Amazon KinesisをRuby向けAmazon SDKを利用して操作する方法を紹介します。

Kinesis Management Consoleから ストリームを作成する

まずはWeb画面からストリームを作成してみます。Kinesisストリームは、データのキャプチャ・格納・転送を行います。
AWS Managemant Consoleのサービスの一覧からKinesisを選択します。

aws_list.png

次にCreate Streamを押し、ストリームを作成していきます。Number of Shardsという設定項目がありますが、これはどれだけ分割して処理を行うかを指します。まずはサンプルということで、シャード数は1に設定しました。

create_stream_console.png

Createボタンを押すとストリームが作成されます。ストリーム名とシャード数の入力と、数回ボタンを押すだけでストリームが作れてしまいました。

create_stream.png

ストリームは以下の表の通り、1シャードにつき読み込みは5トランザクション/秒 (2MB/秒 まで)、書き込みは1000トランザクション/秒(1MB/秒 まで)処理することができます。

shard_info.png

利用料金は、1 シャード 0.015 USD/1h、0.028 USD/100万putです。※記事執筆時点

詳細は以下をご参照ください。

RubyからKinesisを操作する

ここからRubyを使ってKinesisを操作する方法を紹介していきます。

Ruby向けAWS SDKをインストール

まずは、RubyからKinesisを操作する為にAWS SDKのGemをインストールします。

動作確認した環境は以下の通りです。

  • Amazon Linux AMI(3.2.39-6.88.amzn1.x86_64)
  • Ruby 2.0.0p247
  • aws-sdk 1.37.0

AWS SDKを利用する際にAWSのアクセスキーと秘密鍵を設定する必要がありますので、各自取得してください。

また、サンプルとして掲載するコードでは、キー情報を環境変数から取得するようにしてあります。動作を確認する際は、環境変数にAWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYを追加して実行してください。

インストールしたAWS SDKのAPI仕様については、以下のドキュメントをご参照ください。

ストリームにデータを投入

まずは、先ほど作成したストリームにデータを投入してみます。put.rbを実行すると、標準入力で指定したストリームに対して、データとして現在時刻を1秒毎に追加していきます。パーティションキーは、ストリームにデータを投入する際にどのシャードに割り当てるかを決めるキーです。今回は、ECサイトのカテゴリを想定した値をランダムに設定しました。プログラムを実行すると、Ctrl-c などで強制終了するまで値を追加し続けるので、適当なタイミングで止めてください。

  • 利用したAWS::Kinesis::Clientのインスタンスメソッド

ストリームに対して、連続して現在時刻のデータが追加されていくのが確認できます。シャードは1つしか作成していないので、パーティションキーが異なっていても同一シャードに追加されていきます。

ストリームからデータを取得

次に、先ほど投入したデータを取得してみます。get.rbを実行すると、標準入力で指定したストリームの全てのシャードから並列でデータを取得し、表示します。(今回は1シャードなのでシングルスレッドで実行されます)

先ほど投入したデータが表示されました。

ストリームの作成

次にストリームをKinesis Management Consoleからではなく、プログラムを実行することで作成してみます。create.rbを実行すると、標準入力からストリーム名とシャード数を受け取り、ストリームを作成します。

example2という名前、シャード数は2でストリームを作成してみました。

Kinesis Management Consoleで新しいストリームが作成されていることが確認できます。ストリームのステータスは以下の4種類が有り、ストリームの作成直後はCREATING、作成が完了するとACTIVEにステータスが遷移します。

  • CREATING
  • ACTIVE
  • UPDATING
  • DELETING

create_stream2.png

指定した通り、シャード数2で作成されたことが確認出来ます。

create_stream3.png

ストリームの一覧を表示

WebからではなくCLIからストリームの一覧を確認できるように、ストリームの一覧を表示するプログラムを作成してみます。表示するデータは、ストリーム名、開いているシャード数、閉じているシャード数、ステータスです。

最初にKinesis Management Consoleから作成したストリームと、プログラムを実行して作成したストリームが表示されました。

ストリームの削除

作成したストリームをプログラムから削除してみます。delete.rbを実行すると、標準入力で削除する対象のストリーム名を聞かれ、そこで指定したストリームを削除します。

exampleという名前のストリームを削除しました。

しばらく待ってから一覧を表示してみます。

exampleが削除され、example2だけ残りました。

複数シャードへの分散を確認する

複数のシャードを持つストリームを作成し、データが分散されるか確認してみます。今回はシャードを3つで作成します。

次にデータを投入します。

別々のシャードにデータが分散されています。また、パーティションキーが同一であれば、同一のシャードで処理していることが確認できました。

シャードの結合

ストリームの複数のシャードを結合するコードを作成しました。トラフィックが減少した際など、シャード数を減らすのに利用します。

シャードが2以上あるストリームでないと結合できないので、example2で行います。シャードを結合する前に、データを投入しておきます。

次にシャードを結合します。merge.rbを実行し、対象のストリーム、結合するシャードを指定すると、シャードが結合されます。

シャードの結合、分割中はUPDATINGというステータスになるようです。

しばらく待ってから再度ストリームのリストを表示すると、開いているシャードが1に、閉じているシャードが2に変化しています。

データを取得してみると、閉じたシャードからでもデータが取れています。

これは以下に書かれている通り、24時間はデータの参照が可能だからです。また、追加のデータ投入は受け付けないようです。


Closed shards no longer accept data from producers. Data in these shards is still available to consumers, but only for 24 hours. Resharding operations result in closed shards.

closed_shard.png

本当に閉じたシャードにデータを追加できないか試してみます。

確かに、shardId-000000000002にしかデータが追加されません。

シャードの分割

シャードを分割するコードを作成しました。シャードの分割は、結合とは逆にトラフィックが増加した際などに行います。

先ほど結合したexample2のシャードを、split.rbを実行して分割してみます。分割位置は対象のシャードにマッピングされているハッシュ値の範囲で指定します。

分割中。分割前は開いているシャードは1つです。

分割が完了し、開いているシャードが2つに増えました。

再度split.rbを途中まで実行し、ハッシュ値を確認してみます。

指定したハッシュ値からシャードが分割されていることが確認できます。

まとめ

今回はAmazon KinesisをRubyのAWS SDKを利用して操作しました。実際になにかを作る際はDynamoDBにデータを保存してランキング集計したり、Stormと連携してなにかしらの処理を行ったり、Redshiftにデータを送り込んで分析したりするのが便利そうです。今回は基本的な使い方のみの紹介となってしまったので、今後もう少し応用的な使い方を紹介していけたらと思います。

エンジニア採用中!私たちと一緒に働いてみませんか?