工場長のブログ

日々思ったことを書いてます。

ほぼ日刊アドテクニュース 2015/1/6

AppNexus Acquires Cross-Device Technology Leader MediaGlu

AppNexus(the world’s largest independent ad tech companyと紹介されている)が、クロスデバイストラッキングサービスを提供するMediaGluをAppNexusが買収という記事。

どんな技術を使っているんだろうと思ってMediaGluのWebサイト見たけど、

OUR PROPRIETARY ALGORITHM CREATES ACCURATE USER-LEVEL MATCHES USING BILLIONS OF PROGRAMMATIC DATA POINTS AND ADVANCED MACHINE LEARNING.

だって。プロプライエタリ。まあそうだよね。

Through a simple desktop cookie or device ID sync, we can quickly identify mobile and desktop devices which match your 1st or 3rd party data.

となっているので、ユーザーのフットプリントをサーバーサイドに溜めておいて、 機械学習でマッチング して、CookieやデバイスIDをラベルにユーザーを横串さしてあげるのかと。Matching as a Serviceと書いてあるので、API経由で利用できたりしたのかも。お手軽でいいね!(もちろん有料なのだが)

AppNexusは自分たちがこのサービスを使いたいだけではなく、他のプレイヤーがお手軽にクロスデバイスターゲティングできないようにしたかった、というのもあるのかも。

Measure more: improving Estimated Total Conversions with store visit insights

In the coming weeks, we’ll be rolling out store visit measurement to eligible advertisers in the U.S. as the latest enhancement to ETC.

ということでAdWordsの新機能として「店舗への来店予測数をレポートする」ことができるようになるらしい。これを有効化する条件とs知恵

  • Have a Google My Business account linked to your AdWords account
  • Set up location extensions in your Google My Business account
  • Have multiple physical store locations in the US
  • Receive a large number of both ad clicks and store visits

とあるので、実績データをもとに、ユーザーの広告クリックなどからコンバージョン(来店)数を予測するという、大枠のしくみとしては比較的単純なことをやっている気がする。

でも、もとデータにもなる、コンバージョンである「来店の実績」ってどうやって取っているんだろう。

DynamoDBを並列処理のためのロックやバッチのチェックポイントに使う

AWS Advent Calander 12/19分。
Kinesis用のFluentdプラグインを書いているので、それについてブログ書きたかったんだけどまだちゃんと公開できるような状態になっていないので、それは冬休みの宿題ということで。

CloudFrontのログを集計して可視化するツールを実装する機会があったので、その仕組みの中で使ったDynamoDBの話や、反省点などを書いてみる。ちなみにs3statというサービスを利用することも考えたんだけど、量的に全然処理できなそう/お金払えばできるけど高そうなので自分で実装することに。

あと、そんなんhive使えばよくね?って話は仰るとおり。もろもろの事情で謎のpythonスクリプトを引き継いでやらなければならなかったのです。

つくったもの

こんな感じの、CloudFrontから出力されるログを最終的に可視化されるまでのパイプライン処理を実装した。
f:id:imai-factory:20131220001335p:plain

CloudFrontのログの形式

まず、CloudFrontのログは指定したS3バケットに対して下記のような命名規則で集約される。

{ディストリビューション名}_YYYY-MM-DD-HH.{UUID的ななにか}.gz

中身はタブ区切りで date, time, x-edge-location, sc-bytes, c-ip, cs-method, .... という感じに、いわゆる普通のアクセスログにキャッシュサービスっぽい項目を足したもの。中にはx-egde-result-typeという項目もあって、これでキャッシュがヒットしたかどうかを追うことができるようになっている。タブ区切りなので非常に扱いやすい。

CloudFrontログのめんどくさいところ

一行一行はタブ区切りなので非常に扱いやすいんだけど、苦労した点がひとつ。エッジロケーションが世界中に沢山あって、それぞれのロケーションの中にエッジサーバーがいっぱいいて、それらがたくさんのファイルを生み出すからということだと思うんだけど、小さなログファイルがたくさん生成されている!今回扱ったディストリビューションでは、1日分で約2万から3万ちかいファイル!しかもひとつひとつは数百キロバイトから数メガバイトなので、HadoopやRedshiftが扱うには小さすぎる。これをある程度のサイズまで集約してあげないと集計処理が難しいわけです。

