見出し画像

AWS SSM Parameter Storeに格納したパラメータの変化を自動で更新する

前回紹介の記事では、WebHookURLをパラメータストアを使用して、セキュアに保持するのと、Lambdaなどのパラメータも同時に格納して使用してみるのを紹介しました。サービスの発展に伴い、Lambdaのメモリサイズなど、性能に関わるパラメータを上げたいということは十分に起こりえるものです。今回でいうと、パラメータをコンソールから変更した後、Cloudformationのスタックを手動でアップデートする必要がありました。
そのため、今回はパラメータストアの変化をイベントに設定し、自動でスタック更新をする仕組みを作成したので、紹介してみようと思います。
全体のソースコードは以下にあります。
https://github.com/shohta-tera/AWS_parameterstore_update

環境

- Windows10 Pro: 20H2
- WSL2: Ubuntu-18.04
- AWS CLI: aws-cli/2.0.37 Python/3.7.3 Linux/4.19.84-microsoft-standard exe/x86_64.ubuntu.18
- AWS SAM CLI: SAM CLI, version 1.0.0
- Python: 3.7

アプリケーションの全体像

Parameterのイメージ

上の画像のように、AWSのリソースはIaC化し、パラメータ関連はSystems Mangaerに外だししています。レポジトリで各リージョンごとの本番環境専用のブランチなどを切って管理することも考えられますが、今回の例では異なるメリットもあります。
- ロジック部分のソースコードが各環境で同一で済む。
- Systems Managerでパラメータの履歴が管理できる。
- IAMで本番環境などのパラメータにアクセス制限を適切な人に与えることができる。

更新アプリ

上図が全体図です。サービスの死活、傾向監視などはDatadogなどを用いて行い、アラートが来たときに対処するようなユースケースを念頭においています。とあるマイクロサービスにおいて、Lambdaのタイムアウトなどが生じているとして、Systems Mangaer経由でそのマイクロサービスのパラメータを変更するとします。その変更をイベントとして、SQSにメッセージが蓄積されます。そのメッセージを定期実行するLambdaが取得し、実際にサービスの更新を実行するLambda群に更新指示を送ります。ここで、Lambda群としているのは、マイクロサービスの更新が複数あれば、各マイクロサービスごとにLambdaをKickするためです。
このLambdaがタイムアウトの時間である15分の間に更新が完了できたか確認し、成功していれば、メールとWebhookを用いてTeamsやSlackなどに通知を送ります。もし、まだ成功していなければ更新確認用のLambdaをKickし、再度15分の確認時間を取ります。そこで成功していれば、先程と同様に通知をします。失敗していれば、再度確認は行わずメールで失敗したと通知するような仕組みになっています。

前提として…

マイクロサービスとして構築するスタックの命名とパラメータストアに格納するパラメータには制約が存在します。
-  スタックの命名規則
    - (環境名「prodなど」)-(マイクロサービス名「Customerなど」)-Stack
- パラメータの命名規則
    - /Cloud/(環境名)/(マイクロサービス名)/LambdaMemorySize

各種リソースのデプロイ

S3 バケットの作成

make create.bucket

- ソースコードや Cloudformation テンプレートを S3 バケットに保存する必要があるため、作成します。

CFn を自動更新するアプリケーションのデプロイ

make create.layer

- Slack や Teams に通知を送信するためや Lambda のリクエストのトレースなどを行うために、X-ray を導入しているので LambdaLayer に入れています。

make sam.package & make sam.deploy

- 上のコマンドを実行でアプリケーションをデプロイします。

自動更新を検証するためのサンプルスタックのデプロイ

make deploy & make test.package & make test.deploy

- 前回紹介の記事の通り、Webhook URL と Lambda のパラメータをパラメータストアに保管し、リソースをデプロイします。
- 今回は簡単にするために、Customer というサービスがあり、それが Stack 名に入っているものとしています。(スタック名: prod-Customer-Stack)

パラメータ一覧

- WebhookURL
    - パラメータ名: webhookURL
    - パラメータは KMS を使用して暗号化しています。
- LambdaMemorySize
    - パラメータ名: /Cloud/prod/Customer/LambdaMemorySize
        - パラメータは/Cloud/(PHASE)/(サービス名)/実際のパラメータというように設定しています。
    - 初期値: 128
- LambdaTimeout
    - パラメータ名: /Cloud/prod/Customer/LambdaTimeout
- 初期値: 120

