Python Threading Module[並列処理]についての解説

1.概要

PythonのThreadingモジュールは、複数のスレッドを使用してPythonプログラムを並列実行するためのツールです。Threadingモジュールを使用すると、複数のタスクを同時に実行したり、長時間実行されるタスクをバックグラウンドで実行したりできます。PythonのGIL(Global Interpreter Lock)により、スレッドはCPU時間を共有することができるため、Pythonのマルチスレッドプログラミングはマルチプロセスプログラミングよりも簡単で効率的です。

2.基本的な使い方

まず、threadingモジュールをインポートします。

import threading

次に、スレッドで実行する関数を定義します。

def print_numbers():
    for i in range(10):
        print(i)

def print_letters():
    for letter in 'abcdefghij':
        print(letter)

これで、threading.Threadクラスを使って、それぞれの関数に対してスレッドを作成します。

# スレッドを作成
thread_1 = threading.Thread(target=print_numbers)
thread_2 = threading.Thread(target=print_letters)

最後に、start()メソッドを呼び出してスレッドを開始し、join()メソッドを使ってスレッドが終了するのを待ちます。

# スレッドを開始
thread_1.start()
thread_2.start()

# スレッドが終了するのを待つ
thread_1.join()
thread_2.join()

print("All threads finished.")

この例では、print_numbers関数とprint_letters関数が並列に実行されます。結果として、数字と文字が交互に表示されることがありますが、スレッドの実行順序は保証されません。
threadingモジュールを使ったマルチスレッドプログラミングでは、リソースへのアクセス競合などの問題が発生することがあります。これを回避するために、スレッドセーフなデータ構造や同期プリミティブ(例えば、Lock, Semaphore, Conditionなど)を使用することが重要です。

3.スレッドを1つに限定する

threadingモジュールのLockオブジェクトを使って、スレッド間でリソースへのアクセスを制御する例を紹介します。
Lockオブジェクトは、一度に1つのスレッドだけがリソースにアクセスできるようにするための同期プリミティブです。

import threading
import time

# 共有リソースにアクセスするためのロック
resource_lock = threading.Lock()

# 共有リソース
counter = 0

# スレッドで実行する関数
def increment_counter():
    global counter
    with resource_lock:
        temp = counter
        time.sleep(0.001)  # このスレッドを一時停止して、他のスレッドが実行される機会を与える
        counter = temp + 1
        print(f"Counter: {counter}")

# スレッドを作成
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment_counter)
    threads.append(thread)

# スレッドを開始
for thread in threads:
    thread.start()

# スレッドが終了するのを待つ
for thread in threads:
    thread.join()

print("All threads finished.")

Counter: 1
Counter: 2
Counter: 3
Counter: 4
Counter: 5
Counter: 6
Counter: 7
Counter: 8
Counter: 9
Counter: 10
All threads finished.

この例では、increment_counter関数が10個のスレッドで並行して実行されます。resource_lockオブジェクトを使って、共有リソースであるcounter変数へのアクセスを制御しています。
with resource_lock:ブロック内で、1つのスレッドだけがcounterにアクセスできるようになります。これにより、アクセス競合を防止し、正しい結果が得られるようになります。
この例では、Lockオブジェクトのacquire()メソッドとrelease()メソッドを明示的に呼び出す代わりに、withステートメントを使っています。withステートメントを使うことで、acquire()とrelease()の呼び出しが自動的に行われ、コードが読みやすくなります。
threadingモジュールには、他にもSemaphore、Event、Conditionなどの同期プリミティブが提供されており、より複雑なスレッド間の同期や制御を行うことができます。また、Threadクラスを継承して独自のスレッドクラスを作成することもできます。これにより、より柔軟なマルチスレッドプログラミングが可能になります。

4.スレッドの最大数を制限する

threadingモジュールのSemaphoreオブジェクトを使って、一度に実行できるスレッドの数を制限する例を紹介します。Semaphoreオブジェクトは、同時に実行できるスレッドの最大数を制限するための同期プリミティブです。

import threading
import time

