見出し画像

Patterns of Enterprise Application Architecture から学ぶ - Chapter 5

はじめに

こんにちは。ソフトウェアエンジニアのttokutakeと申します。

これはPatterns of Enterprise Application Architecureという本を読んでみて、自分が理解した内容を要約して書き起こしていくシリーズの5回目の記事です。

全8回のシリーズとなる予定です。

間違っている部分などがありましたら、ご指摘いただけますと幸いです。

注意事項

  • 対象読者は主にWebアプリケーションのエンジニアです。

  • 本の内容をそのまま記載しているわけではありません。

    • 内容のすべてを記載すると情報量が多いように感じたので、省略している部分がそれなりにあります。

    • 自分自身の見解を述べている箇所もあります。

Chapter 5. Concurrency

並行処理はソフトウェア開発で最も扱いにくい面のひとつで、同じデータを操作する複数のプロセスやスレッドを扱うときに並行処理問題にぶつかります。起こりうる問題を見逃しやすく、テストしにくいので並行処理を扱うのはとても難しいです。

エンタープライズアプリケーションの開発において、多くの並行処理問題はリレーショナルデータベースのトランザクションマネージャーによって回避できます。しかし、当然すべての問題を回避できるわけではありません。なぜならすべての処理を一つのトランザクションに含められるわけではないからです。このような問題をOffline Concurrencyと呼びます。

また並行処理の問題はマルチスレッドをサポートするアプリケーションサーバーでも顔を覗かせます。しかしこれらはOffline Concurrencyと比べればいくらか単純です。(と著者は言いますが、私はこれはこれで難しいと思います。)

この本では並行処理のすべてを取り扱っているわけではありません。Offline Concurrencyを扱うパターンの紹介とアプリケーションサーバーの並行処理問題を簡潔に言及するだけに留めています。

並行処理問題

まずは並行処理の本質的な問題をソースコード管理システムを例に考えていきます。

一つ目はLost Updatesです。Martinがあるソースコード上のメソッドに変更を加え始めたとします。これにはしばらく時間を要します。その間にDavidが同じファイルにある別のメソッドに変更を加えてすぐにコミットしたとします。その後Martinが変更を終えてコミットするとDavidの変更が上書きされて失われてしまうというのがLost Updatesです。( Git などで想像していると「コンフリクトするじゃん」と思われた方もいらっしゃるでしょうが、のちほどコンフリクトについての話もちゃんと出てきます。)

2つ目はInconsistent Readで、それぞれの情報は正しいにも関わらず同時に読み込むと不正確な情報になってしまう問題です。Martinがあるパッケージに含まれるクラスの数を調査しているとします。(言語によってはパッケージを名前空間などに適宜読み替えてもらえると良いかと思います。)そのパッケージには2つのサブパッケージが含まれていて、MartinはまずサブパッケージAには7つのクラスあることを調べました。そのとき電話がかかってきたのでMartinはしばらく席を外します。その間にDavidがバグ修正をした結果、サブパッケージAに2つのクラス、サブパッケージBに3つのクラスを追加しました。電話を終えて調査に戻ったMartinはサブパッケージBを調べて、8つのクラスがあることを確認しました。Martinは合計15のクラスが存在すると結論付けました。しかしDavidがバグ修正をする前は、サブパッケージBには5つのクラスが存在していましたので、本来であれば更新前の合計12のクラスか更新後の合計17のクラスと結論づけるのが正しいはずです。

Inconsistent Readしている様子

これらはデータの正確性が欠けることを問題視しています。問題がそれだけであれば、単に同時に変更できないようにすることで解決します。だたし、それだと今度は Liveness に問題が生じます。Livenessとはどれだけ処理を並行して行えるかということです。

IsolationとImmutability

並行処理問題においては、特に2つの解決策が重要です。1つがIsolation、もう一つがImmutabilityです。

