Step FunctionsのIterator使うと、Lambdaをparallel実行できてクソ便利なのでCDKでやってみた話

こんにちは!エアークローゼットCTOの辻(Twitter)です。
この記事はAWSアドベントカレンダー2021の22日目の記事です!

はじめに

みなさん、Step Functionsは使っていますか?本記事では、その中でもIteratorの機能を紹介します。
Iteratorは簡単に説明すると、ある処理をMulti ThreadのようにParallelに同時実行できる機能です。そのため、例えば処理に時間がかかりすぎてLambdaでの実装を諦めざるを得ないときに、その解決策となりえます。また、単純に処理を高速化したいときにも使えますね。
ここでは配列のパラメータをIteratorに渡してLambdaを並行実行するための、CDKを使った実装方法を説明します。

本記事の対象者

  • Step FunctionsのIteratorを使ってみたいけど、いまいち設定方法がわからず、具体的な設定方法が知りたい方。

  • timeoutが原因でLambdaでの実装を諦めてしまっている方。

  • Lambdaの処理を高速化したい方。

今日書かないこと

Step Functions自体の話や、CDKってなんぞやとかは説明しませんので、知りたい方はすでにある別の方が書いてくださっている記事でも読んでいただければと思います。

実装方法

サンプルコード

本日説明に使用するサンプルコードをこちらで公開しておきますので、手元で確認してみたい方はcloneしていただければ。
https://github.com/air-closet/cdk-sfs-iterator-sample

処理フロー

下記のようなフローを実装します。

Step Functionsの定義
  1. firstTaskInvokeでは、数値の配列を定義し後続の処理にわたす。

  2. iteratorTaskInvokeでは、それぞれの数値をfizzBuzz判定して、responseに詰めて後続の処理にわたす。

  3. 2の結果を受け取って標準出力に流す。

Lambdaのソースコード

firstTask

exports.handler = async () => {
  const arr = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20];

  return {
    fixedVal: 'This parameter is always used as same variable in all iterator.',
    arr,
  };
};

iteratorTask
(fizzBuzzの実装はてきとーなのでそこは突っ込まないで下さいw)

exports.handler = async (event: any) => {
  const input = event.param;

  return fizzBuzz(input);
};

function fizzBuzz(input: number) {
  let isFizz = input % 3 === 0;
  let isBuzz = input % 5 === 0;

  if (isFizz && isBuzz) {
    return 'fizzBuzz';
  } else if (isFizz) {
    return 'fizz';
  } else if (isBuzz) {
    return 'buzz';
  } else {
    return input + '';
  }
}

endTask

exports.handler = async (event: any) => {
  console.log(event.result);
};

CDKのソースコード

import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import { Map, StateMachine} from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Runtime, Function, Code } from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';

export class SfsStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const commonOptions = {
      runtime: Runtime.NODEJS_14_X, // ランタイムの指定
      code: Code.fromAsset('dist'), // ソースコードのディレクトリ -> npm run build
      memorySize: 1024, // メモリーの指定
      timeout: Duration.seconds(60),
      allowPublicSubnet: true,
    };
    // Lambda Function 作成
    const firstTask = new Function(this, 'firstTaskHandler', {
      ...commonOptions,
      functionName: 'firstTaskHandler',
      handler: 'first_task.handler',
    });
    const iteratorTask = new Function(this, 'iteratorTaskHandler', {
      ...commonOptions,
      functionName: 'iteratorTaskHandler',
      handler: 'iterator_task.handler',
    });
    const endTask = new Function(this, 'endTaskHandler', {
      ...commonOptions,
      functionName: 'endTaskHandler',
      handler: 'end_task.handler',
    });

    // Step Functionのタスク定義
    const firstTaskInvoke = new LambdaInvoke(this, 'firstTaskInvoke', {
      lambdaFunction: firstTask,
    });
    // Step Functionのiteratorタスク定義
    const iteratorTaskMap = new Map(this, 'iteratorTaskMap', {
      maxConcurrency: 0, // 同時実行上限(0の場合リソースの許す限り上限なし)
      inputPath: '$.Payload', // Lambdaのevent引数の中でiteratorで使いたいパラメータのrootを設定。
      itemsPath: '$.arr', // inputPathの中で、iteratorでloop変数となる配列のプロパティを指定。つまりこの場合 event.Payload.arrを指定していることになる。
      parameters: {
        'param.$': '$$.Map.Item.Value', // loop変数の名前を指定。$$.Map.Item.Valueがloop変数であることを表している。
        'fixedVal.$': '$.fixedVal', // inputPathの中で、どのiteratorでも固定で使いたいプロパティがあればこのように設定する。何個でもいい。
      },
      resultPath: '$.Payload.result', // iteratorの処理結果を格納するプロパティを設定。
      outputPath: '$.Payload', // 後続のタスクにわたすプロパティを設定。resultPathがこのプロパティ内に含まれていない場合、結果を返すことができなくなるため、resultPathはoutputPath内に指定するのが望ましい。
    });
    // Step Functionのタスク定義
    const iteratorTaskInvoke = new LambdaInvoke(this, 'iteratorTaskInvoke', {
      lambdaFunction: iteratorTask,
      payloadResponseOnly: true,
    });
    // Step Functionのタスク定義
    const endTaskInvoke = new LambdaInvoke(this, 'endTaskInvoke', {
      lambdaFunction: endTask,
      payloadResponseOnly: true,
    });
    iteratorTaskMap.iterator(iteratorTaskInvoke);

    // step function定義
    const definition = firstTaskInvoke
      .next(iteratorTaskMap)
      .next(endTaskInvoke);

    const stateMachine = new StateMachine(this, 'MyStateMachine', { definition });

  }
}

