SQSでデッドレターキューに流れた処理をLambdaで定期的に再実行させる

AWS, Lambda, Python, SQS

はじめに

NX開発推進部の森川です。

運用しているシステムにおいて、外部のAPIにリクエストを送るという部分があるのですが、SQSで処理する様になっています。
SQSで処理することには様々な利点がありますが、一例としては、「外部APIが何らかの障害等で停止している」場合でも、解消後に再リクエストすることで簡単にリカバリーできる点が挙げられます。

ただ、このSQSでデッドレターキュー(以降はDLQと表記)の作成を有効にしている場合、inactive状態でキューの処理が実行されると、その処理はエラーにならずに流れてしまいます。
そういった場合にも自動でリカバリーできる様に、Amazon EventBridgeとLambdaを使って定期的にDLQに積まれたキューを再実行させる仕組みを作成しました。

今回はその方法を記載していきます。
また、今回の記事ではFIFOを対象としています。

all_statue.png

目次

バージョン情報

  • Python 3.9(作成するLambdaのランタイムに合わせる。)

Lambda用の関数作成

処理に必要なライブラリをインポートします。
これらのライブラリは全てLambda実行環境に標準で含まれているため、
デプロイ用のzipファイル作成時に追加のパッケージインストールは不要です。
ただし、ローカル環境でコードのテストを行う場合は、boto3のインストールが必要になります。

ログの設定とAWS SDK(SQSクライアント)の初期化処理、環境変数から値を取得する部分を実装します。
ログや環境変数に関しては実装する環境によって適宜変更してください。

DLQからメッセージを取得し、元のターゲットキューに再送信する処理を実装します。
同じメッセージが再送信される場合がありますが、そう言ったハンドリングはこの関数内では実装していません。
必要があれば重複処理のハンドリングを実装してください。

Lambda関数のエントリーポイントを作成します。

上記をまとめて1ファイルにします。
また、Lambdaにアップロードする際はzipファイルで実施するので作成したPythonファイルをzipにしておきます。

ファイル名は任意ですが、今回は dlq_retry_lambda.zipで作成します。
任意で設定している場合は dlq_retry_lambda.zipの部分は読み替えてください。

Lambdaの作成

関数の作成

作成した関数を実行するためのLambdaを構築します。

AWSコンソールのLambda内にある「関数」を選択します。
画面右上にある「関数の作成」を選択します。

create_function.png

関数名: EventBridgeから指定するので認識しやすい名前が良いです。
ランタイム: Python3.9 (Pythonであればバージョンは任意で問題無いです。)
アーキテクチャ: x86_64

create_function_info.png

上記内容を設定後に関数の作成をします。
作成した関数に先ほど作成したzipファイルをアップロードします。

zip_upload.png

また、作成したlambda_functionのファイル名を lambda_function以外にしている場合、実行時にfunctionが見つからないというエラーになってしまいます。
ランタイムのハンドラを適宜変更してください。

今回の場合は次の様に設定しています。

handler.png

設定タブの一般設定内のタイムアウトに関しても任意の値を設定してください。
今回のコードの場合タイムアウトが10秒以下の場合は処理がされないためそれ以上に設定してください。

setting_timeout.png

アクセス権限の設定

Lambda関数を実行する際にCloudWatchログとSQS関連の権限が必要になります。
次の様なロールを作成し、Lambdaに割り当てる必要があります。

  • 許可されるアクション
    • sqs:ReceiveMessage: SQSキューからメッセージを受信する権限
    • sqs:DeleteMessage: SQSキューからメッセージを削除する権限
    • sqs:GetQueueAttributes: SQSキューの属性を取得する権限
    • sqs:SendMessage: SQSキューにメッセージを送信する権限
    • logs:CreateLogGroup: ロググループを作成する権限
    • logs:CreateLogStream: ログストリームを作成する権限
    • logs:PutLogEvents: ログイベントを書き込む権限

作成したロールをLamdaから設定 > アクセス権限 > 実行ロールで割り当ててください。

EventBridgeの作成

作成したLambdaを定期実行するためのEventBridgeを作成します。
AWSコンソール > Amazon EventBridge > ルールからルールを作成を選択してください。

今回はEventBridgeSchedulerを利用していないため、ルールタイプはスケジュールを選択し、続行してルールを作成を選択してください。

create_rule.png

スケジュールには自身が実行したい間隔を各自設定してください。
今回の場合はSQSのinactive状態にも対応する且つ外部APIへのリクエストということを鑑みて3時間おきに実行する様に設定しています。

ターゲットとしては先程作成したLambdaを選択してください。
また、後ほどLambda自体にロールを割り当てるので実行ロールの使用のチェックは外してください。

以上でEventBridgeの設定は完了です。

最後にLambdaのアクセス権限からリソースベースのポリシーステートメントを設定します。
アクセス権限を追加を選択し、次の様に設定してください。

  • ステートメント ID: 任意
    • 判別できる値を入力。
  • プリンシパル: events.amazonaws.com
    • Amazon EventBridge(CloudWatch Events)サービスにアクセス権を付与。
  • アクション: lambda:InvokeFunction
    • Lambda関数を呼び出す権限。

動作確認

設定がうまく行っている場合はLambdaの実行ログがCloudWatchログに出力されています。

cloudwatch_dlq_log.png

あとがき

外部のAPIを使っている関係上、エラーになった瞬間は再度キューを投げる様にしたり、そもそもSQSを使わずにリクエストを投げる様にしていると、外部要因で処理を正しく完了できない恐れがあります。
このLambdaを実装するまではエラーのアラート表示はおこなっていましたが、DLQに積まれてしまったキューを間隔を空けて再実行する仕組みが存在しておらず、
エラー検知時に人が介入してキューを再実行する必要がありました。
人が介入する以上、エラーの見逃しが起きてしまったり、人によって対応頻度が変わってしまったりと運用において健全とは言えませんでした。
その問題を解決したかったので、このLambdaを実装しました。
その結果として過敏にDLQのエラーをチェックしなくなっても良くなり、運用コストを幾らか削減できました。
実際のコードなどには反映されておらず、誰の目にも留まるものでも無いですが、こういった仕組みを作って自分の心労を減らしていけるのは良いことなので今後も続けていきたいです。

AWS, Lambda, Python, SQS