Isolationについての良い例はOSのプロセスです。OSのプロセスはメモリーをプロセスごとに分離しています。これによってそもそも同じデータ領域を扱わないようにするのがIsolationです。他の人がファイルを変更できないようにロックする挙動もIsolationと同じようなものです。他のプロセスから侵害される心配がない領域を見出していくのは、良い並行処理のデザインです。

また変更されることがないImmutableデータを見出すのも重要です。すべてのデータが変更されないことはあり得ません。しかし変更されないデータを見出すことにより、並行処理問題を気にする範囲を狭めることができます。

楽観的・悲観的並行処理コントロール

並行処理を管理には2つの形態があります。それは楽観ロックと悲観ロックです。先ほどの例と同じように、MartinとDavidが同時にファイルを編集する状況を考えていきます。

楽観ロックにおいては、MartinとDavidは変更したいファイルのコピーを作ってお互い自由に変更をします。Davidが先に変更を終えてコミットすると、その変更は問題なく反映されます。そのあとにMartinが変更を終えてコミットしようとするとソースコード管理システムがコンフリクトを発見してコミットを拒否します。そのあとコンフリクトをどのように解決するかはMartinの手にゆだねられます。

悲観ロックでは、Martinはまず変更するファイルを他の人が変更できないように予約をします。Davidもファイルを変更しようとしますが、Martinが予約しているので、DavidはMartinが変更を終えるまで待たなければいけません。

楽観ロックはコンフリクトをあとから発見する手法であり、悲観ロックはコンフリクトを予防する手法であると考えるとわかりやすいと思います。実は楽観ロックは何もロックしていないということがわかりますが、すでに世の中に浸透している用語なので細かいことは気にしないことをオススメします。

楽観ロックと悲観ロックにはそれぞれ長所と短所があります。悲観ロックはコンフリクトを防止してくれる反面、Livenessにおいては不利です。楽観ロックはLivenessにおいては有利ですが、コンフリクトが起こるとその解消は厄介です。ソースコード管理システムのコンフリクトはそこまで厄介でない場合も多いですが、ビジネスのデータでコンフリクトした場合は一から処理をやり直すことになるのがほとんどです。

どちらのロックを選択するかはコンフリクトの頻度と深刻さを考慮して決めるのが良いです。コンフリクトが滅多に起こらないようであれば、楽観ロックを選択すると良いです。Livenessにおいて有利ですし、悲観ロックと比べて実装が容易です。コンフリクトした場合のユーザーへの影響がひどい場合には悲観ロックを選択するのがオススメです。

では実際にどのようにLost UpdatesとInconsistent Readを防ぐかを考えます。悲観ロックを使うとどちらも防ぐことができ、楽観ロックを使う場合もどちらも防ぐことはできますが、Inconsistent Readを防ぎたい場合はやや工夫が必要になります。

悲観ロックではReadロックとWriteロックを使う方法が一般的です。ロックの取得は以下のように制限されます。

  • プロセス1がReadロックを取得している最中に、プロセス2は

    • Readロックを取得できる

    • Writeロックを取得できない

  • プロセス1がWriteロックを取得している最中に、プロセス2は

    • Readロックを取得できない

    • Writeロックを取得できない

Martinがパッケージのクラスの数を数える際にReadロックを取得しておくと、Davidがバグを修正するためにパッケージにクラスを追加しようとしてWriteロックを取得しようとしても、すでにMartinによってReadロックが取得されているためWriteロックを取得できません。DavidはMartinがReadロックを解放するまで待つ必要があります。逆に、Davidが先にWriteロックを取得してパッケージに変更を加えているときにはMartinはReadロックを取得できません。このようにしてInconsistent Readが防がれます。

Readロックしている様子

Lost UpdatesもWriteロックを取得することで防ぐことが可能です。

楽観ロックではタイムスタンプや整数カウンターをマーカーとしてデータ上に残すことでコンフリクトの発見をすることができます。

