見出し画像

Python入門 (14) - マルチプロセス

Pythonでのマルチプロセスの実行手順をまとめました。

・Python 3.7

前回

1. プロセスとスレッド

プロセス」は固有メモリの処理単位で、マルチコアのCPUの各コアに対してプロセスを割り当てることができます。

スレッド」はプロセス内の処理単位で、同じプロセス内のスレッドはメモリを共有します。

画像1

2. Pythonのプロセスの実行

Pytyonのプロセスの実行手順は、次のとおりです。

(1) プロセスで実行する関数の準備。
(2) 関数を実行するプロセスの準備。
(3) プロセスの開始。

使用例は、次のとおりです。

import multiprocessing
import time

# プロセスで実行する関数の準備
def worker1():
    print('start worker1')
    time.sleep(5)
    print('end worker1')

# プロセスで実行する関数の準備
def worker2(x, y):
    print('start worker2')
    time.sleep(5)
    print('end worker2', x, y)

if __name__ == '__main__':
    # 関数を実行するプロセスの準備
    p1 = multiprocessing.Process(name="p1", target=worker1)
    p2 = multiprocessing.Process(name="p2", target=worker2, args=(10, 20))

    # プロセスの開始
    p1.start()
    p2.start()

結果は、次のとおりです。

start worker1
start worker2

〜 5秒後 〜

end worker1
end worker2 10 20

◎ multiprocessing.Process
Pythonのプロセスは、multiprocessing.Processで生成します。
コンストラクタは、次のとおりです。

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
コンストラクタ。
・group : Noneのみ。互換性用。
・target : run()で呼び出すオブジェクト。
・name : プロセス名。
・args : targetを呼び出す時の引数タプル。
・kwargs : target を呼び出す時の引数辞書。
・daemon : デーモンスレッドかどうか。

メソッドとプロパティは、次のとおりです。

・start() : スレッドの処理を開始。
・run() : スレッドの処理。
・join(timeout=None) : スレッドが終了するまで待機。
・timeout : タイムアウト秒。
・name : スレッド名。
・ident : スレッドID。
・is_alive() : スレッドが処理中かどうか。
・daemon : デーモンスレッドかどうか。

・pid : プロセスID
・exitcode : 子プロセスの終了コード
・authkey : プロセスの認証キー
・sentinel : プロセス終了時にreadyとなるシステムオブジェクトの数値ハンドル。
・terminate() : プロセス終了。

3. Pythonのプロセス間通信

キューによるPythonのプロセス間通信の手順は、次のとおりです。

(1) プロセスで実行する関数の準備。
(2) キューの準備。
(3) 関数を実行するプロセスの準備し、キューを引数として渡す。

(4) キューを介してプロセス間通信を行う。

使用例は、次のとおりです。

import multiprocessing
import time

# プロセスで実行する関数の準備
def worker1(q):
    print('start worker1')
    q.put(10) # キューに要素を追加
    time.sleep(5) # 5秒スリープ
    q.put(20) # キューに要素を追加
    print('end worker1')

# プロセスで実行する関数の準備
def worker2(q):
    print('start worker2')
    print(q.get()) # キューから要素を取得
    print(q.get()) # キューから要素を取得
    print('end worker2')

if __name__ == '__main__':
    # キューの準備
    q = multiprocessing.Queue()

    # 関数を実行するプロセスの準備
    p1 = multiprocessing.Process(target=worker1, args=(q,))
    p2 = multiprocessing.Process(target=worker2, args=(q,))

    # プロセスの開始
    p1.start()
    p2.start()

    # プロセス終了まで待つ
    p1.join()    
    p2.join()    

◎ multiprocessing.Queue
Pythonのプロセス間通信のキューは、multiprocessing.Queueで生成します。
コンストラクタは、次のとおりです。

class multiprocessing.Queue(maxsize=0)
FIFOキューのコンストラクタ。
・maxsize : 最大サイズ。