更新する仕組みの紹介

SQS を確認し、CFn をアップデートする指示をする Lambda

SQS から 10 分に 1 回溜まったメッセージを取得し、メッセージが 1 分以内に追加で来なければ、次の処理に移るようにしています。
SQS はメッセージに、「(パラメータ名)が(アクション)されました」という形なので、そこからアップデートすべきサービスのみを取得しています。

# 他のフィールドは省略
{
 "Message": "\"/Cloud/prod/Customer/LambdaTimeout is Updateed\"",
}

アップデートすべきサービスを取得し、サービス毎にCFnスタックを更新するLambdaを並列で励起します。

import boto3
from logging import getLogger
import json
import os
# xray patck
from aws_xray_sdk.core import patch
patch(["boto3"])
log = getLogger(__name__)
queue_name = os.environ["ENV_NAME"] + "-InvokeChangeLambdaQueue"
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName=queue_name)
client_lambda = boto3.client("lambda")

def app_push_tag_from_queue(event, context):
   log.info(json.dumps(event))
   # Receive all until queue is all read.
   service_list = []
   delete_list = []
   process_count = 0
   while process_count < 3:
       msg_list = queue.receive_messages(
           MaxNumberOfMessages=10, VisibilityTimeout=10, WaitTimeSeconds=20
       )
       for message in msg_list:
           message.delete()
       if len(msg_list) != 0:
           delete_list[len(delete_list) : len(delete_list)] = msg_list
           process_count = 0
       else:
           process_count += 1
   for message in delete_list:
       notify_msg = json.loads(message.body)["Message"]
       msg_only = notify_msg.split("/")
       if msg_only[2] == "prod":
           service_list.append(msg_only[3])
   # remove dupulicate service list
   change_services = list(set(service_list))
   log.info("Update Service: ", change_services)
   for change_service in change_services:
       client_lambda.invoke(
           FunctionName=os.environ["ENV_NAME"] + "-CFn-Update-Executer",
           InvocationType="Event",
           Payload=json.dumps({"Service": change_service}),
       )
       log.info(f"Kicked Lambda for {change_service} Service Update.")

def lambda_handler(event, context):
   try:
       app_push_tag_from_queue(event, context)
   except Exception as e:
       log.info(e)

CFn のアップデートを実際に実行する Lambda

boto3 を使用して、実際に CFn をアップデートしています。変更されたパラメータは実際にはパラメータストアのパラメータ名を参照するため、CFnのパラメータはそのまま更新に使用します。
LambdaのTimeout上限を15分に設定しており、その上限以内である13 分間の間にスタックの更新がされれば、Slack などに通知し、さらにメールでも通知します。
更新がされていなければ、CFnの状態を再度確認する処理を行う Lambda を励起します。

import boto3
import botocore
from loging import getLogger
import json
import requests
import os
# xray patch
from aws_xray_sdk.core import patch
patch(["boto3"])
log = getLogger(__name__)

cfnclient = boto3.client("cloudformation")
ssm = boto3.client("ssm")
response = ssm.get_parameter(
   Name="webhookURL",
   WithDecryption=True
)
teams_url = response["Parameter"]["Value"]
snsclinet = boto3.client("sns")
aws_region = boto3.session.Session().region_name
account_id = boto3.client("sts").get_caller_identiry().get("Account")
topic_arn = (
   "arn:aws:sns:"
   + aws_region
   + ":"
   + account_id
   + ":"
   + os.environ["ENV_NAME"]
   + "-CFn-Update-Notification"
)

def execute_upate_stack(event, context):
   change_service = event["Service"]
   response = cfnclient.describe_stacks(
       StackName=f"prod-{change_service}-stack")
   key_list = [key["ParameterKey"]
               for key in response["Stacks"][0]["Parameter"]]
   parameter_list = [
       {"ParameterKey": f"{key}", "UsePreviousValue": True} for key in key_list
   ]
   response = cfnclient.update_stack(
       StackName=f"prod-{change_service}-stack",
       UsePreviousTemplate=True,
       Paramters=parameter_list,
       Capabilities=["CAPABILITY_NAMED_IAM"],
   )
   log.info(f"Changed {change_service} Service.")
   waiter = cfnclient.get_waiter("stack_update_complete")
   # 30 sec * 26 times
   waiter.config.maxAttempts = 26
   try:
       waiter.wait(StackName=f"prod-{change_service}-stack")
   except botocore.exceptions.WaiterError as e:
       log.ingo(e)
       Msg = f"Updating {change_service} stack service."
       Success = False
   else:
       requests.post(
           teams_url,
           json.dumps(
               {
                   "title": "Update Stack Complete",
                   "text": f"{change_service} is changed.",
               }
           ),
       )
       Msg = f"Update {change_service} stack complete"
       Success = True
   finally:
       return {"Success": Success, "body": Msg, "Service": change_service}