パッケージが現在v1というマーカーを持っている時点で、MartinがサブパッケージAのクラスの数を数えます。DavidがサブパッケージAとBにクラスを追加します。このときパッケージのマーカーがv2にインクリメントします。その後MartinがサブパッケージBのクラスの数を数えるときにマーカーを確認するとv2になっているので、Martinはコンフリクトを発見できます。このようにしてInconsistent Readを防ぐことができます。

整数マーカーでコンフリクトを発見している様子

ただし、マーカーをサブパッケージごとに持たせてしまうとサブパッケージごとにそれぞれマーカーの更新が行われるだけになってしまい、Inconsistent Readを防ぐことはできません。マーカーがどの範囲をカバーするかは状況に応じて適切に考える必要があります。不用意にすべてのデータを巻き込んでしまうと今度はコンフリクトが発生しすぎてしまいます。

Temporal ReadというテクニックもInconsistent Readの対策に使えます。Temporal Readでは、データを読み込むときにタイムスタンプかラベルをパラメータとして渡して、データベースからタイムスタンプやラベルの時点のデータを返してもらいます。これはGitのコミットハッシュをイメージするとわかりやすいと思います。ただし、当然ですが保存するデータサイズは大きくなりますし、データの取得にかかる時間も大きくなります。

実は悲観ロックには大きな問題があります。それはデッドロックです。MartinがFooというクラスを編集するためにWriteロックを取得します。一方DavidはBarというクラスを編集するためにWriteロックを取得します。その後、編集を続けていくうえでMartinはBarクラスを、DavidはFooクラスを編集しなければいけないことに気づきます。それぞれのWriteロックを取得しようとしますが、Writeロックはすでに取得されてしまっているため解放されるまでお互い待ち続けることになります。これがデッドロックです。

デッドロックしている様子

一見すると簡単に防げそうな問題ではありますが、多くの人間が関わったり、複雑になりがちなソフトウェア開発では意外とこの問題が起こってしまいます。

デッドロックに対処する方法の一つ目はデッドロックを検出することです。発見した場合には犠牲者となるプロセスを選択しないといけません。ただし、そもそもデッドロックを発見すること自体がそんなに簡単ではありません。何かしらのツールやソフトウェアが使えるなら使うのが良いです。

似たようなアプローチとしてロックにタイムアウトを設ける手法があります。これはデッドロックの検出よりは実装が容易です。ただし、デッドロックでなくてもタイムアウトしてしまうことがあり得るので注意が必要です。

別のアプローチとして、そもそもデッドロックを起こさないようにロックを取る順番を強制する手法があります。例えばアルファベット順でロックを取得しなければならないというルールや処理の開始時点で必要なロックはすべて取得しなければならないというルールなどです。アルファベット順のルールが適用されたとすると、先ほどの例ではMartinはFooクラスのロックを取ったあとにはBarクラスのロックを取れないことになります。確かにデッドロックは起きませんが、本質的にはMartinはすでに犠牲者となっていると考えられます。

最後のアプローチはロックの開放を待たずに即失敗させる方法です。これはかなり過激なテクニックですが、実装は非常にシンプルになり、多くの場合にはこのやり方で十分うまく機能します。

保守的になったほうがよい場面では複数の解決策を組み合わせるのが安全です。例えば、処理の開始時点で必要なロックはすべて取得しなければならないというルールに加えて、ロックのタイムアウトも設けておくこともできまう。

デッドロックは見逃すと厄介なので、多少犠牲者が増えたとしてもシンプルで保守的なスキームを採用する方が良さそうです。

トランザクション

エンタープライズアプリケーションにおいて並行性を扱うための主要なツールがデータベーストランザクションです。

トランザクションの定義は以下です。

  • 境界付けられた連続的な処理で始まりと終わりがきちんと定義されている。

  • すべての関連するリソースがトランザクションの始まりと終わりで一貫した状態である。