class multiprocessing.LifoQueue(maxsize=0)
LIFOキューのコンストラクタ。
・maxsize : 最大サイズ。

class multiprocessing.PriorityQueue(maxsize=0)
優先順位付きキューのコンストラクタ。
・maxsize : 最大サイズ。

メソッドは、次のとおりです。

・qsize() : キューのサイズ。
・empty() : キューが空かどうか。
・full() : キューが一杯かどうか。
・put(item, block=True, timeout=None) : キューへの要素の追加。block=Trueで一杯の場合は利用可能になるまでブロック。一杯でタイムアウト時はFullを返す。
・put_nowait(item) : put(item, False) と同じ。
・get(block=True, timeout=None) : キューの要素の取得。block=Trueで空の場合は利用可能になるまでブロック。空でタイムアウト時はEmptyを返す。
・get_nowait() : get(False)と同じ。
・task_done() : 過去にキューに入れられたタスクが完了した事を示す。
・join() : キューの全要素が取り出されて処理されるまでブロック。

4. 共有メモリ

共有メモリを使うことで、どのプロセスからでもアクセスできるデータを定義することができます。以下の2つのデータ型を利用できます。

・Value
・Array

使用例は、次のとおりです。

import multiprocessing
import time

# プロセスで実行する関数の準備
def worker1(count):
    print('start worker1')
    for i in range(10):
        time.sleep(1) # 1秒スリープ
        count.value += 1
    print('end worker1', count.value)

# プロセスで実行する関数の準備
def worker2(count):
    print('start worker2')
    for i in range(10):
        time.sleep(1) # 1秒スリープ
        print(count.value)
    print('end worker2', count.value)

if __name__ == '__main__':
    # 共有メモリの準備
    count = multiprocessing.Value('i', 0) # Value型の共有オブジェクト

    # 関数を実行するプロセスの準備
    p1 = multiprocessing.Process(target=worker1, args=(count,))
    p2 = multiprocessing.Process(target=worker2, args=(count,))

    # プロセスの開始
    p1.start()
    p2.start()

    # プロセス終了まで待つ
    p1.join()    
    p2.join()    

ValueとArrayの第1引数には型を指定します。int型は'i'、double型は'd'のように1文字で指定します。Valueの第2引数には初期値、Arrayの第2引数にはサイズを指定します。

結果は、次のとおりです。

start worker1
start worker2
0
2
3
3
4
6
7
8
9
9
end worker1 10
end worker2 10

5. Manager

Manager」を使うことで、どのプロセスからでもアクセスできるPythonオブジェクトを定義することができます。以下の3つのデータ型を利用できます。

・list
・dict
・Namespace
・Lock
・RLock
・Semaphore
・BoundedSemaphore
・Condition
・Event
・Barrier
・Queue
・Value
・Array

共有メモリより多くの型を利用できますが、共有メモリよりも動作が遅くなります。

使用例は、次のとおりです。

import multiprocessing
import time

# プロセスで実行する関数の準備
def worker1(dict):
    print('start worker1')
    for i in range(10):
        time.sleep(1) # 1秒スリープ
        dict['count'] += 1
    print('end worker1', dict['count'])

# プロセスで実行する関数の準備
def worker2(dict):
    print('start worker2')
    for i in range(10):
        time.sleep(1) # 1秒スリープ
        print(dict['count'])
    print('end worker2', dict['count'])

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        # 共有メモリの準備
        dict = manager.dict({'count': 0}) # dict型の共有オブジェクト

        # 関数を実行するプロセスの準備
        p1 = multiprocessing.Process(target=worker1, args=(dict,))
        p2 = multiprocessing.Process(target=worker2, args=(dict,))
 
        # プロセスの開始
        p1.start()
        p2.start()

        # プロセス終了まで待つ
        p1.join()    
        p2.join()    

結果は、次のとおりです。

start worker1
start worker2
0
2
3
3
4
6
7
8
9
9
end worker1 10
end worker2 10

6. 参考


この記事が気に入ったらサポートをしてみませんか?