Luft: 10秒で10億件のデータをクエリするデータストア開発記
(tv.naver.com)月間平均イベント数が100億件を超える環境で、短時間でデータを分析し、ユーザー行動分析機能(Cohort)を実現しなければならない状況が訪れた。
(例:過去6か月間に自社アプリで月10万円以上消費した30代女性 → 彼女たちの再訪率)
こうした環境で、これまで使うだけだったデータストアを自ら実装した話を扱っている。
ユーザー行動分析クエリを実装するには…
-
事前にあらかじめ計算していないメトリクスもクエリできる必要がある(+新しい種類の分析も Re-indexing なしで可能でなければならない)
-
イベントデータをユーザーごとに Group By する際、High Cardinality Shuffle のボトルネックが小さい必要がある
既存ソリューションを使うか、自前ソリューションを作るか悩んだ
-
Druid は別の場所で使っていたが、Pre-Aggregation(計算済みの値だけを読む方式)の限界により機能実装には不向きだった
-
Snowflake や Redshift などのデータウェアハウスを大規模運用することはできるが、汎用性が高いぶん、目標に対して過大な規模のクラスターを運用する必要があり高コスト
-
Funnel、ID マッチングなど多様なニーズをカバーするには、SQL ベースの DB には限界がある
結局データストアを自作する
-
Luft = 最初からユーザー ID 基準で Group By されたユーザー行動分析クエリを高速に実行することに最適化されたデータストア
-
Golang をベースに作られている
-
数十 TB 規模のユーザーデータを5台以下のノードだけで平均3秒〜最大10秒の間で分析
-
一般的な RDBMS と異なり不変性を持つ(必要なら同じ期間のデータを上書き)→ シンプルなクラスター設計、複雑なページマネージャー実装なしで高性能、望むデータ保存フォーマットを設計可能
技術基盤を見ていく
- TrailDB(ストレージエンジン) - ユーザー ID パーティショニングに最適化された時系列イベント保存 Rowstore
→ 値を辞書化してその ID だけを保存
→ ユーザーイベントを時系列順に並べ、前のイベントと比べて増えた時間値、変更されたカラムだけを保存(ほとんどのユーザー属性は変わらないため)
→ インデックスなし。必ずフルスキャンする必要がある。
→ しかし驚異的に高い圧縮率を誇る(CSV 13GB → ~TrailDB 300mb)
→ 計算量は O(n) なので、空間計算量を減らせばよいと考えた
- LLVM(クエリエンジン)
→ ただし TrailDB は OR-AND 形式の equals しか提供せず、Go でパースしたクエリを C、C++ に渡さなければならない
→ PostgreSQL がクエリを LLVM JiT にコンパイルしていることを知る
→ クエリは機能拡張が頻繁なので、C、C++ で書くと開発コストが増える問題を防げる(Golang で LLVM IR だけ生成して渡し、C、C++ 側で JiT コンパイルして実行すればよい)
- 演算レイヤーを自作する
→ MapReduce はよく使われるが、Golang を使うので使えない
→ Spark/Hadoop は Long-running Job に最適化されており、つないでみても性能が出にくい
→ これも自作 → https://github.com/ab180/lrmr
→ gRPC + Protobuf + etcd の組み合わせで、親しみのある Spark の設計を多く取り入れた
→ Resiliency を諦める → 性能を極限まで高めれば、障害が発生しても最初からやり直して10秒未満
→ 大規模データ処理によるバッファオーバーフローが頻発する(Backpressure)問題を、Pull-based Event Stream に変更(Kafka、Armeria などでも採用)
- シャーディングを自前実装する
→ シャード = ヒストリカルノード
→ パーティションの日付範囲をシャーディングキー値として使うと?
→ すべてのクエリに時間があり → フィルタリングしやすい
→ 同じ時間範囲には似た容量のデータがある → データ分散しやすい
→ 分散環境は美しくない…
→ ノードがダウンしたり新規追加されたら?
→ ストレージ容量がいっぱいになったら?
→ 障害で1台のノードにだけ偏ったら?
→ Druid の Cost Function をカスタムし、パーティションの日付範囲が近く重なるほど Cost が高くなるようにした
→ シャード可用性のために以下を実施
→ シャード情報に TTL を設定し、定期的に更新(etcd)
→ S3 にパーティションを保存し、DynamoDB でパーティション一覧を管理
現在の本番環境の状況
- 4台の c5.2xlarge インスタンスだけで 15秒以内に 500GB のデータをスキャン
今後の目標(あるいはやるべきこと)
-
リアルタイム Funnel 分析を10台以内のクラスターで実現したい
-
Spark をサポートして ML 連携なども支援したい
-
TrailDB を置き換える自前カラムストア(Ziegel)を開発中
→ SIMD とマルチコア最適化
→ Bitmap Index で事前にユーザー属性ベースのフィルタリング
2件のコメント
traildb、面白いですよね。https://www.youtube.com/watch?v=-oPFxSwn0lM 面白いです。かなり古い動画ですが、traildb はその間あまり変わっていないと思います。
今見ると、開発者の方のブログ記事もありますね。
https://engineering.ab180.co/stories/introducing-luft
TrailDBは初めて聞きましたが、こういうもののようです……
https://github.com/traildb/traildb