加えてトランザクションは完全に処理を終えているか、もしくは何も処理されていないかのどちらかの状態で完了する必要があります。例えばある口座からある口座へお金を送金する処理が途中で失敗して、送金元の口座からお金がなくなっただけという状態は許されません。

トランザクションはACID特性という用語でよく説明されます。

  • Atomicity: トランザクション内のすべての処理は成功しなければならない。さもなければ、すべての処理はロールバックされなければならない。

  • Consistency: トランザクションの始まりと終わりで関連するデータは一貫した状態でなければならない。中途半端な状態になってはいけない。

  • Isolation: あるトランザクションの処理結果は、そのトランザクションがコミットされるまで、ほかのトランザクションから見えてはならない。

  • Durability: コミットされたトランザクションは永続化されていなければならない。

トランザクションにはIsolationレベルというものがあります。必要とされるLivenessに応じてレベルを切り替えられます。SQL標準では4つのIsolationレベルがあります。

  • Serializable: 一番強力なIsolationであり、どのタイミングでデータを読み込んだとしても一貫したデータを得られます。すべてのトランザクションがあたかも直列に実行されているような状態を維持します。

  • Repeatable Read: Phantomを許容します。トランザクションT1の途中であるテーブルのレコード数を数えて5つだったとします。別のトランザクションT2がそのテーブルにレコードを1つ追加してコミットしたとします。トランザクションT1が再度テーブルのレコード数を数えると6つでした。このようにトランザクション中に別のトランザクションによって追加されたレコードが見えてしまう現象をPhantomと呼びます。

  • Read Committed: Unrepeatable Readを許容します。トランザクションT1の途中であるテーブルの更新時間が最新の5件のレコードを取得したとします。別のトランザクションT2にそのテーブルのあるレコードを更新してコミットしたとします。トランザクションT1が再度テーブルの更新時間が最新の5件のレコードを取得したら最新のレコードが先ほどと異なる結果となりました。このようにトランザクション中に別のトランザクションによる更新の影響を受けてしまう現象をUnrepeatable Readと呼びます。

  • Read Uncommitted: Dirty Readを許容しまう。Dirty Readは他のトランザクションがまだコミットしていない変更を読み取れてしまう現象を指します。これは例えばロールバックして最終的にはコミットされなかった変更でさえも読み込んでしまう可能性があります。

Isolationレベルと許容するInconsistent Read

Serializable以外はInconsistent Readを許容することでLiveness面で有利になります。正確性とLivenessのトレードオフを見極めてIsolationレベルを選択してください。

ここまでデータベーストランザクションの話をしてきましたが、アプリケーションのユーザーにとっては意味のないものです。(著書にはそう書いてありますが、意味のないというよりは物足りないくらいに考えるとよさそうです。)

例えば、オンラインバンキングアプリケーションのユーザーにとってのトランザクションとは「ログインして、口座を選んで、請求書の支払いを項目を入力して、クリックする」というような一連の動作です。これはビジネストランザクションと呼ばれます。ユーザーはこの一連の動作に対して、当然のようにACID特性を期待します。ACID特性と認識はしていないでしょうが、「最後にOKボタンをクリックしなかったら、すべての入力はなかったことになる」というようなことは少なくとも期待しているでしょう。

すべてのビジネストランザクションを単発のデータベーストランザクションに含めるのは無理があります。そのため、ビジネストランザクションを複数のデータベーストランザクションに分けなければならない場面も出てきます。すなわち、Offline Concurrency問題が残るということです。

IsolationとConsistencyについては特に気をつかう必要があります。他のビジネストランザクションから中途半端なデータの状態が見えてしまう事態は発生しやすいので、データベースのテーブルやドメインロジックを慎重に設計する必要があります。

Offline Concurrencyを管理するためのパターン

Offline Concurrencyの問題はできる限りデータベーストランザクションに任せてしまうべきです。しかし、ビジネストランザクションを扱う上でどうしても自分で管理しなければならないときがあります。

