Flink SQL導入の背景
- Azar Matching Dev Teamで管理していたFlinkベースのアプリのうち、CPU 96個を使用する重いレガシーアプリがあった
- このアプリは複数の機能をモノリシック構造で実装しており、保守が難しかった
- インフラ作業で実行ノードを変更したところ、アプリが正常に動作しない問題が発生した
- 高い運用負荷を受け入れて保守を続けるか、別の方法に置き換えるかを決める必要があった
選べた選択肢
- 既存アプリの重要な機能はすでに新しいFlinkアプリで実装されていた
- 条件付きイベント発行およびロジック実行部分を置き換える方法を検討した
- 1つのFlink Appとして実装
- 長所: 運用が簡単
- 短所: アプリが大きくなる可能性が高く、一部が失敗すると他の機能にも影響しやすい
- 複数のFlink Appとして実装
- 長所: 独立して管理できる
- 短所: アプリ数が増えると負担が増す
- Flink SQLを使用
- 長所: クエリでロジックを定義でき、1つのクラスターだけを管理すればよい
- 短所: 複雑なロジックの表現が難しく、クラスター管理に慣れていないと扱いにくい
Flink SQLを選んだ理由と代替技術との比較
- Flink SQL導入前にksqlDBとSpark Structured Streamingを検討した
- Flink SQLを選んだ理由:
- High Availability
- CheckpointとSavepointを通じてアプリの状態を安定して保存・復旧できる
- JobManagerはHAモードで構成できる
- 高度なストリーミング機能のサポート
- SQL構文で多様なストリーミング処理機能を利用できる
- ウィンドウ、ジョイン、イベントタイム処理、ウォーターマークなどをサポート
- UDFおよびCustom Connectorによる拡張性
- ユーザー定義関数と多様なデータソース・sinkの接続が可能
vs ksqlDB
- Confluentプラットフォームに含まれているが、statefulなストリーミング処理ではHA動作が非効率的
vs Spark Structured Streaming
- Spark SQLエンジンベースで実装され、UDFおよびCustom Sinkを作成できる
- マイクロバッチ単位で動作するため、リアルタイム処理には不利な場合がある
クラスター環境の構築とクエリ配布方式
ローカルで簡単にテストする
- ローカルでFlink Clusterを立ち上げてSQLクエリを提出する方法を紹介
本番環境でのクラスターアーキテクチャ
- Kubernetes上でFlink SQL Clusterを構成
- Application modeとSession modeを比較
GitOps方式を利用したクエリ配布
- GitHub Actionsを使用してクエリ配布とJob停止を実装
主な運用事例とトラブルシューティング経験
JobManagerまたはTaskManagerがFailする場合
- JobManagerはHA設定により、Failしても作業を継続できる
- TaskManagerはFail時に作業が再分配されて継続される
クエリがFailする場合
- 異常データの流入時やコンピューティングリソース不足時に発生
- JSONフォーマットエラーの無視設定やデフォルト値設定が可能
クラスター再起動時に一部JobがFailする場合
クエリの条件を1つ修正して再配布したい場合
- 簡単な修正のときに限りsavepointを利用したstate復元が可能
主なモニタリングポイント
numRunningJobs, taskmanager.cpu.load, taskmanager.memory.used などの指標を確認
おわりに
- Flink SQL導入により、生産性と運用効率が向上
- 安定性に優れており、GitOps Controllerパターンの実装を計画
1件のコメント
Flinkのような分散システムは、HAを維持するためにrackを2〜3個維持すべきですが、Kubernetesを連携することでHAを保証したように見えますね。ただ、結局はkube slave nodeのリソースも考えないといけないはずなので、Flinkだけを載せるnodeを構成したのか気になります(Flink高負荷時にslave nodeがダウンする問題がありそうです)。
そういう観点で、Kubernetesを使うメリットはあるのでしょうか?
また、Flinkでウィンドウ関数を使うと、その間のデータはメモリに保持されることでSQLのjoin文が動作するわけですが、trade-offの観点で見るとFlinkは良い選択肢なのかと考えてしまいます。時間が経つほど巨大化するSQL + jobが落ちたときに起きる大変なこと……。
私も最上流のdata sourceでjoinが必要な状況のとき、Flinkを使わずにどのような方式でapplication levelに下ろして処理できるか悩みますね。