見出し画像

[AWS] SQS #2 - EC2, SQS, Lambdaを組み合わせてメッセージの連動を行う

■ 概要
SQSを用いたシステム間のメッセージ連動を想定して、EC2(Amazon Linux 2)上のアプリケーション(SDK for Java)からSQSを通して、Lambdaの処理を実行する、という構成を実装してみた。

■ 目標構成

画像1

前回の記事「[AWS] SQS #1 - テスト環境を作ってみた (SDK for Java)」 の構成を引き続き利用する。今回は連動用のキューを用意して、そのキューへのデータPUTをトリガーに、Lambdaの関数が実行される仕組みになっている。

■ 目次
・キューの作成 (SQS)
・メッセージ送信用プログラムの作成 (Java)
・Lambda関数の実装 (Python)
・メッセージ連動の確認 (Cloudwatch)

■ キューの作成 (SQS)

画像2

「MyTestQueue-001.fifo」を作成

■ メッセージ送信用プログラムの作成 (Java)
SDK for Javaでサンプルとして公開されているプログラムでは、一件のみの送信方法しかなかったため、複数メッセージを送信できるように改良してみました。

SendMultipleMessagesSQS.java

import java.util.List;
import java.util.Map.Entry;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;

import java.time.LocalDateTime;

public class SendMultipleMessagesSQS {

   public static void main(String[] args) throws Exception {
       // Create Client
       final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

       // Define Queue Url
       final String queueUrl = "https://sqs.ap-northeast-1.amazonaws.com/157634048600/MyTestQueue-001.fifo";
       
       // Define the number of messages you send to SQS
       int sendMsgNum = 10;

       // Display processing start time
       System.out.println("===========================================");
       System.out.println("Start Sending at : " + LocalDateTime.now().toString());
       System.out.println("===========================================");

       // Send Messages
       for (int i=0; i<sendMsgNum; i++){
           // Difine the sequence number for DedupulicationId which is necessary for FIFO Queue
           String seqNum = String.valueOf(i);

           // Create Messages to send
           SendMessageRequest sendMsgRequest = new SendMessageRequest()
               .withQueueUrl(queueUrl)
               .withMessageBody("Test Message " + seqNum)
               .withMessageDeduplicationId("dedepId_" + seqNum)
               .withMessageGroupId("Test001");

           // Send Message
           sqs.sendMessage(sendMsgRequest);
       }

       // Display processing start time
       System.out.println("===========================================");
       System.out.println("Finish Sending at : " + LocalDateTime.now().toString());
       System.out.println("===========================================");
   }
}

Githubでソース公開してます

実行用のディレクトリ(SendMultipleMessagesSQS)を作成。ディレクトリ/ファイル権限はサンプルと合わせておきました。

[ec2-user@ip-172-31-36-63 samples]$ mkdir SendMultipleMessagesSQS
[ec2-user@ip-172-31-36-63 samples]$ chmod 755 SendMultipleMessagesSQS
[ec2-user@ip-172-31-36-63 samples]$ ls -ld SendMultipleMessagesSQS
drwxr-xr-x 2 ec2-user ec2-user 6 Jul 29 01:37 SendMultipleMessagesSQS

必要なファイルのアップロード
・build.xml
・SendMultipleMessagesSQS.java

[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ ll
total 8
-rw-r--r-- 1 ec2-user ec2-user  688 Jul 29 00:55 build.xml
-rw-r--r-- 1 ec2-user ec2-user 2387 Jul 29 01:31 SendMultipleMessagesSQS.java
[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ chmod 664 *
[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ ll
total 8
-rw-rw-r-- 1 ec2-user ec2-user  688 Jul 29 00:55 build.xml
-rw-rw-r-- 1 ec2-user ec2-user 2387 Jul 29 01:31 SendMultipleMessagesSQS.java

いざ、実行

[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ ant
Buildfile: /home/ec2-user/aws_sdk_java/aws-java-sdk-1.12.30/samples/SendMultipleMessagesSQS/build.xml

run:
   [javac] /home/ec2-user/aws_sdk_java/aws-java-sdk-1.12.30/samples/SendMultipleMessagesSQS/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
   [javac] Compiling 1 source file to /home/ec2-user/aws_sdk_java/aws-java-sdk-1.12.30/samples/SendMultipleMessagesSQS
   [javac] warning: Supported source version 'RELEASE_7' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.8'
   [javac] 1 warning
    [java] ===========================================
    [java] Start Sending at : 2021-07-29T01:54:51.438
    [java] ===========================================
    [java] ===========================================
    [java] Finish Sending at : 2021-07-29T01:54:52.499
    [java] ===========================================

BUILD SUCCESSFUL
Total time: 5 seconds

何とかうまく動きました。送信したメッセージがキューに滞留しているかを確認します。

画像3

きちんと10件対象のキューに滞留していました👏

■ Lambda関数の実装 (Python)

関数を作成

画像4

中身は Python で実装。以下のように取得したメッセージをログに出すため、eventを出力するようにした。

import json

def lambda_handler(event, context):
   
   print(event)
   
   # TODO implement
   return {
       'statusCode': 200,
       'body': json.dumps('Hello from Lambda!')
   }

作成したキューとの紐づけを行う。Lambdaのコンソール画面からトリガーを追加を選択して、対象のキューを選んでいく。以下のように「有効化」にチェックを入れた状態で追加を押すと、すでに滞留しているメッセージを取得してくるはず!

画像5

トリガーが追加されているのがコンソール画面からわかる。

画像6

これにて実装はすべて完了♪

■ メッセージ連動の確認 (Cloudwatch)
最後に、メッセージが連動されているかを確認する。Lambdaコンソール画面から、「モニタリング」を選択すると、「CloudWatchのログを表示」とあるので、そこを選択。

画像7

するといかのようにログが出力されているのがわかる。

画像8

実際にログの中身を見てみる。

画像9

Lambdaの処理開始、終了のログも出てるし、{ 'Records' }の中に受信したメッセージの詳細も出力されている。

画像10

先ほど滞留していたキューのメッセージも以下のように 0件になり、正常にメッセージが連動できていることがわかる。

画像11


SQSには連動できるメッセージサイズの上限が256KB、という制約があるため、次回はその課題を解決するための方法を検証したいと思います。

この記事が気に入ったらサポートをしてみませんか?