目次

目次

SQSでデッドレターキューに流れた処理をLambdaで定期的に再実行させる

森川拓
森川拓
最終更新日2025/05/02 投稿日2025/05/02

はじめに

NX開発推進部の森川です。

運用しているシステムにおいて、外部のAPIにリクエストを送るという部分があるのですが、SQSで処理する様になっています。 SQSで処理することには様々な利点がありますが、一例としては、「外部APIが何らかの障害等で停止している」場合でも、解消後に再リクエストすることで簡単にリカバリーできる点が挙げられます。

ただ、このSQSでデッドレターキュー(以降はDLQと表記)の作成を有効にしている場合、inactive状態でキューの処理が実行されると、その処理はエラーにならずに流れてしまいます。 そういった場合にも自動でリカバリーできる様に、Amazon EventBridgeとLambdaを使って定期的にDLQに積まれたキューを再実行させる仕組みを作成しました。

今回はその方法を記載していきます。 また、今回の記事ではFIFOを対象としています。

all_statue.png

目次

バージョン情報

  • Python 3.9(作成するLambdaのランタイムに合わせる。)

Lambda用の関数作成

処理に必要なライブラリをインポートします。 これらのライブラリは全てLambda実行環境に標準で含まれているため、 デプロイ用のzipファイル作成時に追加のパッケージインストールは不要です。 ただし、ローカル環境でコードのテストを行う場合は、boto3のインストールが必要になります。

pip install boto3
# Pythonの標準パッケージ
import json
import logging
import os
import time
import uuid

# AWS SDK (Lambdaの実行環境に存在)
import boto3

ログの設定とAWS SDK(SQSクライアント)の初期化処理、環境変数から値を取得する部分を実装します。 ログや環境変数に関しては実装する環境によって適宜変更してください。

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# SQSクライアントの初期化
sqs = boto3.client("sqs")

# 環境変数から設定を取得
DLQ_URL = os.environ.get("DLQ_URL", "") # 取得の対象となるDLQのURL
TARGET_QUEUE_URL = os.environ.get("TARGET_QUEUE_URL", "") # 積み直す先のSQSのURL
MAX_MESSAGES = int(os.environ.get("MAX_MESSAGES", "10")) # 一度に処理するDLQのメッセージ数
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # 何度処理を繰り返すか

DLQからメッセージを取得し、元のターゲットキューに再送信する処理を実装します。 同じメッセージが再送信される場合がありますが、そう言ったハンドリングはこの関数内では実装していません。 必要があれば重複処理のハンドリングを実装してください。

def process_dlq_batch():
    """DLQから一括でメッセージを取得して処理する"""
    try:
        # DLQからメッセージを受信
        # QueueUrl: メッセージを取得するキューのURL
        # MaxNumberOfMessages: 一度の呼び出しで取得するメッセージの最大数
        # WaitTimeSeconds: ポーリング時間(メッセージが見つからない場合にこの時間待機する)
        # VisibilityTimeout: 他環境で取得しても見えなくなっている時間(整合性のため)
        # AttributeNames: All指定することで全てのメッセージ内容を取得する
        # MessageAttributeNames: All指定することで全てのカスタムメッセージを取得する
        response = sqs.receive_message(
            QueueUrl=DLQ_URL,
            MaxNumberOfMessages=MAX_MESSAGES,
            WaitTimeSeconds=5,
            VisibilityTimeout=30,
            AttributeNames=["All"],
            MessageAttributeNames=["All"],
        )

        if "Messages" not in response:
            logger.info("処理するメッセージがありません")
            return 0

        messages = response["Messages"]
        logger.info(f"{len(messages)}件のメッセージを処理します")

        for message in messages:
            # 再試行回数を確認(カスタム属性として保存されている場合)
            message_attributes = message.get("MessageAttributes", {})
            retry_count = 0

            if "RetryCount" in message_attributes:
                retry_count = int(message_attributes["RetryCount"]["StringValue"])

            # 最大再試行回数を超えていないか確認
            if retry_count >= MAX_RETRIES:
                logger.warning(
                    f"メッセージID {message['MessageId']} は最大再試行回数を超えました。処理をスキップします。"
                )
                continue

            # 再試行回数を更新
            message_attributes["RetryCount"] = {
                "DataType": "Number",
                "StringValue": str(retry_count + 1),
            }

            # ターゲットキューにメッセージを送信
            # MessageDeduplicationId, MessageGroupIdを指定することでFIFOキューに対応
            sqs.send_message(
                QueueUrl=TARGET_QUEUE_URL,
                MessageBody=message["Body"],
                MessageAttributes=message_attributes,
                MessageDeduplicationId=str(time.time()),
                MessageGroupId=str(uuid.uuid4()),
            )

            # 処理したメッセージをDLQから削除
            sqs.delete_message(QueueUrl=DLQ_URL, ReceiptHandle=message["ReceiptHandle"])

        return len(messages)

    except Exception as e:
        logger.error(f"エラーが発生しました: {str(e)}")
        return 0

Lambda関数のエントリーポイントを作成します。

