【AWS】 大量の Lambda を同時に実行する

サーバーレスのアーキテクチャを構築する際、ちょっとした関数を定義するのに重宝するのが AWS の Lambda でしょう。また、この Lambda は、少しの工夫だけで、大量の並列処理にも利用することができます。今回は、この「大量のLambdaを同時に実行する」方法について、コードを混じえつつ書いていきたいと思います。

アプリケーションの開発をしていると、大量のAPIを同時に叩きたいというケースが起こり得ます。例えば株価ビューワを開発するならば、数千銘柄の株価を同時に取得しなくてはなりません。

他にも、例えば膨大な数のウェブページからスクレイピングをする場合、ひとつのスクリプトで実行すると、すべてのページを取得するのにかなりの時間がかかってしまいます。制限時間がないならば気長に処理が終わるのを待てばよいかもしれませんが、通常は実行終了までが早ければ早いほど望ましいと言えます(特に Lambda には実行時間制限がある)。

いずれのケースも、処理を並列にすることによって問題が解決できますが、では Lambda では一体どのように実現すればよいのでしょうか?

実は、こちらのAWSのページに実装のヒントがあります。リンク先に、下のような図があります。(クライアントから実行する)エンドポイントとなる Lambda から、また別の(複数の) Lambda を呼んでいくという流れです。

画像1

全体の流れは下記です。

Orchestration Lambda: クライアントが実行する単一のエンドポイント。実行すべき処理をいくつかのバッチに分け、各バッチごとに Group Lambda を呼ぶ。

Group Lambda: バッチ内の各処理ごとに、 Page Lambda を呼ぶ。

Page Lambda: ただ1つの処理を行う。

例として、全部で 10,000 の処理を実行したい(10,000ページのスクレイピングなど)としましょう。

まずは全処理をいくつかのバッチに分割します。例えば各バッチ100個ずつの処理を担うとすると、バッチ数も100となりますので、Orchestration Lambda から、100 の Group Lambda を呼び出すことになります。

そして呼び出された各 Group Lambda で、今度は渡されたバッチ内の 100 の処理を行うために 100 の Page Lambda を実行すれば、ほぼ同時に 10,000 の処理が実行できることになります。

Orchestration Lambda からいきなり Page Lambda を呼び出すのではなく、一度 Group Lambda でバッチを挟んでいるのは、Lambda を呼び出す処理のオーバーヘッドをなるべく少なくするためです。

簡単に Lambda を書いてみると、それぞれ次のようになります。
(実装は Python で行っています。 Lambda の実行権限がロールに必要となることに注意してください。)

Orchestration Lambda (hello_orchestration)

# hello_orchestration (.py)
# メモリ: 256MB, タイムアウト: 10秒

import json
import boto3

client = boto3.client('lambda')
BATCH_SIZE = 100  # 1つのバッチが担う処理数


def lambda_handler(event, context):
   event_ids = list(range(10000))  # 処理数は 10,000

   chunks = \
       [event_ids[i:i+BATCH_SIZE]
           for i in range(0, len(event_ids), BATCH_SIZE)]

   for chunk in chunks:
       # Group Lambda の呼び出し
       response = client.invoke(
           FunctionName='hello_group',
           InvocationType='Event',
           Payload=json.dumps({'event_ids': chunk})
       )

   return {
       'statusCode': 200,
       'body': json.dumps('Hello from Lambda!')
   }

Group Lambda (hello_group)

# hello_group (.py)
# メモリ: 256MB, タイムアウト: 10秒

import json
import boto3

client = boto3.client('lambda')


def lambda_handler(event, context):
   event_ids = event.get('event_ids')
   
   for event_id in event_ids:
       # Page Lambda の呼び出し
       response = client.invoke(
           FunctionName='hello_page',
           InvocationType='Event',
           Payload=json.dumps({'event_id': event_id})
       )

   return {
       'statusCode': 200,
       'body': json.dumps('Hello from Lambda!')
   }

Page Lambda (hello_page)

# hello_page (.py)
# メモリ: 128MB, タイムアウト: 8秒

import json
import time
import random


def lambda_handler(event, context):
   event_id = event.get('event_id')

   # スクレイピング、APIを叩くなどの処理
   time.sleep(random.randint(1, 5))
   
   response = 'Response from event_id: {}'.format(event_id)
   print(response)

   return {
       'statusCode': 200,
       'body': json.dumps(response)
   }

コードは以上です。Orchestration Lambda を実行してみると、CloudWatch Logs で並列に処理できている(ほぼ同時にすべての処理が終わっている)ことが確認できるかと思います。

Lambda でお手軽に並列処理ができることによって、大量な処理が必要な開発も怖くなくなりますね。


※ 本記事の内容はDDoS攻撃にも応用することができてしまいます。決して悪用はせず、単一ホストへのスクレイピングを同時に行うなど、ホスト側に負荷をかける処理も避けてください。

この記事が気に入ったらサポートをしてみませんか?