これから紹介するテクニックは必要がない限りは使わないでください。これらのテクニックはあくまで目的ではなくスタート地点です。すべての問題を解決できるわけではありません。

Optimistic Offline Lockは実装が比較的簡単でLiveness面でも有利です。ただし、コミットのタイミングで失敗する可能性があり、状況によっては失敗するタイミングが遅いことが致命的な弱点となりえます。例えばユーザーが1時間かけて入力した内容のコミットに失敗したら、システムへの信頼は確実に失われます。

Opptimistic Offline Lockの実装例です。TypeScript と PostgreSQL で実装しています。テーブルにはある海賊船の船員の情報に加えて `version` というカラムを追加しています。

// crews

 id | name  |   bounty   | version 
----+-------+------------+---------
  1 | Luffy | 1500000000 |       0 
  2 | Zoro  |  320000000 |       0

この `version` を用いることでコンフリクトを検出します。 `update()` において `id` だけでなく `version` もWHERE条件に加えることで、古い `version` を元に更新しようとするとアップデートされないようにしています。また、アップデートするときには `version` をインクリメントします。

import { client } from "./postgres_client.ts";

interface CrewsRow {
  id: number;
  name: string;
  bounty: bigint;
  version: number;
}

export class Crew {
  constructor(
    private _id: number,
    public name: string,
    public bounty: bigint,
    private version: number,
  ) {}

  get id(): number {
    return this._id;
  }

  async insert(): Promise<void> {
    await client.queryArray`
      INSERT INTO crews (id, name, bounty, version)
      VALUES (${this.id}, ${this.name}, ${this.bounty}, ${this.version})
    `;
  }

  async update(): Promise<void> {
    const result = await client.queryArray`
      UPDATE crews
      SET name=${this.name}, bounty=${this.bounty}, version=${this.version + 1}
      WHERE id=${this.id} AND version=${this.version}
    `;
    if (!result.rowCount) {
      throw new Error("Conflict Occurred");
    }
  }

  static async find(id: number): Promise<Crew> {
    const { rows: [row] } = await client.queryObject<CrewsRow>`
      SELECT id, name, bounty, version
      FROM crews
      WHERE id = ${id}
    `;
    if (!row) {
      throw new Error("Record Not Found");
    }
    return new Crew(row.id, row.name, row.bounty, row.version);
  }
}

テストも含めたすべてのコードは こちら にあります。

Optimistic Offline Lockと比べて、Pessimistic Offline Lockはより早く問題を発見します。その代わりに実装が比較的難しく、Liveness面で不利です。

Pessimistic Offline Lockの実装例です。今回は例を簡単にするためにWriteロックのみを実装しています。 `write_locks` というテーブルを新たに導入してWriteロックを実現します。

// crews

 id | name  |   bounty
----+-------+------------
  1 | Luffy | 1500000000
  2 | Zoro  |  320000000

// write_locks

  resource_id | resource_name | owner
--------------+---------------+----------
            1 | crew          | process1
            2 | crew          | process2

`update()` において「自分以外の `owner` がWriteロックを取得していなかったら」という条件を追加しています。加えて `find()` においても「自分以外の `owner` がWriteロックを取得していたら」エラーとなるようにしています。これによってより早いタイミングでコンフリクトを検出しています。

import { client } from "./postgres_client.ts";

interface CrewsRow {
  id: number;
  name: string;
  bounty: bigint;
  owner: string | null;
}

export class Crew {
  private static resourceName = "crew";

  constructor(
    private _id: number,
    public name: string,
    public bounty: bigint,
    private owner: string,
  ) {}

  get id(): number {
    return this._id;
  }

  private get resourceName() {
    return (<typeof Crew> this.constructor).resourceName;
  }

  async insert(): Promise<void> {
    await client.queryArray`
      INSERT INTO crews (id, name, bounty)
      VALUES (${this.id}, ${this.name}, ${this.bounty})
    `;
  }

