見出し画像

Python基礎16:並列タスク(concurrent.futures)


1.概要

 Pythonの標準ライブラリである”concurrent.futures"モジュールを使って並列化を行うことができます。並列化はコンピューティングタスクを同時に処理することで、プログラムの実行時間を大幅に短縮することができます。

2.基礎知識

2-1.並行処理と並列処理の違い

 まず初めに並行処理(Concurrency)と並列処理(Parallelism)の概念を理解することが重要です。下記の通り並行処理は見かけの同時処理、並列処理は実質的な同時処理です。

  • 並行性(Concurrency):複数のタスクが同時に起こるように見える状態を指します。一つのプロセッサ交互にタスクを切り替えることで、タスクが同時に進行しているように見えます。

  • 並列性(Parallelism):複数のタスクが文字通り同時に実行される状態を指します。これには複数のプロセッサ(あるいはプロセッサのコア)が必要で、それぞれが異なるタスクを同時に処理します。

 Pythonの concurrent.futures モジュールはこれらの概念を実装するためのツールを提供しています。

2-2.concurrent.futuresの適用範囲

 concurrent.futuresを使用する主なシナリオは、処理が重いタスクを並行に実行する必要がある場合です。このモジュールを使用することで各タスクが独立して実行され、全体の実行時間を短縮することができます。

 一方でconcurrent.futuresが適切でない条件も存在します。事例は下記の通りです。

  • プログラムが単純でタスクがすぐに終了:スレッドやプロセスの生成と管理に関連するオーバーヘッドがパフォーマンス向上のメリットを上回る可能性がある

  • データを共有する必要がある: スレッド間またはプロセス間でデータを共有する必要がある場合、並行性の管理が複雑になる(データの整合性を保つためにロックなどの同期メカニズムを使用する必要がある)

  • 順序依存のタスク:一つのタスクが別のタスクの結果に依存する

  • 同時アクセスによるエラー:複数のタスクが同じリソース(ファイル、DBなど)に同時にアクセスすると問題が発生する可能性のあるもの

2-3.他の並列処理ライブラリ(低水準API)

 Pythonでは"concurrent.futures"以外に"multiprocessing"や"threading"といったモジュールを使用してマルチスレッドやマルチプロセスを実装することができます。しかしこれらはconcurrent.futuresに比べて低レベルなAPIであり、より細かい制御が可能ですが複雑さも増します。

2-4.Pythonのmap()関数

 並列処理で使用するmap()メソッドを理解するためにPythonの組み込み関数map()の理解が必要です。Pythonのmap()関数は与えられた関数をイテラブル(リストやタプルなど)の全ての要素に適用し、その結果を返します。以下に簡単な例を示します。

[IN]
def func(n):
    return n * n

numbers = [1, 2, 3, 4, 5]
result = map(func, numbers)
squares = list(result)
print(result)
print(squares)  
[OUT]
<map object at 0x0000016611A26100>
[1, 4, 9, 16, 25]

 この例では、map()関数はfunc関数をnumbersリスト(イテラブル)の全ての要素に適用しており、イテラブルなオブジェクトを返します。これをリストに変換するためにlist()関数を使用し、出力は[1, 4, 9, 16, 25]となります。

【コラム:無名関数lambdaを使用】
 なるべく短くコードを記載したい場合はlambda関数が便利となります。

[IN]
list(map(lambda n: n * n, numbers))

[OUT]
[1, 4, 9, 16, 25]

2-5.基礎用語

 関連する用語は下記の通りです。

  • プロセスプロセスとはプログラムの実行中の状態を指します。実行中の状態とは、プログラムOSによってメモリ上の領域を割り当てられ、そこでCPUとデータのやり取りを行い演算処理されている状態です。

  • スレッド:コンピュータ上では複数のプログラムが実行されており、これらのプロセスは全てCPUによって並列で処理されています。並列で処理できるとはいえ、CPU上ではプロセス内の実行する部分をいくつかに分けており、一つの実行単位をスレッドと呼びます。

  • スレッドプール:スレッドプールはあらかじめ作成されたスレッドのセットで、それぞれのスレッドはプールから取り出されてタスクを実行し、タスクが終わるとプールに戻ります。これはスレッドの作成と破棄にはコストがかかるため、頻繁にスレッドを作成・破棄するよりも効率的です。

  • マルチスレッド:マルチスレッドとは、1つのプロセス内で複数の実行フロー(スレッド)を作り出すことで同時に複数のタスクを処理する方法を指します。これにより、1つのプログラム内で多くの作業を同時に進行させることが可能になります。

  • 並列タスク:複数のタスクが物理的に同時に実行されることを指します。これはマルチプロセッサーやマルチコアのCPUで可能で、並列化されたタスクはそれぞれ異なるプロセッサーやコア上で同時に実行されます。

  • マルチプロセス:マルチプロセスとは、システムが複数のプロセスを同時に実行する方式を指します。各プロセスは独自のメモリ空間を持ち、他のプロセスとは独立して動作するため物理的な並列実行を可能にします。

  • 非同期実行:非同期実行とは複数のタスクを同時に開始し、各タスクが完了するのを待たずに次のタスクを開始する実行方式であり、非同期プログラミングの一部です。

  • I/O:Input/Outputの略で「入出力」を意味します。ハードディスクなどの記憶媒体に対するデータの読み書きについては「ディスクI/O」、ネットワークに対する入出力については「ネットワークI/O」と、I/O負荷の発生場所によって区別して表記することがあります。

  • 非同期IO:非同期IOとは、入出力の操作が終わるのを待つことなく他の作業を進めることができるようにする手法を指します。非同期IOを利用することでプログラムはブロック(待機)することなく、他のタスクを進めることができます。

  • I/Oバウンド:I/Oバウンドはシステムの入出力によりそのパフォーマンスが制限されている状況を示す用語です。I/Oバウンドを解決する方法は、高速なドライブの使用やシステムに追加するドライブの台数を増加するなどあります。

  • CPUバウンド:CPUバウンドとはプログラムの性能がCPUの処理速度によって制限される状態を指します。例えば大量の数値計算や複雑なデータの解析を行うプログラムではCPU処理がパフォーマンスを決定します。

