見出し画像

fp-tsを使ったFail Firstせずに複数プロミスタスクをフロー化して実行するワークアラウンドの紹介

こんにちわ。nap5です。


fp-ts
を使ったFail Firstせずに複数プロミスタスクをフロー化して実行するワークアラウンドの紹介です。


まずは、ミニデモからです。TaskモナドをつかうとFail Firstの振る舞いをせずに処理を継続することができます。ほかのモナド系はたいてい即時でエラーを返して処理を終了してくれます。


import { z } from "zod";
import { TaskEither } from "fp-ts/lib/TaskEither";
import { tryCatch } from "fp-ts/lib/TaskEither";
import { ApplicativePar, ApplicativeSeq } from "fp-ts/lib/Task";
import { sequence } from 'fp-ts/lib/Array'

const CustomErrorDataSchema = z.custom<CustomError>();
type CustomErrorData = z.infer<typeof CustomErrorDataSchema>;
type UserId = number;

class CustomError extends Error {
  constructor(message: string, option?: { cause: unknown }) {
    super(message, option);
  }
}

const doN = (n: number): TaskEither<CustomErrorData, UserId> => {
  return tryCatch(
    async () => {
      if (n % 2 === 0) {
        return Promise.reject(
          new CustomError(`Something went wrong... [${n}]`)
        );
      }
      return n;
    },
    (e) => e as CustomErrorData
  );
};

(async () => {
  const numbers = [1, 2, 3, 4, 5];
  const tasks = numbers.map(doN);
  const resultsPar = await sequence(ApplicativePar)(tasks)() // @see https://github.com/gcanti/fp-ts/issues/1626#issuecomment-1000891972
  const resultsSeq = await sequence(ApplicativeSeq)(tasks)()
  console.log(resultsPar, resultsSeq)
})();


これをもとに少し応用してバッチ処理のようなユースケースでデモを作ってみました。

import { pipe } from "fp-ts/lib/function";
import seedrandom from "seedrandom";
import * as TE from "fp-ts/lib/TaskEither";
import * as T from "fp-ts/lib/Task";
import * as A from "fp-ts/lib/Array";
import * as E from "fp-ts/lib/Either";

const rng = seedrandom("fixed-seed");

interface ErrorFormat {
  fn: string;
  detail: Record<string, unknown>;
}

interface CustomErrorOptions extends ErrorOptions {
  cause?: ErrorFormat;
}

class CustomError extends Error {
  cause?: ErrorFormat;
  constructor(message: string, options?: CustomErrorOptions) {
    super(message, options);
    this.cause = options?.cause;
  }
}

export class ValidationError extends CustomError {
  name = "ValidationError" as const;
  constructor(message: string, options?: CustomErrorOptions) {
    super(message, options);
  }
}

export type User = {
  id: number;
  name: string;
  isDeleted?: boolean;
};

const createUser = (formData: User): TE.TaskEither<ValidationError, User> =>
  rng() < 0.1
    ? TE.left(
        new ValidationError("Failed createUser.", {
          cause: { fn: "createUser", detail: formData },
        }),
      )
    : TE.right({ ...formData, isDeleted: rng() > 0.8 });

const deleteUser = (formData: User): TE.TaskEither<ValidationError, User> =>
  !formData.isDeleted
    ? TE.right({ ...formData, isDeleted: !formData.isDeleted })
    : TE.left(
        new ValidationError("Already deleted.", {
          cause: { fn: "deleteUser", detail: formData },
        }),
      );

const notifyToUser = (formData: User): TE.TaskEither<ValidationError, User> =>
  rng() < 0.8
    ? TE.right({ ...formData })
    : TE.left(
        new ValidationError("Failed notification.", {
          cause: { fn: "notifyToUser", detail: formData },
        }),
      );

const createUsers = (data: User[]) =>
  A.sequence(T.ApplicativePar)(data.map(createUser));
const deleteUsers = (data: User[]) =>
  A.sequence(T.ApplicativePar)(data.map(deleteUser));
const notifyToUsers = (data: User[]) =>
  A.sequence(T.ApplicativePar)(data.map(notifyToUser));

export const demo = (data: User[]) =>
  pipe(
    data,
    createUsers,
    T.chain((results) => {
      const { left: lefts, right: rights } = A.separate(results);
      const deleteUsersTask = deleteUsers(rights);
      return pipe(
        deleteUsersTask,
        T.map((d) => [...d, ...lefts.map(E.left)]),
      ); // combined previous errors
    }),
    T.chain((results: E.Either<ValidationError, User>[]) => {
      const { left: lefts, right: rights } = A.separate(results);
      const notifyToUsersTask = notifyToUsers(rights);
      return pipe(
        notifyToUsersTask,
        T.map((d) => [...d, ...lefts.map(E.left)]),
      ); // combined previous errors
    }),
  );


コアのトランザクションプロセスは以下になります。
これらは引数に複数のユーザーデータを受け取り、プロミスタスク化して実行するプロセスになります。

const createUsers = (data: User[]) => A.sequence(T.ApplicativePar)(data.map(createUser));
const deleteUsers = (data: User[]) => A.sequence(T.ApplicativePar)(data.map(deleteUser));
const notifyToUsers = (data: User[]) => A.sequence(T.ApplicativePar)(data.map(notifyToUser));


このインターフェースを保ちながら、前回処理の結果(T型とE型)のエラーハンドリングを行っている部分は以下になります。

export const demo = (data: User[]) =>
  pipe(
    data,
    createUsers,
    T.chain((results) => {
      const { left: lefts, right: rights } = A.separate(results);
      const deleteUsersTask = deleteUsers(rights);
      return pipe(
        deleteUsersTask,
        T.map((d) => [...d, ...lefts.map(E.left)]),
      ); // combined previous errors
    }),
    T.chain((results: E.Either<ValidationError, User>[]) => {
      const { left: lefts, right: rights } = A.separate(results);
      const notifyToUsersTask = notifyToUsers(rights);
      return pipe(
        notifyToUsersTask,
        T.map((d) => [...d, ...lefts.map(E.left)]),
      ); // combined previous errors
    }),
  );

達成したいシナリオに応じてTaskモナドのchainで前回プロセス結果をseparateしながら、EitherモナドのエラーをCombineしているのがポイントです。前回プロセスのうち正常系データはそのまま、次のプロセスの引数に渡しております。

const { left: lefts, right: rights } = A.separate(results);
const notifyToUsersTask = notifyToUsers(rights);


最後にデモコードになります。





簡単ですが、以上です。

この記事が気に入ったらサポートをしてみませんか?