  async aquireWriteLock() {
    await client.queryArray`
      INSERT INTO write_locks (resource_id, resource_name, owner)
      VALUES (${this.id}, ${this.resourceName}, ${this.owner})
    `;
  }

  async releaseWriteLock() {
    await client.queryArray`
      DELETE FROM write_locks
      WHERE resource_id = ${this.id}
        AND resource_name = ${this.resourceName}
        AND owner = ${this.owner}
    `;
  }

  async update(): Promise<void> {
    const result = await client.queryArray`
      UPDATE crews
      SET name=${this.name}, bounty=${this.bounty}
      WHERE id=${this.id} AND NOT EXISTS(
        SELECT *
        FROM write_locks
        WHERE resource_id = ${this.id}
          AND resource_name = ${this.resourceName}
          AND owner != ${this.owner}
      )
    `;
    if (!result.rowCount) {
      throw new Error("Record Locked");
    }
  }

  static async find(id: number, owner: string): Promise<Crew> {
    const { rows: [row] } = await client.queryObject<CrewsRow>`
      SELECT id, name, bounty, owner
      FROM crews
      LEFT OUTER JOIN write_locks ON resource_id = crews.id AND resource_name = ${this.resourceName}
      WHERE id = ${id}
    `;
    if (!row) {
      throw new Error("Record Not Found");
    }
    if (row.owner && row.owner != owner) {
      throw new Error("Record Locked");
    }
    return new Crew(row.id, row.name, row.bounty, owner);
  }
}

今回は `aquireWriteLock()` や `releaseWriteLock()` をクラス内に直接実装していますが、汎用的に使える別のクラスに実装するのも良いと思います。

テストも含めたすべてのコードは こちら にあります。

どちらのアプローチをとる場合でも、複数のオブジェクトのグループに対して並行処理問題を管理するCoarse-Grained Lockを用いることでかなり複雑性を軽減できます。

Coarse-Grained Lockの実装例です。今回は例を簡単にするためにOptimistic Offline Lockバージョンのみを実装しています。また `crews` テーブルに `version` を持たせるようにしていますが、 `version` を管理する別のテーブルを用意することもできます。

// crews

 id | name  |   bounty   | version 
----+-------+------------+---------
  1 | Luffy | 1500000000 |       0 
  2 | Zoro  |  320000000 |       0

// special_moves

 id |      name       |  crew_id
----+-----------------+------------
  1 | Gum-Gum Pistol  | 1
  2 | Gum-Gum Bazooka | 1
  3 | Oni Giri        | 2
  4 | Tora Gari       | 2

長いですが、 `Crews` クラスはOptimistic Offline Lockの例とほとんど同じです。ここでは `SpecialMoves` クラスに着目します。まず `findForCrew()` では単に `special_moves` テーブルからデータを取得するだけではなく、 `crews` から `version` の情報も取得します。そして `update()` において `special_moves` テーブルの更新だけではなく、 `crews` テーブルの `version` のインクリメントを試みることで、 `crews` と `special_moves` のいずれかにコンフリクトがあったかどうかを検出できるようにしています。(本当は `insert()` でもインクリメントを試みたほうが良いと思いますが、ここではサボっています。)

import { client } from "./postgres_client.ts";

interface CrewsRow {
  id: number;
  name: string;
  bounty: bigint;
  version: number;
}

export class Crew {
  private _specialMoves: SpecialMove[] = [];

  constructor(
    private _id: number,
    public name: string,
    public bounty: bigint,
    private _version: number,
  ) {}