そこで下の絵のような感じでEC2で一旦S3からデータを取り出して500ファイルずつひとまとめにしつつ軽く一次集計して別のバケットに出力することに。今回はリクエスト数、データ転送量、エッジロケーションごとのリクエスト数くらいしか必要がなかったので、無駄なデータの排除もここでやることに。(掃除&集計しちゃうので結局ファイルサイズは大きくならないんだけど、ファイル数は劇的に減っているのでまあいいかなと。それにファイルサイズを大きくするのではなくて処理を効率化することが目的なので)
f:id:imai-factory:20131219232242p:plain

ファイルの集約&一次集計を並列化する

逐次生成されてくるファイルを15分や1時間ごとに端から処理していく、ということであればこれでOKなんだけど、今回はまず2〜3週間まえから発生しているデータを先に取り込んで、それから逐次処理をしていく必要があった。最初に取り込まなきゃいけないデータが大量すぎてシングルスレッドだと現実的な時間じゃおわらないので並列化することに。S3のオブジェクト名をキーにDynamoDBにロックテーブルを作ることで並列化を実現。
f:id:imai-factory:20131219235915p:plain
こんな感じにDynamoDBのレコードをロックファイルのように使うことでS3上の同じファイルを重複処理してしまうのを防ぐ。こうすれば大量のプロセスを立ち上げて同時に処理を始めても重複処理を避けられる。DynamoDBにはConditional Writeという機能があって、例えば「指定したキーのレコードが存在しなければ書き込む」というようなことができるので実装は非常に楽。書き込みリクエストが失敗したらロックされていた、成功したら自分がロックできた、というような分岐を書けばOK。テーブルの構造はこの絵のとおり、アトリビュートを何も持たない、ハッシュキーだけのテーブル。

一次集計済みのデータをRedshiftに突っ込む

次にデータをRedshiftに投入する。Redshiftへの重複ロードを防ぐところでも同じようにDynamoDBをロックテーブルとして使った。やってることは同じなんだけど、こっちは意味合い的には「これはやった」とか「ここまでやったよ」というようなチェックポイント的な意味合いで使ってます。こんな感じ。
f:id:imai-factory:20131219235907p:plain

JaspersoftでRedshiftのデータをレポート化する

ここはうまい絵がないのであれなんだけど、RedshiftとJaspersoftの体験版を組み合わせてみた。非常に簡単に設定できていい感じだった。このあたりはまた今後ちゃんと絵を揃えから書くお。

うまくいったこと

バッチ処理の冪等性の確保。もともと「なんど流しても副作用が(できるだけ)発生しない」処理にしたいというところからDynamoDBの利用を決めた。これがあるおかげで、何度流しても重複処理がされない。おかげで何かしらの問題が起きた時の再処理というか続きから処理をするという実装が非常にやりやすくなった。

反省点

Data Pipeline使えばよかった。いまこのパイプラインは1時間に1回のcronで動いている。あまりにも時間がなくてData Pipelineを覚えながらやる余裕がなかったのでこんな感じになってしまったけど、あれを使うと、スケジュール管理を外部化できるというのが大きい。1時間に1回EC2を起動してリポジトリからスクリプトを落としてきて実行する。そしてそのスケジュール管理はData Pipeline、みたいな感じにできると、非常に処理の可視性というかコントロール性が高くなっていいんじゃないかなぁと思った。


以上。

ExcelでRedshift

ExcelってODBC扱えるよね、ということを思い出したのでExcelからRedshiftに接続してみたメモ。
あんまり難しいこと考えずにDWHをカジュアルに使おうぜ、という気持ち。基本的には下記のブログを参照してやってみた。

http://pgsqldeepdive.blogspot.jp/2013/04/excelpostgresqlodbc.html

