CloudWatchのアラート発生時の詳細ログをSlackに流してみた

こんにちは。
PHR事業開発部の村越です。

今回は、CloudWatchのアラートが発生した際に、詳細なログをSlackに流す方法を記載しようと思います。

何故やろうと思ったのか

現在、弊社ではAWS上でエラーを検知した際にSlackにアラートを流すようにしているのですが、Chatbot経由での通知だと、エラーが発生したことはわかるが、Slack上では原因までは追えず、通知が来るたびにCloudWatchからエラーを検索するといった運用になっていました。

毎回エラーが発生する度にCloudWatchから検索するのは時間がかかるので、通知の際にエラー時のログを添付するようにし、調査に掛かる時間を減らそうと思います。

1. メトリクスフィルターを作成する

CloudWatch → ログ → ロググループ → 通知したいロググループ → メトリクスフィルター → メトリクスフィルターを作成

2. SNSのトピックを作成する

Amazon SNS → トピック → トピックの作成

3. アラームを作成する

CloudWatch → アラーム → アラームの作成
メトリクスの選択:手順1で作成したメトリクスを指定
アクションの設定 → 通知:通知の送信先に、手順2で作成したSNSトピックを指定

4. KMSを設定する

KMS → カスタマー管理型のキー → キーの作成
作成後、ARNをコピーしておく

5. Lambdaを作成する

Lambda → 関数 → 関数の作成
設計図の使用:cloudwatch-alarm-to-slack-python
実行ロール:基本的なLambdaアクセス権限で新しいロールを作成
SNSトピック:手順2で作成したSNSトピックを指定
環境変数:
 slackChannel:通知したいチャンネル
 kmsEncryptedHookUrl:下記のページで発行したWebhook URL
https://slack.com/apps/A0F7XDUAZ--incoming-webhook-
 暗号化の設定 → カスタマーマスターキーの使用:手順4で作成したKMSを指定
 転送時の暗号化に使用するヘルパーの有効化:チェック
 → kmsEncryptedHookUrl右側の「暗号化」ボタン
   KMSキー:手順4で作成したKMSを指定

6. ロールの修正

IAM → ロール → 手順5で作成したロール(関数名_role-XXX) → アクセス権限 → AWSLambdaBasicExecutionRole-XXXX → ポリシーの編集 → JSON

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Sid": "Stmt1443036478000",
           "Effect": "Allow",
           "Action": [
               "kms:Decrypt"
           ],
           "Resource": [
               "手順4でコピーしたARN"
           ]
       }
   ]
}

「ポリシーをアタッチします」 → 下記の2つを選択してアタッチ
・CloudWatchReadOnlyAccess
・AWSLambdaBasicExecutionRole

7. Lambdaのコードを修正

import boto3
import json
import logging
import os
import datetime
import calendar
import re


from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError


ENCRYPTED_HOOK_URL = os.environ['kmsEncryptedHookUrl']
SLACK_CHANNEL = os.environ['slackChannel']

HOOK_URL = boto3.client('kms').decrypt(
   CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL),
   EncryptionContext={'LambdaFunctionName': os.environ['AWS_LAMBDA_FUNCTION_NAME']}
)['Plaintext'].decode('utf-8')


logger = logging.getLogger()
logger.setLevel(logging.INFO)

#抽出するログデータの最大件数
OUTPUT_LIMIT=10
#何分前までを抽出対象期間とするか
TIME_FROM_MIN=5
#ERRORの何秒前後のログを出力するか
TIME_ERROR_AROUND=3


def lambda_handler(event, context):
   logger.info("Event: " + str(event))
   message = json.loads(event['Records'][0]['Sns']['Message'])
   logger.info("Message: " + str(message))
   
   logs = boto3.client('logs')
   
   # MetricNameとNamespaceをキーにメトリクスフィルタの情報を取得する。
   metricfilters = logs.describe_metric_filters(
       metricName = message['Trigger']['MetricName'] ,
       metricNamespace = message['Trigger']['Namespace']
   )
   
   logger.info("Metricfilters: " + str(metricfilters))
   
   #ログストリームの抽出対象時刻をUNIXタイムに変換(取得期間は TIME_FROM_MIN 分前以降)
   #終了時刻はアラーム発生時刻の1分後
   timeto = datetime.datetime.strptime(message['StateChangeTime'][:19] ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1)
   u_to = calendar.timegm(timeto.utctimetuple()) * 1000
   #開始時刻は終了時刻のTIME_FROM_MIN分前
   timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN)
   u_from = calendar.timegm(timefrom.utctimetuple()) * 1000
   
   # ログストリームからエラーのログを取得
   response = logs.filter_log_events(
       logGroupName = metricfilters['metricFilters'][0]['logGroupName'],
       filterPattern = metricfilters['metricFilters'][0]['filterPattern'],
       startTime = u_from,
       endTime = u_to,
       limit = OUTPUT_LIMIT
   )
   
   logger.info("エラーのログ: " + str(response))
   
   #メッセージの整形
   log_message = u""
   for e in response['events']:
       error_time = int(str(e['timestamp'])[:10])
       # エラーのログから前後X秒のログを取得
       time_from = (error_time - TIME_ERROR_AROUND) * 1000
       time_to = (error_time + TIME_ERROR_AROUND) * 1000
                     
       # エラーのログの前後のログを取得
       response2 = logs.filter_log_events(
           logGroupName = metricfilters['metricFilters'][0]['logGroupName'],
           startTime = time_from,
           endTime = time_to,
           limit = OUTPUT_LIMIT
       )

       for e2 in response2['events']:
           #UNIX時刻をUTCへ変換後、日本時間に変更している
           #date = datetime.datetime.fromtimestamp(int(str(e['timestamp'])[:10])) + datetime.timedelta(hours=9)
           #log_message = log_message + '\n' + str(date) + ' : ' + e['message']
           log_message = log_message + '\n' + e2['message']
   
   new_state = message['NewStateValue']
   reason = message['NewStateReason']
   alarm_description = message['AlarmDescription']

   slack_message = {
       'channel': SLACK_CHANNEL,
       'text': "<!here> \nステータス: %s\nアラーム理由: %s\n説明: %s\nログ: \n%s" % (new_state, reason, alarm_description, log_message)
   }

   req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
   try:
       response = urlopen(req)
       response.read()
       logger.info("Message posted to %s", slack_message['channel'])
   except HTTPError as e:
       logger.error("Request failed: %d %s", e.code, e.reason)
   except URLError as e:
       logger.error("Server connection failed: %s", e.reason)

上記でアラーム発生時に、Slack上にエラーのログと、前後のログが通知されるようになります。
ログの件数などは、上部の変数を弄って貰えれば変更可能です。

最後に

AWSのアラームについて記載してみましたがいかがだったでしょうか?

今回は、発生したアラームの前後のログを取得するようにしていますが、前後のログ取得時にフィルターを追加することで条件を指定して前後のログを取得することができます。
例えば、弊社では同じスレッドのログのみを出力するようにしています。
他にも前後の特定の文言が入っているログのみを出力するなども可能なので、出力するログの設定とLambdaをきちんと設計すれば、アラームからの通知だけ見れば何のエラーかまでわかるようになり、効率的にエラー検知ができるようになると思います。

今回は以上となります。
今後とも「NOBORI」をよろしくお願い致します。

村越