3.concurrent.futuresモジュール

 concurrent.futuresモジュールはPythonの並列処理と並行処理を管理するためのモジュールであり、Executorというオブジェクトを通じてタスクの実行を管理します。

3-1.モジュールの概要

 concurrent.futuresThreadPoolExecutorProcessPoolExecutorという二つのクラスを提供しています。これらはExecutorという抽象クラスを継承しています。

【主なクラス】

  • ThreadPoolExecutor:このクラスは同じプロセス内の異なるスレッドでタスクを実行します。I/O待ちが多いタスクに適しています。

  • ProcessPoolExecutor:このクラスは各タスクを別のプロセスで実行します。CPU密集型のタスクに適しています。

 これらのクラスでは主に以下のメソッドがよく使用されます。

【主なメソッド】

  • submit(fn, *args, **kwargs):非同期に関数 fn を実行し、その結果を取得するためのFutureオブジェクトを返します。

  • map(func, *iterables, timeout=None, chunksize=1):関数 func を iterables の各要素に適用します。このメソッドはジェネレータを返すため、関数の結果はイテレーションごとに返されます。

3-2.Executor オブジェクトAPI

 Executorクラスはconcurrent.futuresモジュールの抽象クラスであり、ThreadPoolExecutorとProcessPoolExecutorがこのクラスを継承しています。Executorクラス自体は通常直接使用せず、代わりにそのサブクラスを使用します。

 Executorクラスは以下の主なメソッドを提供しています。

【submit()】
 
関数 fn を非同期に実行し、その結果を取得するためのFutureオブジェクトを返します。

[API]
submit(fn, /, *args, **kwargs)

【map()】
 
関数 func を iterables の各要素に適用して結果のジェネレータを返します。

[API]
map(func, *iterables, timeout=None, chunksize=1)

【shutdown()】
 
Executorをシャットダウンします。wait引数がTrueの場合、すべての未完了のFutureが完了するまでブロックします。

[API]
shutdown(wait=True, *, cancel_futures=False)

4.ThreadPoolExecutorクラス

 ThreadPoolExecutorクラスはスレッドを使って並行処理を行うためのクラスです。このクラスを使うと複数のスレッドを簡単に管理できます。

[API]
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', 
                                            initializer=None, initargs=())

 ThreadPoolExecutorはExecutorのサブクラスの一つであり、スレッドのプールを使用して非同期に呼び出しを行います。このクラスは主にI/O待ちが多いタスクに適しています。

 ThreadPoolExecutorは以下のようにインスタンス化でき、"max_workers"パラメータでスレッドプールの最大数を指定しています。

[IN]
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=5)

4-1.submit()

 submitメソッドは指定された関数を非同期に実行し、Futureオブジェクトを返します。このFutureオブジェクトを使用して関数の結果を取得することができます。

[IN]
from concurrent.futures import ThreadPoolExecutor

def func(n):
    return n * n

with ThreadPoolExecutor(max_workers=5) as executor:
    future = executor.submit(func, 2)
    print(future)
    print(future.result())
[OUT]
<Future at 0x16611a26280 state=finished returned int>
4

4-2.map()

 mapメソッドは指定された関数をイテラブルの各要素に適用し、結果を生成します。このメソッドはジェネレータを返すため、出力の取得はイテレーションと同じ取扱いになります。

[IN]
from concurrent.futures import ThreadPoolExecutor

def func(n):
    return n * n