# セマフォオブジェクトを作成(同時に実行できるスレッドの数を3に制限)
semaphore = threading.Semaphore(3)

# スレッドで実行する関数
def limited_task(task_id):
    with semaphore:
        print(f"Task {task_id} started.")
        time.sleep(2)  # タスクの実行時間をシミュレート
        print(f"Task {task_id} finished.")

# スレッドを作成
threads = []
for i in range(10):
    thread = threading.Thread(target=limited_task, args=(i,))
    threads.append(thread)

# スレッドを開始
for thread in threads:
    thread.start()

# スレッドが終了するのを待つ
for thread in threads:
    thread.join()

print("All threads finished.")

Counter: 1
Counter: 2
Counter: 3
Counter: 4
Counter: 5
Counter: 6
Counter: 7
Counter: 8
Counter: 9
Counter: 10
Task 4 started.
Task 3 started.Task 5 started.
Task 5 finished.Task 4 finished.Task 3 finished.

Task 6 started.Task 7 started.

Task 8 started.
Task 6 finished.Task 8 finished.Task 7 finished.

Task 9 started.
Task 9 finished.
All threads finished.

この例では、limited_task関数が10個のスレッドで並行して実行されますが、Semaphoreオブジェクトによって、同時に実行できるスレッドの数が3に制限されます。with semaphore:ブロック内で、Semaphoreオブジェクトが自動的にacquire()とrelease()メソッドを呼び出し、スレッドが制限内で実行されるようになります。
Semaphoreは、一度に実行できるタスクの数を制限することで、リソースの過剰使用を防ぎ、アプリケーションのパフォーマンスや安定性を向上させるのに役立ちます。例えば、Webサーバーで一度に処理できるリクエスト数を制限することができます。
threadingモジュールでは、他にもBarrierやTimerなどの機能が提供されています。Barrierは、複数のスレッドが特定のポイントで同期するための同期プリミティブであり、Timerは一定時間後に関数を実行するためのスレッドです。
マルチスレッドプログラミングは、パフォーマンスを向上させるために役立ちますが、スレッド間の同期やリソース管理に注意が必要です。threadingモジュールを使いこなすことで、効果的な並行処理を実現することができます。

5.スレッド間で状態を同期させる

threadingモジュールのEventオブジェクトを使って、スレッド間でイベントを通知する方法を紹介します。Eventオブジェクトは、スレッド間で状態を通知するための同期プリミティブです。

import threading
import time

# イベントオブジェクトを作成
event = threading.Event()

# スレッドで実行する関数
def wait_for_event():
    print("Waiting for event...")
    event.wait()
    print("Event triggered!")

def trigger_event():
    time.sleep(3)  # イベントをトリガーするまでの時間をシミュレート
    print("Triggering event...")
    event.set()

# スレッドを作成
waiting_thread = threading.Thread(target=wait_for_event)
triggering_thread = threading.Thread(target=trigger_event)

# スレッドを開始
waiting_thread.start()
triggering_thread.start()

# スレッドが終了するのを待つ
waiting_thread.join()
triggering_thread.join()

print("All threads finished.")

Waiting for event...
Triggering event...
Event triggered!
All threads finished.

この例では、wait_for_event関数が実行されているスレッドが、trigger_event関数が実行されているスレッドからイベントを待ち受けます。Eventオブジェクトのwait()メソッドは、イベントがセットされるまでスレッドをブロックし、set()メソッドが呼び出されると、待機しているすべてのスレッドが解放されます。
Eventオブジェクトは、スレッド間で一度きりのイベントを通知するのに役立ちます。また、イベントをリセットして再利用することもできます。その場合、Eventオブジェクトのclear()メソッドを呼び出して、イベントの状態をリセットします。
threadingモジュールには、Conditionオブジェクトもあります。Conditionオブジェクトは、Eventオブジェクトに似ていますが、より一般的な条件変数を表現できます。Conditionオブジェクトは、wait()メソッドでスレッドをブロックし、特定の条件が満たされたときに、notify()またはnotify_all()メソッドで待機しているスレッドを解放します。
threadingモジュールを使ってマルチスレッドプログラミングを行う際には、同期プリミティブやデータ構造を適切に使用して、スレッド間のコミュニケーションやリソース管理を正確に行うことが重要です。