ぼくの環境はWindows7(64bit) + Excel2010(32bit)

手順の概要

  • PostgreSQL用のODBCドライバのインストール
  • ODBCデータソースの設定
  • Excelで外部データソースに接続
  • ピボットテーブルのデータソースにRedshiftを使ってみる
続きを読む

AmazonのDynamoの論文を読んでみた(1/3)

Amazonが社内で開発し、サービスで利用しているDynamoというストレージサービスがあるのだけど、これについての論文が公開されていたので読んだのでまとめてみる。

この論文を書いたメンバーにはAmazonのCTOであるWerner Vogelsや、AWSでDynamoDBやElastiCache、SQS、SNSなどの製品のマネージメントをしているSwami Sivasubramanianらが含まれている。

Dynamoをひとことで表すと分散型でKey Valueストレージで、データの一貫性に関しては結果整合性を保証するサービスだ。なお、AWSで提供されているDynamoDBとは別物なので注意。

原文はこちらから参照できる。今回のポスト中の引用(図や文問わず)はすべてこちらから引用している。
また、既に日本語訳をされている方もいらっしゃるので原文をそのまま日本語でよみたい人はこちらを参照のこと!


論文は以下の7つのパートに分かれているのだが、結構長いので今回は3章まで読んでのまとめにする。
たぶん、全部で3回くらいになりそう。

1. Abstract(概要)
2. Background(経緯)
3. Related Work(参考にしたシステムやサービス)

4. System Architecture(アーキテクチャ)
5. Implementation(実装)
6. Experiences & Lessons Learned(開発や運用を経てのラーニング)
7. Conclusions(結論)

ここから今回のポストの本編。長くなるので先にまとめを書いておく。

先にまとめ

  • コンセプトをひとことでいうと100%書き込み可能でスケーラブルな分散型NoSQL。
  • DynamoはAmazonの一部のサービスで利用するために開発された。リレーショナルデータベースは使ってきたしこれからも使っていくが、「可用性が必要でデータへのアクセスがキーバリューで要件が満たせるところ」に利用されている。
  • Dynamoのゴールは"高い拡張性"、"高い可用性"、"低い運用負荷"の3点を満たすことにある。CAPの定理でいうと、A(可用性)とP(分断耐性)に重点を起き、C(一貫性)を諦めている。これにS(拡張性)を足したもの、というイメージ。ACID特性に関して言うとAtomicとIsolationは明示的に要件から除外されている
  • アーキテクチャとしては分散ハッシュテーブル型のP2Pネットワークの上に構築されており、攻撃などを想定せずにすむ、ひとつの閉じたネットワーク上に配置されることを前提としている。

ここから先は論文に沿って読みながら解説しながらという感じ。

続きを読む

YCSBでNoSQLのベンチマーク その2

YCSBを使ってみよう的なテーマで書いた前回の続きとして、今回は結果の読み方や負荷パターンの調整について書いてみる。

今回の目次は以下のとおり。

  1. コマンドのパラメータの読み方
  2. DBENGINEごとの接続設定
  3. workloadファイルの読み方/書き方
  4. 結果の読み方

コマンドのパラメータの読み方

基本的には起動は以下のようなイメージ。

./bin/ycsb DBENGINE (run|load) -P CONFIGFILE [ -threads n ] [ -p key=value ] [ -s ]

DBENGINEのところはdynamodbやらredisやらmongodbやらが入る。
その次はrunかloadが来る。これは動作モードを決定する値でrunはトランザクションフェーズ(読み込み&更新系)、loadはロードフェーズ(書き込み)の動作になる。
以下、パラメータのリスト。

パラメータ名 意味 デフォルト
P プロパティファイルをロードする。複数指定可能。 -
threads スレッド数 1
p 特定のプロパティを渡す。プロパティファイルの中の指定をオーバーライドできる -
s 実行中のステータスを表示する -

プロパティファイルというのが接続設定を書いたり負荷の設定を書いたりと、もろもろの設定を書くためのファイル。
ここからはプロパティファイル周りについて書いていく。