def lambda_handler(event, context):
    """
    CloudWatchアラートから呼び出されるハンドラー関数
    デッドレターキューからメッセージを取得し、ターゲットキューに再送信する
    """
    logger.info("デッドレターキュー処理を開始します")
    total_processed = 0

    # Lambda実行時間内で可能な限りメッセージを処理
    while context.get_remaining_time_in_millis() > 10000:  # 10秒以上残っている場合
        processed = process_dlq_batch()
        if processed == 0:
            # これ以上処理するメッセージがない
            break
        total_processed += processed

    logger.info(f"合計 {total_processed} 件のメッセージを再送信しました")
    return {
        "statusCode": 200,
        "body": json.dumps({"processed_messages": total_processed}),
    }

上記をまとめて1ファイルにします。 また、Lambdaにアップロードする際はzipファイルで実施するので作成したPythonファイルをzipにしておきます。

 zip -r dlq_retry_lambda.zip .

ファイル名は任意ですが、今回は dlq_retry_lambda.zipで作成します。 任意で設定している場合は dlq_retry_lambda.zipの部分は読み替えてください。

Lambdaの作成

関数の作成

作成した関数を実行するためのLambdaを構築します。

AWSコンソールのLambda内にある「関数」を選択します。 画面右上にある「関数の作成」を選択します。

create_function.png

関数名: EventBridgeから指定するので認識しやすい名前が良いです。 ランタイム: Python3.9 (Pythonであればバージョンは任意で問題無いです。) アーキテクチャ: x86_64

create_function_info.png

上記内容を設定後に関数の作成をします。 作成した関数に先ほど作成したzipファイルをアップロードします。

zip_upload.png

また、作成したlambda_functionのファイル名を lambda_function以外にしている場合、実行時にfunctionが見つからないというエラーになってしまいます。 ランタイムのハンドラを適宜変更してください。

今回の場合は次の様に設定しています。

handler.png

設定タブの一般設定内のタイムアウトに関しても任意の値を設定してください。 今回のコードの場合タイムアウトが10秒以下の場合は処理がされないためそれ以上に設定してください。

setting_timeout.png

アクセス権限の設定

Lambda関数を実行する際にCloudWatchログとSQS関連の権限が必要になります。 次の様なロールを作成し、Lambdaに割り当てる必要があります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:SendMessage"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:*",
                "arn:aws:sqs:ap-northeast-1:XXXXXXXXXXXX:対象になるDLQの名前.fifo",
                "arn:aws:sqs:ap-northeast-1:XXXXXXXXXXXX:対象になるSQSの名前.fifo"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/作成したLambdaの名前:*"
            ]
        }
    ]
}
  • 許可されるアクション
    • sqs:ReceiveMessage: SQSキューからメッセージを受信する権限
    • sqs:DeleteMessage: SQSキューからメッセージを削除する権限
    • sqs:GetQueueAttributes: SQSキューの属性を取得する権限
    • sqs:SendMessage: SQSキューにメッセージを送信する権限
    • logs:CreateLogGroup: ロググループを作成する権限
    • logs:CreateLogStream: ログストリームを作成する権限
    • logs:PutLogEvents: ログイベントを書き込む権限

作成したロールをLamdaから設定 > アクセス権限 > 実行ロールで割り当ててください。

EventBridgeの作成

作成したLambdaを定期実行するためのEventBridgeを作成します。 AWSコンソール > Amazon EventBridge > ルールからルールを作成を選択してください。

今回はEventBridgeSchedulerを利用していないため、ルールタイプはスケジュールを選択し、続行してルールを作成を選択してください。

create_rule.png

スケジュールには自身が実行したい間隔を各自設定してください。 今回の場合はSQSのinactive状態にも対応する且つ外部APIへのリクエストということを鑑みて3時間おきに実行する様に設定しています。

ターゲットとしては先程作成したLambdaを選択してください。 また、後ほどLambda自体にロールを割り当てるので実行ロールの使用のチェックは外してください。

以上でEventBridgeの設定は完了です。

最後にLambdaのアクセス権限からリソースベースのポリシーステートメントを設定します。 アクセス権限を追加を選択し、次の様に設定してください。

  • ステートメント ID: 任意
    • 判別できる値を入力。
  • プリンシパル: events.amazonaws.com
    • Amazon EventBridge(CloudWatch Events)サービスにアクセス権を付与。
  • アクション: lambda:InvokeFunction
    • Lambda関数を呼び出す権限。

動作確認

設定がうまく行っている場合はLambdaの実行ログがCloudWatchログに出力されています。

cloudwatch_dlq_log.png

あとがき

外部のAPIを使っている関係上、エラーになった瞬間は再度キューを投げる様にしたり、そもそもSQSを使わずにリクエストを投げる様にしていると、外部要因で処理を正しく完了できない恐れがあります。 このLambdaを実装するまではエラーのアラート表示はおこなっていましたが、DLQに積まれてしまったキューを間隔を空けて再実行する仕組みが存在しておらず、 エラー検知時に人が介入してキューを再実行する必要がありました。 人が介入する以上、エラーの見逃しが起きてしまったり、人によって対応頻度が変わってしまったりと運用において健全とは言えませんでした。 その問題を解決したかったので、このLambdaを実装しました。 その結果として過敏にDLQのエラーをチェックしなくなっても良くなり、運用コストを幾らか削減できました。 実際のコードなどには反映されておらず、誰の目にも留まるものでも無いですが、こういった仕組みを作って自分の心労を減らしていけるのは良いことなので今後も続けていきたいです。

森川拓

目次