6.スレッド間での状態を条件付きで同期する。

threadingモジュールのConditionオブジェクトを使った例を紹介します。Conditionオブジェクトは、スレッド間の条件付き同期を行うための同期プリミティブです。
以下の例では、プロデューサースレッドがリストにアイテムを追加し、コンシューマースレッドがリストからアイテムを取り出すシナリオを扱います。

import threading
import time
import random

# 共有リソース
items = []
# リストへのアクセスを制御する Condition オブジェクト
list_condition = threading.Condition()

# プロデューサースレッドで実行する関数
def producer():
    for _ in range(10):
        time.sleep(random.uniform(0.5, 1.0))
        with list_condition:
            item = random.randint(1, 100)
            items.append(item)
            print(f"Produced: {item}")
            list_condition.notify()

# コンシューマースレッドで実行する関数
def consumer():
    while True:
        with list_condition:
            while not items:
                list_condition.wait()
            item = items.pop(0)
            print(f"Consumed: {item}")
            if not items:
                break

# スレッドを作成
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# スレッドを開始
producer_thread.start()
consumer_thread.start()

# スレッドが終了するのを待つ
producer_thread.join()
consumer_thread.join()

print("All threads finished.")

Produced: 55
Consumed: 55
Produced: 12
Produced: 90
Produced: 54
Produced: 20
Produced: 80
Produced: 18
Produced: 98
Produced: 14
Produced: 25
All threads finished.

この例では、プロデューサースレッドがアイテムを生成し、コンシューマースレッドがアイテムを消費します。list_conditionオブジェクトを使って、リストへのアクセスを制御しています。with list_condition:ブロック内で、プロデューサースレッドはアイテムをリストに追加し、notify()メソッドでコンシューマースレッドに通知します。コンシューマースレッドは、wait()メソッドでアイテムが利用可能になるまで待機し、アイテムが利用可能になると、リストからアイテムを取り出して処理します。
Conditionオブジェクトは、特定の条件が満たされるまでスレッドをブロックし、条件が満たされたら待機しているスレッドを解放することができます。これにより、複数のスレッドが共有リソースに対して安全にアクセスし、互いに通知することができます。

7.最後に

threadingモジュールを使ったマルチスレッドプログラミングでは、適切な同期プリミティブやデータ構造を使用して、スレッド間のコミュニケーションやリソース管理を正確に行うことが重要です。また、デッドロックや競合状態などの問題を回避するために、スレッド間の同期やリソース管理に注意が必要です。
threadingモジュールを使ったマルチスレッドプログラミングでは、以下のベストプラクティスを実践することがおすすめです。

  1. 最小限の共有リソース: スレッド間で共有するリソースを最小限に抑え、ローカル変数やスレッドローカルストレージを使用して、スレッド固有のデータを保持します。

  2. データの不変性: 可能な限り不変なデータ構造を使用して、スレッド間でデータを共有することで、同期の必要性を減らすことができます。

  3. ロックの範囲を最小限に: ロックを取得している間は、できるだけ少ない操作を行い、他のスレッドが待たされる時間を短縮します。

  4. ロックの順序: 複数のロックを取得する必要がある場合、スレッド間で一貫した順序でロックを取得することで、デッドロックを防ぐことができます。

  5. タイムアウト付きの待機: wait()やacquire()メソッドにタイムアウトを設定することで、デッドロックや他の問題が発生した場合でも、スレッドが適切に解放されるようになります。

マルチスレッドプログラミングは、パフォーマンスを向上させるために役立ちますが、同期やリソース管理に関する問題があるため、慎重に設計する必要があります。threadingモジュールを使用して効果的な並行処理を実現するためには、同期プリミティブやデータ構造を適切に使用し、上記のベストプラクティスを実践することが重要です。

この記事が気に入ったらサポートをしてみませんか?