  get id(): number {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  get specialMoves(): SpecialMove[] {
    return this._specialMoves;
  }

  async insert(): Promise<void> {
    await client.queryArray`
      INSERT INTO crews (id, name, bounty, version)
      VALUES (${this.id}, ${this.name}, ${this.bounty}, ${this.version})
    `;
  }

  async update(): Promise<void> {
    const result = await client.queryArray`
      UPDATE crews
      SET name=${this.name}, bounty=${this.bounty}, version=${this.version + 1}
      WHERE id=${this.id} AND version=${this.version}
    `;
    if (!result.rowCount) {
      throw new Error("Conflict Occurred");
    }
  }

  static async find(id: number): Promise<Crew> {
    const { rows: [row] } = await client.queryObject<CrewsRow>`
      SELECT id, name, bounty, version
      FROM crews
      WHERE id = ${id}
    `;
    if (!row) {
      throw new Error("Record Not Found");
    }
    const crew = new Crew(row.id, row.name, row.bounty, row.version);
    crew._specialMoves = await SpecialMove.findForCrew(crew.id);
    return crew;
  }
}

interface SpecialMovesRow {
  id: number;
  name: string;
  crew_id: number;
  version: number;
}

export class SpecialMove {
  constructor(
    private _id: number,
    public name: string,
    public crewId: number,
    private version: number,
  ) {}

  get id(): number {
    return this._id;
  }

  async insert() {
    await client.queryArray`
      INSERT INTO special_moves (name, crew_id)
      VALUES (${this.name}, ${this.crewId})
    `;
  }

  async update(): Promise<void> {
    const transaction = await client.createTransaction("SpecialMove.update()");
    await transaction.begin();
    try {
      await transaction.queryArray`
        UPDATE special_moves
        SET name=${this.name}
        WHERE id=${this.id}
      `;
      const result = await transaction.queryArray`
        UPDATE crews
        SET version=${this.version + 1}
        WHERE id=${this.crewId} AND version=${this.version}
      `;
      if (!result.rowCount) {
        throw new Error("Conflict Occurred");
      }
      await transaction.commit();
    } catch (e) {
      await transaction.rollback();
      throw e;
    }
  }