DBENGINEごとの接続設定

DBENGINEごとにいろいろ接続のための設定をしなければならない。例えばDynamoDBならAWSのCredentialやAPIエンドポイントだったり、redisだったらホスト名とポート番号だったりという具合。
じゃあ実際それぞれどこにどう書けばいいのかというと、これが統一的なドキュメントが無いw
それぞれのDBENGINEへの接続クラスのsorceディレクトリの中にREADMEがあることが多いので、これをもとに辿っていく。
例えばDynamoDBのREADMEを読むと、

$YCSB_HOME/bin/ycsb load dynamodb -P workloads/workloada -P dynamodb.properties

と起動しろと書いてあるので、このdynamodb.propertiesを読んでみる。

$ less dynamodb/conf/dynamodb.properties
dynamodb.awsCredentialsFile = PATH_TO_CREDENTIAL
dynamodb.primaryKey = id
dynamodb.endpoint = http://dynamodb.ap-northeast-1.amazonaws.com
requestdistribution = zipfian
#dynamodb.debug = false
#dynamodb.connectMax = 50
#dynamodb.consistentReads = false
#fieldcount = 10
#fieldlength = 90

こうみるとdynamodb.*といういくつかの特有なプロパティがあるのでこのあたりを設定してあげればつながることがわかる。


ちなみにこれは親切なほうで、redisの場合はREADMEがないw
じゃあどうすればいいかというと、sourceを読むw

$ less redis/src/main/java/com/yahoo/ycsb/db/RedisClient.java
(snip)
    public static final String HOST_PROPERTY = "redis.host";
    public static final String PORT_PROPERTY = "redis.port";
    public static final String PASSWORD_PROPERTY = "redis.password";

そうするとこんな感じに変数が見つかったりするので、じゃあこれを設定すればいいのだろうということでredis.confというファイルを作ってreds.hostやらredis.portやらを書いて、コマンド起動時に-Pで渡してあげたら動いた。

とまあこんな感じにundocumentedな部分も多いが、なんとかやっていきましょうw

workloadファイルの読み方/書き方

YCSBをインストールするとルートディレクトリの直下にworkloadというディレクトリが生成されていて、ここにworkload(a|b|c|d|e|f)というファイルがある。これがどんな負荷をかけるかを指定するworkloadファイル。

中身はこんな感じになっている。

$ cat workload/workloada

recordcount=1000
operationcount=1000
workload=com.yahoo.ycsb.workloads.CoreWorkload

readallfields=true

readproportion=0.5
updateproportion=0.5
scanproportion=0
insertproportion=0

requestdistribution=zipfian

上記にないものも含め、設定項目は以下のとおりです。

項目 意味
recordcount ロードフェーズで挿入するデータのレコード数
operationcount トランザクションフェーズで実行されるクエリの回数
workload workloadを実行するクラス。いじったことはない。
fieldcount 1レコードあたりのフィールド数
fieldlength 各フィールドのサイズ
readallfileds レコード内の全フィールドを読むのか(true)、1つだけにするか(false)
readproportion 全operationに対する読込の割合
updateproportion 全operationに対する更新の割合
insertproportion 全operationに対する挿入の割合
scanproportion 全operationに対するスキャンの割合
readmodifywriteproportion 全operationに対する読込、修正、更新の割合
requestdistribution リクエスト分布方式、uniform、zipfian、latestから選択
maxscanlength スキャンする際の最大レコード数
scanlengthdistribution スキャンするためのレコード数と各スキャンでのリクエスト分布方式
insertorder データ挿入する順序。キー順(ordered)かハッシュ順(hashed)

ちなみに、requestdistributionのそれぞれの値の意味はだいたいこんな感じ。

  • uniform
    • 均等分布
  • zipfian
    • ランダムに(?)偏る
  • latest
    • 新しいデータにアクセスが偏る

結果の読み方

最後は結果の読み方。下記はトランザクションフェーズの結果を適当に省略したもの。

