工場長のブログ

日々思ったことを書いてます。

DynamoDBを並列処理のためのロックやバッチのチェックポイントに使う

AWS Advent Calander 12/19分。
Kinesis用のFluentdプラグインを書いているので、それについてブログ書きたかったんだけどまだちゃんと公開できるような状態になっていないので、それは冬休みの宿題ということで。

CloudFrontのログを集計して可視化するツールを実装する機会があったので、その仕組みの中で使ったDynamoDBの話や、反省点などを書いてみる。ちなみにs3statというサービスを利用することも考えたんだけど、量的に全然処理できなそう/お金払えばできるけど高そうなので自分で実装することに。

あと、そんなんhive使えばよくね?って話は仰るとおり。もろもろの事情で謎のpythonスクリプトを引き継いでやらなければならなかったのです。

つくったもの

こんな感じの、CloudFrontから出力されるログを最終的に可視化されるまでのパイプライン処理を実装した。
f:id:imai-factory:20131220001335p:plain

CloudFrontのログの形式

まず、CloudFrontのログは指定したS3バケットに対して下記のような命名規則で集約される。

{ディストリビューション名}_YYYY-MM-DD-HH.{UUID的ななにか}.gz

中身はタブ区切りで date, time, x-edge-location, sc-bytes, c-ip, cs-method, .... という感じに、いわゆる普通のアクセスログにキャッシュサービスっぽい項目を足したもの。中にはx-egde-result-typeという項目もあって、これでキャッシュがヒットしたかどうかを追うことができるようになっている。タブ区切りなので非常に扱いやすい。

CloudFrontログのめんどくさいところ

一行一行はタブ区切りなので非常に扱いやすいんだけど、苦労した点がひとつ。エッジロケーションが世界中に沢山あって、それぞれのロケーションの中にエッジサーバーがいっぱいいて、それらがたくさんのファイルを生み出すからということだと思うんだけど、小さなログファイルがたくさん生成されている!今回扱ったディストリビューションでは、1日分で約2万から3万ちかいファイル!しかもひとつひとつは数百キロバイトから数メガバイトなので、HadoopやRedshiftが扱うには小さすぎる。これをある程度のサイズまで集約してあげないと集計処理が難しいわけです。

そこで下の絵のような感じでEC2で一旦S3からデータを取り出して500ファイルずつひとまとめにしつつ軽く一次集計して別のバケットに出力することに。今回はリクエスト数、データ転送量、エッジロケーションごとのリクエスト数くらいしか必要がなかったので、無駄なデータの排除もここでやることに。(掃除&集計しちゃうので結局ファイルサイズは大きくならないんだけど、ファイル数は劇的に減っているのでまあいいかなと。それにファイルサイズを大きくするのではなくて処理を効率化することが目的なので)
f:id:imai-factory:20131219232242p:plain

ファイルの集約&一次集計を並列化する

逐次生成されてくるファイルを15分や1時間ごとに端から処理していく、ということであればこれでOKなんだけど、今回はまず2〜3週間まえから発生しているデータを先に取り込んで、それから逐次処理をしていく必要があった。最初に取り込まなきゃいけないデータが大量すぎてシングルスレッドだと現実的な時間じゃおわらないので並列化することに。S3のオブジェクト名をキーにDynamoDBにロックテーブルを作ることで並列化を実現。
f:id:imai-factory:20131219235915p:plain
こんな感じにDynamoDBのレコードをロックファイルのように使うことでS3上の同じファイルを重複処理してしまうのを防ぐ。こうすれば大量のプロセスを立ち上げて同時に処理を始めても重複処理を避けられる。DynamoDBにはConditional Writeという機能があって、例えば「指定したキーのレコードが存在しなければ書き込む」というようなことができるので実装は非常に楽。書き込みリクエストが失敗したらロックされていた、成功したら自分がロックできた、というような分岐を書けばOK。テーブルの構造はこの絵のとおり、アトリビュートを何も持たない、ハッシュキーだけのテーブル。

一次集計済みのデータをRedshiftに突っ込む

次にデータをRedshiftに投入する。Redshiftへの重複ロードを防ぐところでも同じようにDynamoDBをロックテーブルとして使った。やってることは同じなんだけど、こっちは意味合い的には「これはやった」とか「ここまでやったよ」というようなチェックポイント的な意味合いで使ってます。こんな感じ。
f:id:imai-factory:20131219235907p:plain

JaspersoftでRedshiftのデータをレポート化する

ここはうまい絵がないのであれなんだけど、RedshiftとJaspersoftの体験版を組み合わせてみた。非常に簡単に設定できていい感じだった。このあたりはまた今後ちゃんと絵を揃えから書くお。

うまくいったこと

バッチ処理の冪等性の確保。もともと「なんど流しても副作用が(できるだけ)発生しない」処理にしたいというところからDynamoDBの利用を決めた。これがあるおかげで、何度流しても重複処理がされない。おかげで何かしらの問題が起きた時の再処理というか続きから処理をするという実装が非常にやりやすくなった。

反省点

Data Pipeline使えばよかった。いまこのパイプラインは1時間に1回のcronで動いている。あまりにも時間がなくてData Pipelineを覚えながらやる余裕がなかったのでこんな感じになってしまったけど、あれを使うと、スケジュール管理を外部化できるというのが大きい。1時間に1回EC2を起動してリポジトリからスクリプトを落としてきて実行する。そしてそのスケジュール管理はData Pipeline、みたいな感じにできると、非常に処理の可視性というかコントロール性が高くなっていいんじゃないかなぁと思った。


以上。