  static async findForCrew(crewId: number): Promise<SpecialMove[]> {
    const { rows } = await client.queryObject<SpecialMovesRow>`
      SELECT special_moves.id as id, special_moves.name, crew_id, version
      FROM special_moves
      JOIN crews ON crews.id = crew_id
      WHERE crew_id = ${crewId}
      ORDER BY special_moves.id
    `;
    const specialMoves = rows.map((row: SpecialMovesRow) =>
      new SpecialMove(row.id, row.name, row.crew_id, row.version)
    );
    return specialMoves;
  }
}

テストも含めたすべてのコードは こちら にあります。

Offline Concurrencyを管理するためのパターンの選択はUXに大きく影響を与える可能性があります。そのためより良い並行処理のデザインには洗練されたドメイン知識が必須となります。

アプリケーションサーバの並行処理

Offline Concurrencyとは別にアプリケーションサーバーのプロセスの並行処理について考えます。アプリケーションサーバーでは複数のリクエストをどのように並行で扱うかという問題があります。これはデータベーストランザクションに任せることはできません。

まず気を付けたいこととして、マルチスレッドプログラミングは難しいということです。ロックと同期ブロックによる制御は容易にバグを生みやすいので、必要ない限りは使わないことをオススメします。

単純な解決策としては、リクエストごとにプールされたプロセスを使う方法です。こうすることでリクエストごとにメモリーが分離されて、データを共有しないので安全です。ただし、リクエストが終了したときにリソースがちゃんと解放されていることを保証しないといけません。

さらにスループットを上げたい場合は、リクエストごとにスレッドを使います。プロセスと比べてリソース効率は良くなります。しかし、メモリーが分離されないため、バグを生み出しやすいです。

基本的にはリクエストごとにプロセスを使う方法が安全です。リソース効率はよくありませんが、サーバーをスケールアウトすることで問題に対処できます。特にマルチスレッドに対する経験の浅いチームがバグに悩まされるコストと比べれば、ハードウェアのコストのほうがいくぶんマシです。

リクエストごとにスレッドを使う場合、分離された領域を意識することが重要です。例えば、Immutableでないオブジェクトはリクエストごとに作成することでスレッド間で共有されることがないようにします。

クラス変数やグローバル変数はなるべく使わないようにします。どうしても使いたい場合、スレッドベースのRegistryを用いるか、Mutexなどを用いた同期処理を行いましょう。例えば、データベースコネクションを開く処理などは重い処理なのでプールして、プールから取得する処理は同期すると良いです。

スレッドベースのRegistryの実装例を紹介します。 Ruby で実装しています。ポイントは `Thread.current[:hoge]` という箇所です。Rubyではこれがスレッドごとに分離したいデータを置ける空間になっています。

# frozen_string_literal: true

class SomeClass
  def initialize
    puts "an instance was created in #{Thread.current}!"
  end
end

class Registry
  def self.instance
    Thread.current[:instance] ||= SomeClass.new
    puts "an instance was called in #{Thread.current}!"
    Thread.current[:instance]
  end
end

(1..3).map do
  Thread.start do
    Registry.instance
    Registry.instance
  end
end.each(&:join

実行してみると以下のような感じで出力されることが確認できます。`created` という出力はスレッドの数である3つ分だけ出力されています。 `called` は計6回出力されているので、スレッドごとにシングルトンなインスタンスを作成できていることがわかります。

$ ruby main.rb
an instance was created in #<Thread:0x00007fa0893c58a8 main.rb:18 run>!
an instance was called in #<Thread:0x00007fa0893c58a8 main.rb:18 run>!
an instance was called in #<Thread:0x00007fa0893c58a8 main.rb:18 run>!
an instance was created in #<Thread:0x00007fa0893c5588 main.rb:18 run>!
an instance was called in #<Thread:0x00007fa0893c5588 main.rb:18 run>!
an instance was called in #<Thread:0x00007fa0893c5588 main.rb:18 run>!
an instance was created in #<Thread:0x00007fa0893c5768 main.rb:18 run>!
an instance was called in #<Thread:0x00007fa0893c5768 main.rb:18 run>!
an instance was called in #<Thread:0x00007fa0893c5768 main.rb:18 run>!

実行するための環境も含めて確認したい場合は こちら をご覧ください。

最後にMutexの使用例も紹介します。Mutexを使った場合と使わない場合でどのように結果が変わるかをわかるようにしています。

# frozen_string_literal: true

class Singleton
  @mutex = Mutex.new
  @count = 0

  def self.puts_and_increment_without_mutex
    puts(@count)
    @count += 1
  end

  def self.puts_and_increment_with_mutex
    @mutex.synchronize do
      puts(@count)
      @count += 1
    end
  end
end

puts('--- without Mutex ---')

(1..5).map do
  Thread.start do
    Singleton.puts_and_increment_without_mutex
  end
end.each(&:join)

puts('--- with Mutex ---')

(1..5).map do
  Thread.start do
    Singleton.puts_and_increment_with_mutex
  end
end.each(&:join)

実行してみると、以下のような感じで結果が出力されます。Mutexを使わない場合は、それぞれのスレッドの処理が一斉に実行されて、 `@count += 1` が実行される前か最初の `@count += 1` が実行された直後くらいのタイミングですべてのスレッドがカウントの表示を終えているため、0か1しか表示されていません。後続の処理ではカウントは5から始まり、各スレッドは前のスレッドがロックを解放するまで待っているので、結果は1ずつインクリメントされていることがわかります。

$ ruby main.rb
--- without Mutex ---
0
1
0
0
1
--- with Mutex ---
5
6
7
8
9

実行するための環境も含めて確認したい場合は こちら をご覧ください。

さいごに

今回はChapter 5. Concurrencyについての紹介をしました。

説明が不足していたり、わかりにくいようなところがありましたら、お気軽にご連絡いただければと思います。

次回はChapter 6. Session Stateを紹介します。どうぞよろしくお願いします。

この記事が気に入ったらサポートをしてみませんか?