[OVERALL], RunTime(ms), 185903.0
[OVERALL], Throughput(ops/sec), 537.8611426389031
[UPDATE], Operations, 50044
[UPDATE], AverageLatency(us), 21046.522180481177
[UPDATE], MinLatency(us), 6661
[UPDATE], MaxLatency(us), 255970
[UPDATE], 95thPercentileLatency(ms), 27
[UPDATE], 99thPercentileLatency(ms), 29
[UPDATE], Return=0, 50044
[UPDATE], 0, 0
[UPDATE], 1, 0
[UPDATE], 2, 0
[UPDATE], 3, 0
[READ], Operations, 49946
[READ], AverageLatency(us), 19815.733532214792
[READ], MinLatency(us), 4863
[READ], MaxLatency(us), 259948
[READ], 95thPercentileLatency(ms), 27
[READ], 99thPercentileLatency(ms), 28
[READ], Return=0, 49946
[READ], 0, 0
[READ], 1, 0
[READ], 2, 0
[READ], 3, 0
[READ], 4, 2

読み方的には、

  • 全体的(OVERALL)なスループットが 537.86.../sec
  • 更新(UPDATE)の平均/最小/最大レイテンシがそれぞれ21046us, 6661us, 255970us
  • 更新(UPDATE)クエリのうち95%は27ms以内、99%は29ms以内に終了
  • 参照(READ)の平均/最小/最大レイテンシがそれぞれ19815us, 4863us, 259948us
  • 参照(READ)クエリのうち95%は27ms以内、99%は29ms以内に終了

とか、だいたいそんな感じ。

まとめ

今回はYCSBを使って実際にデータを計測するためのチューニングというか設定方法を書いてみた。
NoSQLとか使うときはこういうツールを使って「実際に自分が必要とするだけの負荷をかけて試験してみようね」っていうお話でした。

YCSBでNoSQLのベンチマーク その1

ここ数年でNoSQLを実際のシステムで扱うのも割りと一般的になってきていて、機能面だけでなく、性能面が気になるケースが多くなってきたと思う。

じゃあどうやって性能はかるの?比べるの?って話になるよね。
いわゆる、NoSQLにおけるapache benchやJMeterのようなものがほしいよね。
今日は、そのあたりのひとつのソリューションとなるかもな、YCSBというツールについて書いてみる。

YCSBとは

YCSBはYahcoo Cloud Serving Benchmarkの略称で、プロジェクトのWebはこちら。
http://research.yahoo.com/Web_Information_Management/YCSB
https://github.com/brianfrankcooper/YCSB/wiki

ざっくりひとことで役割と説明すると、NoSQL向けのBenchmarkツール。
Javaで実装されていて、こんな感じの構成のアプリケーションになっている。

Workload driver -> DB Access layer -> NoSQL

DB Access layerのところが切りだされているのでここを実装すれば好きなNoSQLで動かすことができるのだが、、現状YCSBに同梱されてくるDB Access layerのライブラリはこんな感じ。メジャーなNoSQLがカバーされてるので、わりとそのまま使えそう。

  • cassandra
  • DynamoDB
  • ElasticSearch (CloudSearchじゃないよw)
  • GemFire
  • HBase
  • Hypertable
  • Infinispan
  • Mapkeeper
  • MongoDB
  • Redis
  • OrientDB
  • Voldemort

インストールしてみる

大まかな流れとしては、githubからコードをダウンロードしてmavenでビルドする感じ。githubのreadmeにビルド済みのバイナリへのリンクがあるが、こちらは割りと最小セットなビルドっぽくて、DynamoDBのクライアントなどは入ってなかったりしたので、ソースをまるごとcloneしてビルドした。あと、ycsbでググると結構むかしのブログが出てきて、そこではantでビルドしてるが、いつかのタイミングからmavenプロジェクトに変わったっぽい。

手順的には以下でOK

$ git clone https://github.com/brianfrankcooper/YCSB.git
$ cd YCSB
$ mvn clean package

動かしてみる