data = [1, 2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(func, data)

print(results)
print(list(results))
[OUT]
<generator object Executor.map.<locals>.result_iterator at 0x00000166344DEAC0>
[1, 4, 9, 16, 25]

【関数に複数の引数を指定】
 mapメソッドはmap(<関数>, <引数1>, <引数2>・・<引数n>)のように関数に合わせて引数を複数渡すことが出来ます。
 下記に複数の引数を渡したコードを記載しました。

[IN]
def func(a, b):
    return f"a: {a}, b: {b}"


with ThreadPoolExecutor() as executor:
    rets = executor.map(func, range(10), "ABCDEFGHIJK")

print([x for x in rets])
[OUT]
['a: 0, b: A', 'a: 1, b: B', 'a: 2, b: C', 'a: 3, b: D', 'a: 4, b: E', 'a: 5, 
b: F', 'a: 6, b: G', 'a: 7, b: H', 'a: 8, b: I', 'a: 9, b: J']

【配列を渡してみる】
 mapに渡す引数を配列にして関数内でループさせることで複雑な処理も可能になります。

[IN]
def print_data(data1, data2, data3):
    total = []
    for d1, d2, d3 in zip(data1, data2, data3):
        print(f'd1: {d1}, d2: {d2}, d3: {d3}')
        total.append(d1 + d2 + d3)
    return total
    
x = np.arange(0, 10, 1).reshape(-1, 2)
y = np.arange(10, 20, 1).reshape(-1, 2)
z = np.arange(20, 30, 1).reshape(-1, 2)
display(x)
display(y)
display(z)
print(f'x.shape: {x.shape}, y.shape: {y.shape}, z.shape: {z.shape}')

with ThreadPoolExecutor() as executor:
    
    results = executor.map(print_data, x, y, z)
    
print(results)
print(list(results))
[OUT]
rray([[0, 1],
       [2, 3],
       [4, 5],
       [6, 7],
       [8, 9]])

array([[10, 11],
       [12, 13],
       [14, 15],
       [16, 17],
       [18, 19]])

array([[20, 21],
       [22, 23],
       [24, 25],
       [26, 27],
       [28, 29]])

x.shape: (5, 2), y.shape: (5, 2), z.shape: (5, 2)


d1: 0, d2: 10, d3: 20
d1: 1, d2: 11, d3: 21
d1: 2, d2: 12, d3: 22
d1: 3, d2: 13, d3: 23
d1: 4, d2: 14, d3: 24
d1: 5, d2: 15, d3: 25
d1: 6, d2: 16, d3: 26d1: 8, d2: 18, d3: 28
d1: 9, d2: 19, d3: 29
d1: 7, d2: 17, d3: 27

<generator object Executor.map.<locals>.result_iterator at 0x00000166345370B0>
[[30, 33], [36, 39], [42, 45], [48, 51], [54, 57]]

5.ProcessPoolExecutorクラス

 ProcessPoolExecutorもExecutorのサブクラスであり、プロセスプールを使って非同期呼び出しを実施します。このクラスは主にCPU密集型のタスクに適しています。

[API]
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, 
                                             initializer=None, initargs=(), 
                                             max_tasks_per_child=None)

 ProcessPoolExecutorは以下のようにインスタンス化され”max_workers"でプロセスプールの最大数を指定しています。

[IN]
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=5)

5-1.submit()

 submitメソッドはThreadPoolExecutorと同様に指定された関数を非同期に実行し、Futureオブジェクトを返します。ただしProcessPoolExecutorを使用すると各タスクは別のプロセスで実行されます。

 それでは簡単な実装をします。まず関数を実装したモジュールを作成して、importした関数をexecutor.submit()に渡します。

[func.py]
def func(n):
    return n * n
[IN]
from concurrent.futures import ProcessPoolExecutor
from func import func

with ProcessPoolExecutor(max_workers=2) as executor:
    future = executor.submit(func, 2)

print(future)
print(future.result())
[OUT]
<Future at 0x16634385b20 state=finished returned int>
4

【コラム:関数の呼び出し方によるエラー】
 並列処理の影響で下記エラーが発生しました。理解出来たら追記します。

[IN]
from concurrent.futures import ProcessPoolExecutor

def func(n):
    return n * n

with ProcessPoolExecutor(max_workers=2) as executor:
    future = executor.submit(func, 2)

print(future.result())
[OUT]
BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

5-2.map()

 ProcessPoolExecutorクラスのmap()メソッドは、ThreadPoolExecutorクラスのmap()と同様に機能します。つまり指定した関数をイテラブルの各要素に適用して結果を生成します。
 このメソッドもジェネレータを返すため出力の取得はイテレーションと同じ取扱いとなります。ただし各タスクは別々のプロセスで実行されることに注意してください。

[IN]
from concurrent.futures import ProcessPoolExecutor
from func import func  

data = [1, 2, 3, 4, 5]

with ProcessPoolExecutor(max_workers=5) as executor:
    results = executor.map(func, data)

print(results)
print(list(results))
[OUT]
<generator object _chain_from_iterable_of_lists at 0x00000166344F9740>
[1, 4, 9, 16, 25]


参考記事

あとがき

 先出し。
ProcessPoolExecutor()使う時にコード内で関数定義しているのにエラーが出る理由は理解出来たら追記。


この記事が気に入ったらサポートをしてみませんか?