smile-nestjs-sqs - NestJSでSQSを本番環境でより安全に使うために作りました
(github.com/spotlight21c)こんにちは。
NestJS + SQS の組み合わせでサービスを開発していく中で、NestJS エコシステムの他の SQS パッケージを使っていたのですが、
実際の本番環境で感じた不便さがいくつかあり、特定のパッケージには PR も提出しました。しかし、もう更新されていなかったり、そもそもそれを問題として認識していないところもあったため、これを自分で解決するためにモジュールを作りました。実際に既存で使っていた SQS モジュールをこれに置き換えて、安定して運用しています。
NestJS で SQS モジュールを探している方、そして私と似た悩みを持っていた方の助けになりそうなので共有します。
このパッケージには次のような特徴があります
NestJS フレンドリー
NestJS で動的にモジュールを生成して import し、デコレーターを付けたハンドラーを注入することで簡単に実装できます。
他の大半の SQS パッケージと似たシグネチャを持っています。
@Module({
imports: [
ConfigModule.forRoot(),
SqsModule.registerAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => {
const region = config.getOrThrow<string>('AWS_REGION');
const queueUrl = config.getOrThrow<string>('ORDERS_QUEUE_URL');
const defaultSqsClient = new SQSClient({ region });
return {
defaultSqsClient,
consumers: [{ name: 'orders', queueUrl }],
producers: [{ name: 'orders', queueUrl }],
};
},
}),
],
})
export class AppModule {}
@Injectable()
export class OrderQueueHandler {
@SqsMessageHandler('orders')
public async onMessage(message: Message): Promise<Message> {
// return message to ack/delete
return message;
}
@SqsConsumerEventHandler('orders', 'processing_error')
public onProcessingError(error: Error, message: Message) {
// report error
}
}
最新の bbc パッケージを使用
このパッケージは週間 130 万ダウンロードを超え、Node.js 界隈では SQS ライブラリとして最も有名な以下 2 つのライブラリの最新版をベースにしています。
https://github.com/bbc/sqs-producer
https://github.com/bbc/sqs-consumer
sqs-consumer には v14 から意味のある breaking change が存在します。
https://github.com/bbc/sqs-consumer/discussions/584
13 バージョン以下では、SQS ハンドラーが void を返しても ack の挙動と見なされていました。
しかし 14 バージョンからは、明示的に SQS ハンドラーが返した message を ack と見なします。
この変更はより明示的で予測しやすく、ユーザーに ack 実装における混乱を与えない、意味のある変更です。
ただし、現在出回っている NestJS 用 SQS モジュールを名乗る npm パッケージの大半は、ひとまず sqs-consumer 13 以下のバージョンを使っています。
ブート時検証
アプリ起動時に誤った設定を即座に検出できるため、より安全に利用できます。
- 重複した consumer/producer 名を検出
- 存在しない consumer にデコレーターを使うと即エラー
- イベント名のタイプミスを検出
- batch/single ハンドラーのパラメーター型不一致を検出
- batch/single の戻り値型不一致を検出
graceful shutdown
bbc/sqs-consumer は graceful shutdown のために pollingCompleteWaitTimeMs オプションを提供しています。
そして NestJS には NestJS のライフサイクルがあります。
NestJS の onModuleDestroy 段階で、内部的に各 consumer が stopped イベントを発行するまで終了を待ちます。
consumer が処理中だったメッセージが終わって初めてこのイベントが発生するため、メッセージ処理が完了してから安全にプロセスが終了します。
NestJS のライフサイクルと自動的に連携しているため、
app.enableShutdownHooks() を有効にして、
pollingCompleteWaitTimeMs と shutdownTimeoutMs を適切に設定しておけば、k8s 環境でも突然の終了に対してより安全に利用できます。
本番環境で SQS を使いながら「これはいつ更新されるんだろう」と感じていたものがずっと更新されないままで、
堅牢な開発文化を作っていくうえで支障があったため、それを解決した成果物です。
似た環境で似た悩みを抱えている方であれば、ぜひ一度使ってみてフィードバックをいただけると嬉しいです。
まだコメントはありません。