試しにredisに負荷を掛けてみる。

  • 最初にredisの設定を書く。こんな感じ。
$ cat redis.conf
redis.host=HOSTNAME or IPADDRESS
redis.port=PORTNUMBER
  • 負荷をかけてみる。workloadファイルというファイルにいろいろ負荷のパターンなど記述されているが、ひとまずは動かしてみるだけなのであまり気にせずやってみる。結果が標準出力に出るのでリダイレクトしてます。
$ cd YCSB
$ ./bin/ycsb run redis workload/workloada > result
Loading workload...
Starting test.

$ cat result
YCSB Client 0.1
Command line: -db com.yahoo.ycsb.db.RedisClient -P workloads/workloada -P redis
/conf/redis.conf -t
[OVERALL], RunTime(ms), 15236.0
[OVERALL], Throughput(ops/sec), 6562.746127592544
[UPDATE], Operations, 49918
[UPDATE], AverageLatency(us), 2241.0442125085137
[UPDATE], MinLatency(us), 2158
[UPDATE], MaxLatency(us), 15749
[UPDATE], 95thPercentileLatency(ms), 2
[UPDATE], 99thPercentileLatency(ms), 2
==snip==

こんな感じに結果がでた。
大事なのはこのあたりか。

[OVERALL], RunTime(ms), 15236.0
[OVERALL], Throughput(ops/sec), 6562.746127592544
[UPDATE], Operations, 49918
[UPDATE], AverageLatency(us), 2241.0442125085137
[UPDATE], MinLatency(us), 2158
[UPDATE], MaxLatency(us), 15749
[UPDATE], 95thPercentileLatency(ms), 2
[UPDATE], 99thPercentileLatency(ms), 2
[READ], Operations, 50072
[READ], AverageLatency(us), 2280.0942642594664
[READ], MinLatency(us), 2199
[READ], MaxLatency(us), 21950
[READ], 95thPercentileLatency(ms), 2
[READ], 99thPercentileLatency(ms), 2
  • 全体のスループット: 6552tps
  • 更新クエリの平均レイテンシ: 2242マイクロ秒
  • 参照クエリの平均レイテンシ: 2280マイクロ秒

という感じの読み方になると思う。

まとめ

負荷パターンの調整などはまた次回ということで一旦まとめる。

  • YCSBはNoSQLのベンチマークツール
  • メジャーなNoSQL(HBaseやMongoDB,Cassandra,DynamoDBなど)はすぐにベンチできる
  • クライアントライブラリが同梱されていないNoSQLでも、自分で実装可能
  • インストールはgithubからソースをダウンロードしてmavenでビルド(ビルド済みのバイナリも配布されている)

AWS SDK for node.jsでDynamoDBをバックエンドにしてチャットアプリケーションを作ってみた。

AWS SDK for node.jsのデベロッパープレビュー版がリリースされたので触ってみた。

AWS SDK for Node.js (Developer Preview)
http://aws.amazon.com/jp/sdkfornodejs/

まだデベロッパープレビュー版なのですべてのサービス向けのAPIが実装されているわけではなくて、現状EC2、S3、DynamoDB、SWFのみに対応済みな状態。こんな感じ↓
f:id:imai-factory:20130111163105j:plain

何をしてみようかというところで、S3を取り扱うようなサンプルコードはちょいちょい見かけるのでDynamoDBを扱って見ることに。socket.ioを触ったことがなくて、触ってみたこともあって、http://www.atmarkit.co.jp/ait/articles/1210/10/news115.htmlを参考にチャットアプリケーションを実装してみた。アーキテクチャはこんな感じ。
express + mongoose + MongoDBが流行ってるので、express + AWS SDK + DynamoDBもなかなかイイぜ、的な。
f:id:imai-factory:20130111163424j:plain
ちなみに、残念ながらDynamoDBにはgemfireやredisのようなmessagingの機能は実装されていません。WebSocketでのpush通信の話と一緒にすると誤解を生むかもなので念のため。

