見出し画像

キューと並列処理を組み合わせたプロミスタスクのラフなデモ

こんにちわ。nap5です。


キューと並列処理を組み合わせたプロミスタスクのラフなデモの紹介です。


import { Result } from "neverthrow";

interface ErrorFormat {
  fn: string;
  detail: Record<string, unknown>;
}

interface CustomErrorOptions extends ErrorOptions {
  cause?: ErrorFormat;
}

class CustomError extends Error {
  cause?: ErrorFormat;
  constructor(message: string, options?: CustomErrorOptions) {
    super(message, options);
    this.cause = options?.cause;
  }
}

export class ErrorOfTaskD extends CustomError {
  name = "ErrorOfTaskD" as const;
  constructor(message: string, options?: CustomErrorOptions) {
    super(message, options);
  }
}

export class ErrorOfTaskE extends CustomError {
  name = "ErrorOfTaskE" as const;
  constructor(message: string, options?: CustomErrorOptions) {
    super(message, options);
  }
}

export const createTask = <TName extends string, TValue, TError = Error>(
  name: TName,
  fn: () => Promise<Result<TValue, TError>>
): { name: TName; fn: () => Promise<Result<TValue, TError>> } => {
  return { name, fn };
};

export const processQueueConcurrently = async <T>(
  taskFns: ReturnType<typeof createTask>[],
  maxConcurrentTasks: number = 2,
): Promise<T[]> => {
  const resultTasks = new Array(taskFns.length); // populate empty
  const queue = [...taskFns.entries()];

  const processTask = async () => {
    while (queue.length > 0) {
      const [index, item] = queue.shift()!; // When destructive operations are key, the queue being referenced is commonly shared.
      const result = await item.fn();
      resultTasks[index] = { name: item.name, result }; // filling result
      const doneTasks = resultTasks.filter((d) => !!d);
      console.log(
        `${resultTasks.length}件中${doneTasks.length}件が実行済みです[${Math.floor(
          (doneTasks.length / resultTasks.length) * 100
        )}%]`
      );
    }
  };

  const workers = Array.from(
    { length: Math.min(maxConcurrentTasks, taskFns.length) },
    () => processTask()
  );

  await Promise.all(workers);

  return resultTasks as T[];
};


破壊的な変更を行うshiftメソッドをトリガーにして、キューに入ったタスクを処理中のプロミスタスクで共有参照しているのがポイントになります。

Awaitでプロミスタスクが解決するまで、立ち上がるプロミスタスクは引数で与えた数だけ残るので、キューがゼロになるまで、同時実行の機能が実現できます。


たとえば、以下のように使います。


import { Err, Ok, Result } from "neverthrow";
import { test, expect } from "vitest";
import {
  ErrorOfTaskD,
  ErrorOfTaskE,
  createTask,
  processQueueConcurrently,
} from ".";
import { match } from "ts-pattern";

test("Narrowing", async () => {
  const tasks = [
    createTask("Task A", async (): Promise<Result<string, Error>> => {
      await new Promise((r) => setTimeout(r, 1_000));
      return new Ok("cowboy");
    }),
    createTask(
      "Task B",
      async (): Promise<Result<Record<string, unknown>, Error>> => {
        await new Promise((r) => setTimeout(r, 1_000));
        return new Ok({ film: "Cowboy Bebop" });
      },
    ),
    createTask("Task C", async (): Promise<Result<number, Error>> => {
      await new Promise((r) => setTimeout(r, 1_000));
      return new Ok(45);
    }),
    createTask("Task D", async (): Promise<Result<null, ErrorOfTaskD>> => {
      await new Promise((r) => setTimeout(r, 1_000));
      return new Err(
        new ErrorOfTaskD("Something went wrong...", {
          cause: { fn: "Task D", detail: { message: "human mock error" } },
        }),
      );
    }),
    createTask("Task E", async (): Promise<Result<null, ErrorOfTaskE>> => {
      await new Promise((r) => setTimeout(r, 1_000));
      return new Err(
        new ErrorOfTaskE("Something went wrong...", {
          cause: { fn: "Task E", detail: { message: "human mock error" } },
        }),
      );
    }),
    createTask(
      "Task F",
      async (): Promise<Result<Record<string, unknown>[], Error>> => {
        await new Promise((r) => setTimeout(r, 1_000));
        return new Ok([{ a: "a" }, { b: "b" }, { c: "c" }]);
      },
    ),
    createTask("Task G", async (): Promise<Result<boolean, Error>> => {
      await new Promise((r) => setTimeout(r, 1_000));
      return new Ok(false);
    }),
    createTask("Task H", async (): Promise<Result<boolean, Error>> => {
      await new Promise((r) => setTimeout(r, 1_000));
      return new Ok(true);
    }),
  ];
  type TaskValueMapping = {
    [TName in (typeof tasks)[number]["name"]]: Awaited<
      ReturnType<Extract<(typeof tasks)[number], { name: TName }>["fn"]>
    >;
  };

  type TaskUnion = {
    [K in keyof TaskValueMapping]: {
      name: K;
      result: TaskValueMapping[K];
    };
  }[keyof TaskValueMapping];

  const results = await processQueueConcurrently<TaskUnion>(tasks, 128);
  const okData: TaskUnion[] = results.flatMap<TaskUnion>((d) =>
    d.result.isOk() ? d : [],
  );
  const errData: TaskUnion[] = results.flatMap<TaskUnion>((d) =>
    d.result.isErr() ? d : [],
  );

  errData.forEach((d) => {
    match(d)
      .with({ name: "Task A" }, (data) => console.log(data.result))
      .with({ name: "Task B" }, (data) => console.log(data.result))
      .with({ name: "Task C" }, (data) => console.log(data.result))
      .with({ name: "Task D" }, (data) => console.log(data.result))
      .with({ name: "Task E" }, (data) => console.log(data.result))
      .with({ name: "Task F" }, (data) => console.log(data.result))
      .with({ name: "Task G" }, (data) => console.log(data.result))
      .with({ name: "Task H" }, (data) => console.log(data.result))
      .exhaustive();
  });

  const report = {
    summary: {
      okCount: okData.length,
      errCount: errData.length,
    },
    detail: {
      values: okData.map((d) => ({
        name: d.name,
        value: d.result._unsafeUnwrap(),
      })),
      errors: errData.map((d) => ({
        name: d.name,
        error: d.result._unsafeUnwrapErr(),
      })),
    },
  };
  console.log(report);
});



demo code.


簡単ですが、以上です。


この記事が気に入ったらサポートをしてみませんか?