iterator部分の解説

つまりこの部分のことです。

    // Step Functionのiteratorタスク定義
    const iteratorTaskMap = new Map(this, 'iteratorTaskMap', {
      maxConcurrency: 0, // 同時実行上限(0の場合リソースの許す限り上限なし)
      inputPath: '$.Payload', // Lambdaのevent引数の中でiteratorで使いたいパラメータのrootを設定。
      itemsPath: '$.arr', // inputPathの中で、iteratorでloop変数となる配列のプロパティを設定。つまりこの場合 event.Payload.arrを指定していることになる。
      parameters: {
        'param.$': '$$.Map.Item.Value', // loop変数の名前を指定。$$.Map.Item.Valueがloop変数であることを表している。
        'fixedVal.$': '$.fixedVal', // inputPathの中で、どのiteratorでも固定で使いたいプロパティがあればこのように設定する。何個でもいい。
      },
      resultPath: '$.Payload.result', // iteratorの処理結果を格納するプロパティを設定。
      outputPath: '$.Payload', // 後続のタスクにわたすプロパティを設定。resultPathがこのプロパティ内に含まれていない場合、結果を返すことができなくなるため、resultPathはoutputPath内に指定するのが望ましい。
    });

コメントで書いた内容がすべてなのですが、ここに出てきているinputPath、itemsPath、parameters、resultPath、outputPathが公式ドキュメントを読んでも非常にわかりづらく結構ハマったポイントです。

inputPath
Lambdaのevent引数の中でiteratorで使いたいパラメータのrootを設定。
具体的には下記のようなパラメータがiteratorMapに入ってくるので、このうちのPayloadを各処理で使いたいパラメータのrootとして設定してます。firstTaskでreturnした値がPayloadに入っているのでそれを使いたいわけです。

iteratorTaskMapへの入力パラメータ

itemsPath
inputPathの中で、iteratorでloop変数となる配列のプロパティを設定。inputPathではPayloadを指定して、ここではarrを指定しているため、この場合event.Payload.arrを指定していることになる。

parameters
iteratorの各処理に入ってくるパラメータを設定します。$$.Map.Item.Valueを使うと、itemsPathで設定した配列の各要素が、その変数に入ってきます。他には、どのiteratorでも固定で使いたいプロパティがあればそれも何個でも追加可能です。ここではfixedValをすべてのiteratorで参照できるようにしています。具体的には下記のような入力パラメータとなります。

iteratorTaskへの入力パラメータ

resultPath
iteratorの処理結果を格納するプロパティを設定します。各処理の結果が配列に格納され、具体的には下記のような形になります。

iteratorTaskMapの出力パラメータ

outputPath
後続のタスクにわたすプロパティを設定。resultPathがこのプロパティ内に含まれていない場合、結果を返すことができなくなるため、resultPathはoutputPath内に指定するのが望ましいです。

まとめ

iteratorめっちゃ便利なんですが、良いドキュメントが見つからず、公式ドキュメント見ながら試行錯誤してようやく理解できたので、ぜひ参考になれば嬉しいです!あといくつかキャプチャも貼りましたが、Step Functionsは各処理ごとの入出力のパラメータや、例外発生時にどこでなぜ止まったのかが一目瞭然ですぐにわかるので、iterator使わずとも非常におすすめです!
サンプルのソースコードもあらためて貼っておきますね。
https://github.com/air-closet/cdk-sfs-iterator-sample

さいごに

私がCTOをやっている株式会社エアークローゼットでもアドベントカレンダーをやっているので、この記事が良かった!と感じていただいた方はこちらもぜひ見てもらえると嬉しいです!