アプリケーション的には接続中のクライアントがルームを指定してメッセージをやりとりできるという、まあ普通なチャットアプリ。この投稿されたメッセージはサーバー側でDynamoDBに格納されます。新規にルームに接続してきたクライアントに対して、当該ルームの最新1時間分のログをDynamoDBから取り出して表示させて、あとは普通にチャットできますよという感じ。

  • 画面を開くとこんな感じ

f:id:imai-factory:20130111164243j:plain

  • room1というルーム(現状、ルーム名はなんでも通る)に接続してメッセージを投稿してみる。

f:id:imai-factory:20130111164342j:plain

  • もう一枚ブラウザをたちあげてroom1に入ってみるとDynamoDBに格納されたメッセージが初期状態として表示されます。

f:id:imai-factory:20130111164615j:plain

とまあいたって普通なチャットアプリケーションです。コードはgithubにアップしました。
https://github.com/imaifactory/nodejs_chat

今回のメインのトピックはDynamoDBを取り扱う際のサンプルコードなので、そこだけ抜粋してみます。

まず、sdkのインストールはnpmで行う。

$ npm install aws-sdk

使う前の準備としてCredentialやRegionの設定をする

$ vi config.json

{
    "accessKeyId":"hoge",
    "secretAccessKey":"hoge",
    "region":"ap-northeast-1"
}

コード内での初期化はこんな感じ。

var aws = require('aws-sdk');
aws.config.loadFromPath('./aws_config.json');

var ddb = new aws.DynamoDB.Client();

このクライアントを使ってDynamoDBの操作を行う。
今回のコードでデータを投げ込んでいるのはこんなコード。
Itemが実際のデータだが、特徴的なのは、Itemの各項目にSという型の指定が必要なこと。S=String, N=Number, B=Binaryという感じ。

            ddb.putItem(
                {
                    TableName:tableName,
                    Item: {
                        topic: {S:clientInfo.room},
                        text:  {S:message.text},
                        name:  {S:clientInfo.name},
                        timestamp: {S:message.timestamp}
                    }
                },
                function(err,data){
                    if(err){
                        console.log(err);
                    }else{
                        io.sockets.to(clientInfo.room).json.emit('message',message);
                    }
                }
            );

データを取り出しているコードはこんな感じ。queryというAPIを使っています。
HashKeyを指定したうえで、RangeKeyが特定の値(今回のアプリケーション的にはタイムスタンプ)よりも大きい(GT)レコードを取得しています。コードからだとわかりにくいけど、RangeKeyConditionのなかのAttributeValueListというところでRangeKey(タイムスタンプ)を指定してます。

        ddb.query(
            {
                TableName:tableName,
                HashKeyValue:{ S:data.room },
                RangeKeyCondition:{
                    ComparisonOperator:'GT',
                    AttributeValueList:[
                        {S:date.timestampDelta(appConfig.timeLine.startTime)}
                    ]
                },
                ScanIndexForward: true,
                AttributesToGet:['topic','text','timestamp','name']
            },
            function(err,data){
                if(err){
                    console.log(err);
                }else{
                    socket.json.emit('initial',data);
                }
            }
        );

今回はputItemとqueryしかつかってないけど、データを1件取得するgetItemやbatchGetItemやbatchPutItemなど、もちろん他にもたくさんAPIはあります。詳細はこちらのAPIリストを参照のこと。
http://docs.amazonwebservices.com/AWSJavaScriptSDK/latest/frames.html

ハマった点として、現状のSDKだとマルチバイトのutf8がうまく通らなかったこと。なので日本語の取扱がうまくいきませんでした。プレビュー版なので修正を待ちますということで。


あと、今回のネタとは直接関係ないですが、AWS上でWebSocketを使う際の注意点として、ELBが非アクティブなセッションを60秒で切ってしまうことがあります。現状、この値は変更できないのでご注意を。

実際ガッツリつかうとなると、セッション管理や維持にマシンのCPUパワーをかなり食うと思うので、このあたりの運用やスケーリングのベストプラクティス的なものはまた今後まとめていきます。