def lambda_handler(event, context):
   response = execute_upate_stack(event, context)
   # send email if success. kick lambda for check if not
   if response["Success"]:
       snsclinet.publish(
           TopicArn=topic_arn,
           Message=response["body"],
           Subject="CFn Update Notification",
       )
   else:
       return {"Service": response["Service"]}
       

CFn のステータスをチェックする Lambda

CFn の自動更新を実行する Lambda のときと同様に、13 分間の間にスタックがアップデート完了するかチェックします。
されている場合は、Slack とメールで通知します。
されていない場合でも、メールで通知します。

import boto3
import botocore
from logging import getLogger
import json
import requests
import os
# x-ray patch
from aws_xray_sdk.core import patch
patch(["boto3"])
log = getLogger(__name__)
ssm = boto3.client("ssm")
response = ssm.get_parameter(
   Name="webhookURL",
   WithDecryption=True
)
teams_url = response["Parameter"]["Value"]
cfnclient = boto3.client("cloudformation")
snsclient = boto3.client("sns")
aws_region = boto3.session.Session().region_name
account_id = boto3.client("sts").get_caller_identiry().get("Account")
topic_arn = (
   "arn:aws:sns:"
   + aws_region
   + ":"
   + account_id
   + ":"
   + os.environ["ENV_NAME"]
   + "-CFn-Update-Notification"
)

def retry_treatment(event, context):
   change_service = event["requestPayload"]["Service"]
   log.info(f"Checking {change_service} Service")
   waiter = cfnclient.get_waiter("stack_update_complete")
   # 30 sec * 26 times
   waiter.config.maxAttempts = 26
   try:
       waiter.wait(Stackname=f"prod-{change_service}-stack")
   except botocore.exceptions.WaiterError as e:
       log.info(e)
       Msg = f"Not Accomplished Updating {change_service} stack Service"
       Success = False
   else:
       requests.post(
           teams_url,
           json.dumps(
               {
                   "title": "Update Stacks Complete",
                   "text": f"{change_service} is changed.",
               }
           ),
       )
       Msg = f"Update {change_service} stack complete"
       Success = True
   finally:
       return {"Success": Success, "body": Msg, "Service": change_service}

def lambda_handler(event, context):
   response = retry_treatment(event, context)
   if response["Success"]:
       snsclient.publish(
           TopicArn=topic_arn,
           Message=response["body"],
           Subject="CFn Update Notification",
       )
   else:
       snsclient.publish(
           TopicArn=topic_arn,
           Message=response["body"],
           Subject="CFn Update Notification",
       )

実際にパラメータ変更をしてみる。

- LambdaMemorySize: 128 → 256
- LambdaTimeout: 120 → 180

パラメータ変更によって、イベント作成および SQS にメッセージが蓄積される。
このメッセージが 10 分に 1 回取得されるので、しばらく待つ必要があります。

終了すると Cloudformation のパラメータの項目の解決ずみの値が図のように変更したものに更新されているかと思います。

画像1

同時にエビデンスとして残されるメールは以下の文言で送られてきます。

画像2

また、今回の例においては、個人で使用可能な Slack や Teams はあいにく持ってなかったため、Webhook URL を使用して、通知を送信する部分はコメントアウトしています。

また、今回パラメータを変更して、リソースを更新できるのは主にサーバレス系のリソースです。template.yml の IAM Role で許可している以外のリソースに適応したい場合は、そのリソースを追加する必要があります。

おわりに

今回の例においては、事情により Lambda の Memory Size など性能と課金に関わるパラメータをパラメータストアに保存する必要があり一般的なパラメータ変更戦略ではないのかもしれません。
サービスの成長に伴うパラメータ戦略がどのように管理がされているのかについては、記事などがほとんどないので、手探りの状態でやっていました。もし、この記事を見られた方の中で、クラウドサービスにおけるパラメータ戦略についてのご知見があれば、教えていただきたいです。

この記事が気に入ったらサポートをしてみませんか?