プロセスベースの並列処理
この資料はmultiprocessing — Manage Processes Like Threads を基に作成されています。
multiprocessing
Pythonの標準ライブラリ multiprocessingモジュールは、多くのヘルパー関数を提供されていて、複数のプロセスで並列処理を行うことができます。
次のような特徴があります。
スレッドの代わりにプロセスを生成してGILを回避
ローカル(SMP)とリモート(DMP)の両方で並列処理OK
threadingモジュールとほとんど同じのAPI
start(), run(), join()
threadingモジュールには該当しないものもある
コンテキストとプロセス開始のためのコスト
multiprocessing ではプロセス開始たのために、spawn、fork、forkserver の3つの方法 が提供されています。
spawn
親プロセスは新たに python インタープリタープロセスを開始します。子プロセスはプロセスオブジェクトの run() メソッドの実行に必要なリソースのみ継承します。特に、親プロセスからの不要なファイル記述子とハンドルは継承されません。この方式を使用したプロセスの開始は fork や forkserver に比べ遅くなります。
Unix と Windows で利用可能。Windows と macOS でのデフォルト。
fork
親プロセスは os.fork() を使用して Python インタープリターをプロセスを開始します。子プロセスはそれが開始されるとき、事実上親プロセスと同一になります。親プロセスのリソースはすべて子プロセスに継承されます。マルチスレッドプロセスのforkは安全性に問題があることに注意してください。
また、forkは親プロセスのメモリ内容をコピーするため、大きなデータを並列処理しようとすると、メモリコピーのコストが無視できなくなります。
Unix でのみ利用可能。Unix でのデフォルト。
forkserver
プログラムを開始するとき forkserver 方式を選択した場合、サーバープロセスが開始されます。それ以降、新しいプロセスが必要になったときはいつでも、親プロセスはサーバーに接続し、新しいプロセスのフォークを要求します。フォークサーバープロセスはシングルスレッドなので os.fork() の使用に関しても安全です。不要なリソースは継承されません。
threadingモジュールとの違い
multiprocessingモジュールのAPIは threadingモジュールによく似ていますが、根本的に異なる方法で機能します。
multiprocessingライブラリは、並列タスクごとに複数のプロセスを生成します。 これは、各プロセスに独自のPythonインタープリター、つまり独自のGILを持っているため、それぞれのプロセスではGILをうまく回避します。 したがって、各プロセスを個別のCPUに割り当てて、すべてのプロセスが終了したら、必要に応じて最後に集計処理をすることができます。
ただし、いくつかの欠点があります。 プロセス生成のコストはスレッド作成よりもコスト高となることと、余分なプロセスを生成すると、プロセッサ間でデータを交換する必要があるため、I/Oオーバーヘッドが発生します。 このため、非常に短いタスクでは全体的な実行時間が長くなる可能性があります。 ただし、データが各プロセスに制限されているような場合では、大幅な高速化を実現でる可能性があります。
並列プログラムでは、"並列処理について"で説明した アムダールの法則 に示されているように、高速化できるかは、並列化できるタスクがどれだけ多いかに依存しています。
Processクラス
プロセスの生成
multiprocessing でのプロセスは、 Processオブジェクトの start() メソッドが呼ばれることによって生成されます。Processクラスは threading.Thread のAPIに従っています。
code: 01_process_simple.py
import multiprocessing as mp
id = 0
def worker():
print(f'Worker')
jobs = []
for i in range(5):
p = mp.Process(target=worker)
jobs.append(p)
p.start()
これを実行すると5つのプロセスが生成されて、文字列"Worker"がそれぞれのプロセスで出力されます。ここで、各プロセスが出力ストリームへのアクセスは競合していることに注意してください。つまり、同時に出力しているわけではありません。
code: bash
$ python 01_process_simple.py
Worker
Worker
Worker
Worker
Worker
引数を与えてプロセスを生成
プロセスを生成するときに、実行する作業への引数を与えることができます。threadingモジュールとは異なり、multiprocessing モジュールではプロセスに引数を渡すために、pickleを使用して引数をシリアル化して与えます。つまりpickle化できるものしか与えることができません。
次の例は、各ワーカーにIDを渡して、それを出力するものです。
code: 02_process_with_args.py
import multiprocessing as mp
def worker(id):
print(f'Worker:{id}')
jobs = []
for id in range(5):
p = mp.Process(target=worker, args=(id,))
jobs.append(p)
p.start()
argsはタプルで渡していることに注目してください。
code: bash
$ python 02_process_withargs.py
Worker:0
Worker:1
Worker:2
Worker:3
Worker:4
インポート可能なターゲット関数
別のスクリプトに記述されているターゲット関数をインポートすることもできます。
code: 03_process_import_target.py
import multiprocessing as mp
from worker import worker
jobs = []
for _ in range(5):
p = mp.Process(target=worker)
jobs.append(p)
p.start()
code: worker.py
def worker():
print('Worker')
return
実行結果は、はじめの01_process_simple.pyと同じになります。
code: bash
$ python 03_process_import_target.py
Worker
Worker
Worker
Worker
Worker
現在のプロセスの知る
引数を与えてプロセスを識別または名前を付けるのは面倒で不要です。 各プロセスインスタンスには、プロセスの作成時に変更できるデフォルト値の名前があります。 プロセスに名前を付けることは、特に複数のタイプのプロセスが同時に実行されているアプリケーションで、それらを追跡するのに役立ちます。
code: 04_process_names.py
import multiprocessing as mp
import time
def worker():
name = mp.current_process().name
print(f'{name} Starting')
time.sleep(2)
print(f'{name} Exiting')
def my_service():
name = mp.current_process().name
print(f'{name} Starting')
time.sleep(2)
print(f'{name} Exiting')
service = mp.Process( name='my_service', target=my_service)
worker_1 = mp.Process( name='worker 1', target=worker)
worker_2 = mp.Process( target=worker)
worker_1.start()
worker_2.start()
service.start()
出力には各行の現在のプロセスの名前が含まれます。 名前に"Process-3"が含まれる行は、名前のないプロセスworker_2に対応しています。
code: bash
$ python 04_process_names.py
worker 1 Starting
Process-3 Starting
my_service Starting
worker 1 Exiting
Process-3 Exiting
my_service Exiting
デーモンプロセス
デフォルトでは、メインプログラムはすべての子プロセスが終了するのを待ってから終了します。メインプログラムの終了をブロックせずに、実行されるバックグラウンドプロセスを開始すると、ワーカーを中断する簡単な方法がないサービスや、作業の途中でプロセスを停止させてもデータが失われたり破損したりしないサービスで便利になります。例えば、サービス監視ツールの場合の死活監視のタスクを生成するプロセスです。
プロセスをデーモンとしてマークするには、ProcessオブジェクトのdaemonアトリビュートにTrueをセットします。 デフォルトでは、プロセスはデーモンではありません。
code: 05_process_daemon.py
import multiprocessing as mp
import time
import sys
def daemon():
p = mp.current_process()
print(f'Starting: {p.name} {p.pid}')
sys.stdout.flush()
time.sleep(2)
print(f'Exiting: {p.name} {p.pid}')
sys.stdout.flush()
def non_daemon():
p = mp.current_process()
print(f'Starting: {p.name} {p.pid}')
sys.stdout.flush()
print(f'Exiting: {p.name} {p.pid}')
sys.stdout.flush()
d = mp.Process( name='daemon', target=daemon)
d.daemon = True
n = mp.Process( name='non-daemon', target=non_daemon)
n.daemon = False
d.start()
time.sleep(1)
n.start()
デーモンプロセスが2秒間のスリープから復帰する前に、すべての非デーモンプロセス(メインプログラムを含む)が終了するため、出力にはデーモンプロセスからの"Exiting"の文字列が含まれません。
code: bash
$ python 05_process_daemon.py
Starting: daemon 62825
Starting: non-daemon 62826
Exiting: non-daemon 62826
デーモンプロセスは、メインプログラムが終了する前に自動的に終了します。これにより、孤立したプロセスが実行されたままになるのを防ぎます。 Linux系プラットフォームでは、psなどのコマンドの出力から実行時に出力されるプロセスID値を探し、そのプロセスを確認することできます。
プロセスの終了を待つ
プロセスが作業を完了して終了するまで待つには、Processオブジェクトのjoin()メソッドを使用します。
code: 06_process_daemon_join.py
import time
import sys
import multiprocessing as mp
def daemon():
name = mp.current_process().name
print(f'Starting: {name}')
time.sleep(2)
print(f'Exiting: {name}')
def non_daemon():
name = mp.current_process().name
print(f'Starting: {name}')
print(f'Exiting: {name}')
d = mp.Process( name='daemon', target=daemon)
d.daemon = True
n = mp.Process( name='non-daemon', target=non_daemon)
n.daemon = False
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
メインプロセスはjoin()メソッドを使用してデーモンプロセスが終了するのを待つため、今回は文字列"Exiting"が出力されます。
code: bash
$ python 06_process_daemon_join.py
Starting: daemon
Starting: non-daemon
Exiting: non-daemon
Exiting: daemon
デフォルトでは、join()は無期限にブロックします。 引数timeout(プロセスが非アクティブになるのを待つ秒数を表すfloat値)を渡すこともできます。 この場合、プロセスがタイムアウト時間内に完了しない場合でも、join()は戻ってきます。
code: 07_process_daemon_join_timeout.py
import sys
import time
import multiprocessing as mp
def daemon():
name = mp.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = mp.current_process().name
print('Starting:', name)
print('Exiting :', name)
d = mp.Process( name='daemon', target=daemon)
d.daemon = True
n = mp.Process( name='non-daemon', target=non_daemon)
n.daemon = False
d.start()
n.start()
d.join(1)
print('d.is_alive()', d.is_alive())
n.join()
渡されたtimeoutはデーモンがスリープしている時間よりも短いため、join()が戻った後もプロセスは動作しています。
code: bash
$ python 07_process_daemon_join_timeout.py
Starting: daemon
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True
プロセスの停止
プロセスに終了するように通知して、受け取ったプロセス側で停止する方法(これはポイズン・ピルテクニックと言われます)が安全でお勧めなのですが、プロセスがハングまたはデッドロックしているように見える場合は、強制的に強制終了できると便利です 。 Processオブジェクトでterminate()メソッドを呼び出すと、子プロセスが強制終了されます。
code: process_terminate.py
import time
import multiprocessing as mp
def slow_worker():
print('Starting worker')
time.sleep(1)
print('Finished worker')
p = mp.Process(target=slow_worker)
print('BEFORE:', p, p.is_alive())
p.start()
print('DURING:', p, p.is_alive())
p.terminate()
print('TERMINATED:', p, p.is_alive())
p.join()
print('JOINED:', p, p.is_alive())
プロセスを管理するコードにオブジェクトのステータスを更新して終了を反映させる時間を与えるために、プロセスを終了した後にjoin()メソッドを呼び出すことが重要です。
code: bash
$ python 08_process_terminate.py
BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stoppedSIGTERM)> False
プロセスの終了コード
プロセスの終了時に生成されるステータスコードには、Processオブジェクトのexitcodeアトリビュートで参照することができます。 exitcodeは、以下の表のようになります。
table: プロセスの終了コード
Exit Code Meaning
== 0 正常終了 エラーは発生しなかった
0 プロセスにエラーがあり、そのコードで終了した
< 0 プロセスは -1 * exitcodeのシグナルで強制終了された
== 1 捕獲されない例外が発生した場合に自動的にセットされる
code: 09_process_exitcode.py
import sys
import time
import multiprocessing as mp
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
jobs = []
funcs = exit_error, exit_ok, return_value, raises, terminated
for f in funcs:
print('Starting process for', f.__name__)
j = mp.Process(target=f, name=f.__name__)
jobs.append(j)
j.start()
jobs-1.terminate()
for j in jobs:
j.join()
print(f'{j.name:>15}.exitcode = {j.exitcode}')
code: bash
$ python 09_process_exitcode.py
Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
exit_error.exitcode = 1
exit_ok.exitcode = 0
return_value.exitcode = 0
Process raises:
Traceback (most recent call last):
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/iisaka/Class.Parallel/Multiprocessing/09_process_exitcode.py", line 15, in raises
raise RuntimeError('There was an error!')
RuntimeError: There was an error!
raises.exitcode = 1
terminated.exitcode = -15
例外を発生させるプロセスの終了コードは、自動的に1がセットされます。
ロギング
並行処理のプログラムデバッグするときは、multiprocessingによって提供されるオブジェクトの内部にアクセスできると便利です。 log_to_stderr()と呼ばれるロギングを有効にする便利なモジュールレベルの関数があります。 loggingを使用してloggerオブジェクトを設定し、ハンドラーを追加して、ログメッセージが標準エラー出力に送信されるようにします。
code: 10_process_log_to_stderr.py
import sys
import logging
import multiprocessing as mp
def worker():
print('Doing some work')
sys.stdout.flush()
mp.log_to_stderr(logging.DEBUG)
p = mp.Process(target=worker)
p.start()
p.join()
デフォルトでは、ログレベルはNOTSETに設定されているため、メッセージは生成されません。DEBUGレベルを渡して、loggerオブジェクトの詳細レベルに初期化します。
実際にはProcessオブジェクト内部で処理されるため、このコードにはloggerオブジェクトは見えていません。
code: bash
$ python 10_process_log_to_stderr.py
INFO/Process-1 child process calling self.run()
Doing some work
INFO/Process-1 process shutting down
DEBUG/Process-1 running all "atexit" finalizers with priority >= 0
DEBUG/Process-1 running the remaining "atexit" finalizers
INFO/Process-1 process exiting with exitcode 0
INFO/MainProcess process shutting down
DEBUG/MainProcess running all "atexit" finalizers with priority >= 0
DEBUG/MainProcess running the remaining "atexit" finalizers
loggerを直接操作する(レベル設定を変更するか、ハンドラーを追加する)には、get_logger()メソッドを使用します。
code: 11_process_get_logger.py
import sys
import logging
import multiprocessing as mp
def worker():
print('Doing some work')
sys.stdout.flush()
mp.log_to_stderr()
logger = mp.get_logger()
logger.setLevel(logging.INFO)
p = mp.Process(target=worker)
p.start()
p.join()
loggerは、ロギング構成ファイルAPIで"multiprocess"という名前で構成することもできます。
code: bash
$ python 11_process_get_logger.py
INFO/Process-1 child process calling self.run()
Doing some work
INFO/Process-1 process shutting down
INFO/Process-1 process exiting with exitcode 0
INFO/MainProcess process shutting down
Processクラスのサブクラス
別のプロセスでジョブを開始する最も簡単な方法は、Processクラスのコンストラクタにターゲット関数を渡すことですが、このときカスタムサブクラスを使用することもできます。
code: 12_process_subclass.py
import multiprocessing as mp
class Worker(mp.Process):
def run(self):
print(f'In {self.name}')
return
jobs = []
for _ in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
Processクラスの派生クラスは、そのタスクを実行するために run()をオーバーライドする必要があります。
code: bash
$ python 12_process_subclass.py
In Worker-1
In Worker-2
In Worker-3
In Worker-4
In Worker-5
プロセスの同期
複数のプロセスを同期するためには次の方法があります。
プロセス間でのメッセージの送受信
プロセス間でシグナルを送受信
Lock/RLockオブジェクトを使用したロックの設定
Conditionオブジェクトの使用した条件設定
Semaphoreオブジェクトの使用したリソースへの同時アクセス制御
Queuesオブジェクトを使用したオブジェクト交換を利用
Pipesオブジェクトを使用したオブジェクト交換を利用
プロセスへのメッセージの受け渡し
threadingと同様に、multiproessingの一般的な使用パターンは、ジョブを複数のワーカーに分割して並行して実行することです。 複数のプロセスを効果的に使用するためには、通常、プロセス間の通信が必要です。これにより、ジョブを分割して実行し、その結果を集約することができます。multiprocessingを使用してプロセス間で通信する簡単な方法は、Queueオブジェクトを使用してメッセージをやり取りすることです。 pickleでシリアル化できるオブジェクトはすべて、Queueを通してプロセスへ受け渡すことができます。
code: 13_process_queue.py
import multiprocessing as mp
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = mp.current_process().name
print(f'Doing something fancy in {proc_name} for {self.name}!')
def worker(q):
obj = q.get()
obj.do_something()
queue = mp.Queue()
p = mp.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
queue.close()
queue.join_thread()
p.join()
このコードは、ひとつメッセージをひとつのワーカーに渡すだけの簡単なもので、メインプロセスはワーカーが終了するのを待ちます。
code: bash
$ python 13_process_queue.py
Doing something fancy in Process-1 for Fancy Dan!
より複雑な例は、JoinableQueueオブジェクトのデータを消費して、結果を親プロセスに返す複数のワーカーを管理する方法です。ポイズン・ピルテクニックは、ワーカーを止めるために使用されます。 実際のタスクを設定した後、メインプログラムはワーカーごとに1つの値stopをジョブキューに追加します。 ワーカーが特別な値を見つけると、処理ループから抜け出します。 メインプロセスは、タスクキューのjoin()メソッドを使用して、すべてのタスクが終了するのを待ってから結果を処理します。
code: 14_process_producer_consumer.py
import time
import multiprocessing as mp
class Consumer(mp.Process):
def __init__(self, task_queue, result_queue):
mp.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print(f'{proc_name}: Exiting')
self.task_queue.task_done()
break
print(f'{proc_name}: {next_task}')
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(1) # ダミーのタスク
product=self.a * self.b
return f'{self.a} * {self.b} = {product}'
def __str__(self):
return f'{self.a} * {self.b}'
if __name__ == '__main__':
# キューとの通信を確立
tasks = mp.JoinableQueue()
results = mp.Queue()
# コンシューマープロセスを生成
num_consumers = mp.cpu_count() * 2
print(f'Creating {num_consumers} consumers')
consumers = [
Consumer(tasks, results)
for _ in range(num_consumers)
]
for worker in consumers:
worker.start()
# ジョブをキューに入れる
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# ポイズン・ピルを各コンシューマーに追加
for i in range(num_consumers):
tasks.put(None)
# すべてのタスクの終了を待つ
tasks.join()
# 結果を出力
while num_jobs:
result = results.get()
print(f'Result: {result}')
num_jobs -= 1
ジョブは順番にキューに入りますが、並列して実行されるため、ジョブが完了する順番については保証されません。
code: bash
$ python 14_process_producer_consumer.py
Creating 8 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-1: 8 * 8
Consumer-4: 9 * 9
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-3: Exiting
Consumer-6: Exiting
Consumer-8: Exiting
Consumer-7: Exiting
Consumer-1: Exiting
Consumer-4: Exiting
Result: 0 * 0 = 0
Result: 3 * 3 = 9
Result: 1 * 1 = 1
Result: 4 * 4 = 16
Result: 2 * 2 = 4
Result: 5 * 5 = 25
Result: 7 * 7 = 49
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 9 * 9 = 81
プロセス間でのシグナル
Eventクラスは、プロセス間で状態情報を伝達する簡単な方法を提供します。 Eventは、設定状態と未設定状態の間で切り替えることができます。 Eventオブジェクトは、オプションのtimeout値を使用して、未設定から設定に変更されるのを待つことができます。
code: 15_process_event.py
import time
import multiprocessing as mp
def wait_for_event(e):
""" 何かをする前に、イベントが設定されるのを待ちます """
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
""" t 秒待ってからタイムアウト """
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
e = mp.Event()
w1 = mp.Process( name='block', target=wait_for_event, args=(e,))
w1.start()
w2 = mp.Process( name='nonblock', target=wait_for_event_timeout, args=(e, 2))
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
wait()がタイムアウトすると、エラーなしで戻ります。 呼び出し元は、is_set()を使用してイベントの状態を確認する必要があります。
code: bash
$ python 15_process_event.py
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
リソースへのアクセス制御
単一のリソースを複数のプロセス間で共有する必要がある状況では、Lockを使用してアクセスの競合を回避することができます。
code: 16_process_lock.py
import sys
import multiprocessing as mp
def worker_with(lock, stream):
with lock:
stream.write('with文でロックを取得\n')
def worker_no_with(lock, stream):
lock.acquire()
try:
stream.write('直接ロックを取得\n')
finally:
lock.release()
lock = mp.Lock()
w = mp.Process( target=worker_with, args=(lock, sys.stdout))
nw = mp.Process( target=worker_no_with, args=(lock, sys.stdout))
w.start()
nw.start()
w.join()
nw.join()
この例では、2つのプロセスが出力ストリームへのアクセスをロックして同期しない場合、コンソールに出力されるメッセージが混じり合う可能性があります。
code: bash
$ python 16_process_lock.py
with文でロックを取得
直接ロックを取得
同じプロセスで個別のコードがロックを「再取得」することはLock()ではできません。
code: 17_process_lock_multi.py
import sys
import multiprocessing as mp
def worker(lock, stream):
try:
l = lock.acquire()
stream.write(f'First Lock {l}\n')
l = lock.acquire(0)
stream.write(f'Second Lock {l}\n')
finally:
lock.release()
lock = mp.Lock()
w = mp.Process( target=worker, args=(lock, sys.stdout))
w.start()
w.join()
code: bash
$ python 17_process_lock_multi.py
First Lock True
Second Lock False
同じプロセスからの個別のコードがロックを「再取得」する必要がある場合、Lock()ではなく、RLock()を使用します。
code: 18_process_rlock_multi.py
import sys
import multiprocessing as mp
def worker(lock, stream):
try:
l = lock.acquire()
stream.write(f'First Lock {l}\n')
l = lock.acquire(0)
stream.write(f'Second Lock {l}\n')
finally:
lock.release()
lock = mp.RLock()
w = mp.Process( target=worker, args=(lock, sys.stdout))
w.start()
w.join()
code: bash
$ python 18_process_rlock_multi.py
First Lock True
Second Lock True
Conditionでの同期操作
Conditionオブジェクトを使用してワークフローの一部を同期し、別々のプロセスにある場合でも、一部は並行して実行され、その他は順次実行されるようにすることができます。
code: process_condition.py
import time
import multiprocessing as mp
def stage_1(cond):
""" タスクの最初のstage を実行し、
次に、stage_2に通知して続行する
"""
name = mp.current_process().name
print('Starting', name)
with cond:
print(f'{name} done and ready for stage 2')
cond.notify_all()
def stage_2(cond):
""" stage_1が完了したことを示す条件を待つ """
name = mp.current_process().name
print('Starting', name)
with cond:
cond.wait()
print(f'{name} running...')
condition = mp.Condition()
s1 = mp.Process(name='s1', target=stage_1, args=(condition,))
s2_clients = [
mp.Process( name='stage_2{}'.format(i), target=stage_2, args=(condition,))
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
このコードでは、第1ステージの実行が完了した後で、2つのプロセスがジョブの第2ステージを並行して実行されます。
code: bash
$ python 19_process_condition.py
Starting stage_21
Starting stage_22
Starting s1
s1 done and ready for stage 2
stage_21 running...
stage_22 running...
Semaphoreでリソースへの同時アクセスを制御
全体の数を制限しながら、一度に複数のワーカーがリソースにアクセスできるようにすると便利な場合があります。 たとえば、接続プールが固定数の同時接続をサポートしている場合や、ネットワークアプリケーションが固定数の同時ダウンロードをサポートしている場合があります。 Semaphoreオブジェクトの利用は、これらの接続を管理する1つの方法です。
code: 20_process_semaphore.py
import time
import random
import multiprocessing as mp
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.mgr = mp.Manager()
self.active = self.mgr.list()
self.lock = mp.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
def __str__(self):
with self.lock:
return str(self.active)
def worker(s, pool):
name = mp.current_process().name
with s:
pool.makeActive(name)
print(f'Activating {name} now running {pool}')
time.sleep(random.random())
pool.makeInactive(name)
pool = ActivePool()
s = mp.Semaphore(3)
jobs = [
mp.Process( target=worker, name=str(i), args=(s, pool),)
for i in range(10)
]
for j in jobs:
j.start()
while True:
alive = 0
for j in jobs:
if j.is_alive():
alive += 1
j.join(timeout=0.1)
print(f'Now running {pool}')
if alive == 0:
break
この例では、ActivePoolクラスは、特定の瞬間に実行されているプロセスを追跡するための便利な方法として機能します。 実際のリソースプールは、接続またはその他の値を新しくアクティブなプロセスに割り当て、タスクが完了したときに値を再利用する可能性があります。 ここでは、プールはアクティブなプロセスの名前を保持するために使用され、3つだけが同時に実行されていることを示しています。
code: bash
$ python 20_process_shemaphore.py
Activating 0 now running '0', '1', '2'
Activating 1 now running '0', '1', '2'
Activating 2 now running '0', '1', '2'
Now running '0', '1', '2'
Activating 3 now running '1', '2', '3'
Activating 4 now running '1', '3', '4'
Now running '1', '3', '4'
Now running '1', '3', '4'
Now running '1', '3', '4'
Now running '1', '3', '4'
Activating 5 now running '1', '3', '5'
Now running '1', '3', '5'
Activating 6 now running '1', '5', '6'
Now running '1', '5', '6'
Now running '1', '5', '6'
Activating 7 now running '1', '6', '7'
Activating 8 now running '6', '7', '8'
Now running '6', '7', '8'
Now running '6', '7', '8'
Now running '6', '7', '8'
Now running '6', '7', '8'
Now running '6', '7', '8'
Activating 9 now running '6', '8', '9'
Now running '6', '8', '9'
Now running '8', '9'
Now running '8'
Now running '8'
Now running []
Queuesクラスを使用したオブジェクト交換
Queueクラスは Queue.Queue のほぼクローンです。
Queueus は プロセスセーフです。
補足説明:プロセスセーフ
マルチプロセスプログラミングにおける概念。
あるコードがプロセスセーフであるという場合、 そのコードを複数のプロセスで
同時並行的に実行しても問題が発生しないことを表します。
code: 21_queue.py
import multiprocessing as mp
def func(q):
q.put(42, None, 'hello')
q = mp.Queue()
p = mp.Process(target=func, args=(q,))
p.start()
print(q.get())
p.join()
Pipe()メソッドを使用したオブジェクト交換
Pipe()メソッドは、デフォルトでは双方向のパイプによって接続された コネクションオブジェクトのペアを返します。
この2つのコネクションオブジェクトはパイプの両端となります。
send() と recv() のメソッドでやり取りを行います。
2つのプロセスが同時に同じプロセス側に対して、 読みこもうとしたり書きだそうすればパイプの中のデータは壊れてしまう。
異なるプロセスに対して同時に読み書きするのであれば問題はありません。
code: 22_pipe.py
import multiprocessing as mp
def funcconn):
conn.send(42, None, 'hello')
conn.close()
parent_conn, child_conn = mp.Pipe()
p = mp.Process(target=func, args=(child_conn,))
p.start()
print(parent_conn.recv())
プロセス間で状態を共有
マルチプロセスで並列処理をしているときは、 データや状態を共有することは重要になります。
共有メモリを使ってデータを共有する
サーバプロセスでManagerオブジェクトによる共有状態の管理
共有メモリ
データは Value() や Array() を使って共有メモリに格納することができます。
ただし、単一のコンピュータでしか使えません。
code: 23_shared_memory.py
import multiprocessing as mp
def func(n, a):
n.value = 3.1415927
for i in range(len(a)):
ai = -ai
num = mp.Value('d', 0.0) # 倍精度浮動小数点を表現 (double)
arr = mp.Array('i', range(10)) # 符号付き整数を表現 (int)
p = mp.Process(target=func, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr:)
Managerオブジェクトによる共有状態の管理
前の例では、アクティブなプロセスのリストは、マネージャーによって作成された特別なタイプのリストオブジェクトを介してActivePoolインスタンスで一元的に維持されます。 Managerは、サーバオブジェクトを介して他プロセスを操作できるようにします。Managerオブジェクトは 、すべてのユーザー間で共有情報の状態を調整する必要があります。
Managerオブジェクトでサポートしている型
list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value および Array
共有メモリを使うようりも柔軟に処理できます。
ネットワーク越しに別のコンピュータのプロセスとやり取りできます。
反面、共有メモリよりも遅くなります。
code: 24_process_manager_dict.py
import pprint
import multiprocessing as mp
def worker(d, key, value):
dkey = value
mgr = mp.Manager()
d = mgr.dict()
jobs = [
mp.Process(target=worker, args=(d, i, i * 2))
for i in range(10)
]
for j in jobs:
j.start()
for j in jobs:
j.join()
print('Results:', d)
Managerを介してリストを作成することにより、リストが共有され、すべてのプロセスで更新が表示されます。 辞書もサポートされています。
code: bash
$ python 24_process_manager_dict.py
Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 8: 16, 7: 14, 9: 18}
共有名前空間
辞書とリストに加えて、Managerは共有名前空間を作成できます。
code: 25_process_namespaces.py
import multiprocessing as mp
def producer(ns, event):
ns.value = 'This is the value'
event.set()
def consumer(ns, event):
try:
print('Before event:', ns.value)
except Exception as err:
print(f'Before event, error: {err}')
event.wait()
print('After event:', ns.value)
mgr = mp.Manager()
namespace = mgr.Namespace()
event = mp.Event()
p = mp.Process( target=producer, args=(namespace, event))
c = mp.Process( target=consumer, args=(namespace, event))
c.start()
p.start()
c.join()
p.join()
Namespaceオブジェクトに追加された名前付きの値は、Namespaceオブジェクトを受け取るすべてのクライアントに表示されます。
code: bash
$ python 25_process_namespaces.py
Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value
名前空間内の値の内容が更新されたことは自動的には伝播されません。
code: 26_process_mutable.py
import multiprocessing as mp
def producer(ns, event):
# グローバル値は更新しない!
ns.my_list.append('This is the value')
event.set()
def consumer(ns, event):
print('Before event:', ns.my_list)
event.wait()
print('After event :', ns.my_list)
mgr = mp.Manager()
namespace = mgr.Namespace()
namespace.my_list = []
event = mp.Event()
p = mp.Process( target=producer, args=(namespace, event))
c = mp.Process( target=consumer, args=(namespace, event))
c.start()
p.start()
c.join()
p.join()
リストを更新するには、リストを名前空間オブジェクトに再度アタッチします。
code: bash
$ python 26_process_mutable.py
Before event: []
After event : []
Poolクラスでプロセスをプールする
multiprocessing モジュールでは、threading モジュールには該当するものがない API もいくつか導入されています。 Poolクラスはそのひとつで、「ワーカー・プロセスのプール」を表しています。
Poolオブジェは複数の入力データに対して、サブプロセス群に入力データを分配 (データ並列) して関数を並列実行するのに便利な手段を提供します。
Poolクラスを使用すると、実行する作業を分割してワーカー間で個別に分散できる単純なケースで、固定数のワーカーを管理できます。 ジョブからの戻り値が収集され、リストとして返されます。 プール引数には、プロセスの数と、タスクプロセスの開始時に実行する関数(子プロセスごとに1回呼び出される)が含まれます。
code: 27_process_pool.py
import multiprocessing as mp
def do_calculation(data):
return data * 2
def start_process():
print('Starting', mp.current_process().name)
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = mp.cpu_count() * 2
pool = mp.Pool( processes=pool_size, initializer=start_process)
pool_outputs = pool.map(do_calculation, inputs)
pool.close()
pool.join()
print('Pool :', pool_outputs)
map()メソッドの結果は、個々のタスクが並行して実行されることを除いて、組み込み関数のmap()と機能的に同じです。 プールは入力を並行して処理しているため、close()とjoin()メソッドを使用して、メインプロセスをタスクプロセスと同期し、適切なクリーンアップを行うことができます。
code: bash
$ python 27_process_pool.py
Input : 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
Built-in: <map object at 0x7ffde19ef8d0>
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Pool : 0, 2, 4, 6, 8, 10, 12, 14, 16, 18
デフォルトでは、Poolは固定数のワーカープロセスを作成し、ジョブがなくなるまでジョブをそれらに渡します。Poolクラスのコンストラクタに maxtasksperchild引数を与えると、いくつかのタスクが完了した後にワーカープロセスを再起動するように指示され、長時間実行されるワーカーがこれまで以上にシステムリソースを消費するのを防ぐことができます。
code: 28_process_maxtasksperchild.py
import multiprocessing as mp
def do_calculation(data):
return data * 2
def start_process():
print('Starting', mp.current_process().name)
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = mp.cpu_count() * 2
pool = mp.Pool( processes=pool_size, initializer=start_process, maxtasksperchild=2)
pool_outputs = pool.map(do_calculation, inputs)
pool.close()
pool.join()
print('Pool :', pool_outputs)
Poolオブジェクトは、割り当てられたタスクが完了すると、作業がなくなった場合でもワーカーを再起動します。 この出力は、10個のタスクがあるときのに、8個のワーカーが作成され、各ワーカーは一度に2個のタスクを完了することができます。
code: bash
$ python 28_process_maxtasksperchild.py
Input : 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
Built-in: <map object at 0x7fc02a32a750>
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Pool : 0, 2, 4, 6, 8, 10, 12, 14, 16, 18
タスクをプールしたプロセスにアサインする
Poolクラスには、タスクがワーカープロセスにアサインする複数のメソッドがあります。
code: 29_process_pool_asign.py
import multiprocessing as mp
import time
import os
def func(x):
return x*x
pool = mp.Pool(processes=4) # 4つのワーカプロセスを起動
# "0, 1, 4,..., 81" を出力
print(pool.map(func, range(10)))
# 同じ数値を任意の順で出力
for i in pool.imap_unordered(func, range(10)):
print(i)
# 非同期に "func(20)" を評価
res = pool.apply_async(func, (20,)) # 1プロセスだけで実行
print(res.get(timeout=1)) # "400" が出力
# 非同期に "os.getpid()" を評価
res = pool.apply_async(os.getpid, ()) # 1プロセスだけで実行
print(res.get(timeout=1)) # プロセスのPIDが出力
# 複数プロセスを使って非同期で評価
multiple_results = pool.apply_async(os.getpid, ()) for i in range(4)
print(res.get(timeout=1) for res in multiple_results)
# 5sec sleep
res = pool.apply_async(time.sleep, (5,))
try:
print(res.get(timeout=15))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
map()メソッド
pool.map() は与えた関数とイテラブルオブジェクトをマップして呼び出すものです。
これは、Pythonの組み込みのmap 関数に似ています。
例えば次のコードは与えた引数の値を2乗するシリアル版のコードです。
code: square.py
def square(x):
return x * x
このsquare()関数を使用して、整数のリストの二乗を計算するとします。 シリアルプログラミングでは、次のコードを使用して、リスト内包表記を介して結果を計算および出力できます。
code: 30_square_serial.py
from square import square
result = square(x) for x in range(20)
print(result)
これを、pool.map()で並列版にしたものが次のコードです。
multiprocessing.cpu_count() はシステムのコア数を返します。
code: 31_square_pool_map.py
import multiprocessing as mp
from square import square
pool = mp.Pool(processes=mp.cpu_count())
result = pool.map(square, range(20))
print(result)
このコードはシリアル版のコードとまったく同じ結果を出力しますが、実際には計算がワーカープロセスで並列に分散され、実行されます。 pool.map()メソッドは、出力の順序が正しいことを保証します。
starmap()メソッド
Poolクラスの map()メソッドは、引数を1つだけ受け入れる関数にだけしか適用できません。
複数の引数を受け入れる関数の場合、Poolクラスはstarmap()メソッドも提供します。
これを使用すると、次のpower_n()のような、任意の値の累乗を計算する、より一般的な処理を定義できます。
code: power.py
def power_n(x, n):
return x ** n
そして、このpower_n()関数と入力引数のリストをstarmap()メソッドに渡します。
code: 32_power_pool_starmap.py
import multiprocessing as mp
from power import power_n
pool = mp.Pool(processes=mp.cpu_count())
result = pool.starmap(power_n, (x, 2) for x in range(20))
print(result)
Poolクラスのmap()とstarmap()はどちらも同期メソッドであることに注意してください。 つまり、ワーカープロセスがサブタスクを早く終了した場合、他のワーカープロセスが終了するのを待ちます。 ワークロードがワーカープロセス間で十分にバランスされていない場合、これはパフォーマンスの低下につながる可能性があります。
apply_async()メソッド
Poolクラスは、ワーカープロセスの非同期実行を可能にするapply_async()メソッドも提供します。 入力のリストに対して計算ルーチンを実行するmap()メソッドとは異なり、apply_async()メソッドはルーチンを1回だけ実行します。 したがって、前述のコードを非同期実行するためには、数値のリストを与えて、その値を累乗計算するように定義しなおす必要があります。
code: python
def power_list_n(x_list, n):
return x ** n for x in x_list
apply_async()メソッドを使用するには、入力リストの範囲全体から部分リストに分割し、それらをワーカープロセスに配布する必要もあります。 部分リストは、slice_data()で作成できます。
code: python
def slice_data(data, nprocs):
aver, res = divmod(len(data), nprocs)
nums = []
for proc in range(nprocs):
if proc < res:
nums.append(aver + 1)
else:
nums.append(aver)
count = 0
slices = []
for proc in range(nprocs):
slices.append(data[count: count+numsproc])
count += numsproc
return slices
これで、power_list_n()関数と入力の部分リストをapply_async()メソッドに渡すことができます。
code: 33_power_apply_async.py
import multiprocessing as mp
def power_list_n(x_list, n):
return x ** n for x in x_list
def slice_data(data, nprocs):
aver, res = divmod(len(data), nprocs)
nums = []
for proc in range(nprocs):
if proc < res:
nums.append(aver + 1)
else:
nums.append(aver)
count = 0
slices = []
for proc in range(nprocs):
slices.append(data[count: count+numsproc])
count += numsproc
return slices
nprocs = mp.cpu_count()
pool = mp.Pool(processes=nprocs)
sub_lists = slice_data(range(20), nprocs)
multi_result = pool.apply_async(power_list_n, (d, 2)) for d in sub_lists
result = x for p in multi_result for x in p.get()
print(result)
実際の結果は、get()メソッドとネストされたリスト内包表記を使用して取得できます。
apply_async()メソッドは、出力の順序について何も保証しないことに注意してください。 この例では、apply_async()がリスト内包表記とともに使用されたため、結果が順序付けられていたわけです。
サンプル1
次のコードは、”スレッドベースの並列処理" で例示したリストに乱数を順番に追加するものです。
code: 50_demo_single.pyy
import random
def list_append(count, id, out_list):
for i in range(count):
out_list.append(random.random())
if __name__ == "__main__":
size = 100_000_000
out_list = list()
list_append(size, 0, out_list)
これをスレッドを使った並列処理は次のコードです。
code: 51_demo_thread.py
import threading
import random
def list_append(count, id, out_list):
for i in range(count):
out_list.append(random.random())
if __name__ == "__main__":
import threading
size = 100_000_000 # 追加する乱数の数
threads = 2 # 生成するスレッド数
# ジョブのリストを作成してから、
# 各スレッドをジョブリストに追加するスレッド数繰り返す
jobs = []
proc_size = int( size / threads )
for i in range(0, threads):
out_list = list()
thread = threading.Thread(target=list_append(proc_size, i, out_list))
jobs.append(thread)
# スレッドを開始
for j in jobs:
j.start()
# すべてのスレッドの終了を待つ
for j in jobs:
j.join()
これを multiprocessingモジュールで実装してみましょう。
code: 52_demo_multiprocs.py
import multiprocessing as mp
import random
def list_append(count, id, out_list):
for i in range(count):
out_list.append(random.random())
if __name__ == "__main__":
import threading
size = 100_000_000 # 追加する乱数の数
threads = 2 # 生成するスレッド数
# ジョブのリストを作成してから、
# 各スレッドをジョブリストに追加するスレッド数繰り返す
jobs = []
proc_size = int( size / threads )
for i in range(0, threads):
out_list = list()
job = mp.Process(target=list_append(proc_size, i, out_list))
jobs.append(job)
# スレッドを開始
for j in jobs:
j.start()
# すべてのスレッドの終了を待つ
for j in jobs:
j.join()
threadingで実装したプログラムをmultiprocessingで実装するために必要な変更箇所は、importの行とmultiprocessing.Process行のだけです。
code: threading
import threading
# ...
job = threading.Thread(target=list_append(proc_size, i, out_list))
code: multiprocessing
import multiprocessing as mp
# ...
job = mp.Process(target=list_append(proc_size, i, out_list))
ターゲット関数への引数は個別に渡されます。 それ以外は、コードはThreading実装とほぼ同じです。
code: ipython
In 3: %time %run 50_demo_single.py
CPU times: user 21.7 s, sys: 1.86 s, total: 23.6 s
Wall time: 24 s
In 4: %time %run 51_demo_thread.py
CPU times: user 21.3 s, sys: 2.1 s, total: 23.4 s
Wall time: 23.9 s
In 5: %time %run 52_demo_multiprocs.py
CPU times: user 6.43 ms, sys: 79.7 ms, total: 86.1 ms
Wall time: 16.5 s
この例の場合では単純に、リストを乱数を追加しただけで他に何も処理していませんが、
マルチプロセス処理を、より複雑なプログラムに適用する場合、少し注意する必要があります。例えば、ある配列のデータを並列処理する場合、実際の処理の前後、あるいは処理中に、データ転送が発生する可能性があります。こうした場合では、単純にプロセス数を増やせば速くなるということにはなりません。
code: 53_data_dependency.py
import multiprocessing as mp
def f(d, l):
d1 = '1'
d'2' = 2
d0.25 = None
l.reverse()
if __name__ == '__main__':
manager = mp.Manager()
d = manager.dict()
l = manager.list(range(10))
p = mp.Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
サンプル2: I/Oバウンドの例
"スレッドベースの並列処理"で例示した株価をダウンロードする60_download_stock_sirial.py
をマルチプロセスで処理するようにしてみましょう。
code: 60_download_stock_sirial.py
from stock_downloader import stock_download
from custom_logging import logging
logging.getLogger().setLevel(logging.INFO)
stock_symbols = 'AAPL', 'NVDA', 'MSFT', 'JNJ'
for symbol in stock_symbols:
stock_download(symbol)
threading での実装は次のコードのようになります。
code: 61_download_stok_thread.py
from stock_downloader import stock_download
from queue import Queue
from threading import Thread
from custom_logging import logging
logging.getLogger().setLevel(logging.INFO)
stock_symbols = 'AAPL', 'NVDA', 'MSFT', 'JNJ'
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
symbol = self.queue.get()
try:
stock_download(symbol)
finally:
self.queue.task_done()
# ワーカースレッドと通信するためのキューを作成
queue = Queue()
# ワーカースレッドを作成 4thread
for x in range(4):
worker = DownloadWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking
# デーモンモードではワーカーがブロックしている場合でも
# メインスレッドが終了する
worker.daemon = True
worker.start()
# ダウンロードする株価コードをキューに入れる
for symbol in stock_symbols:
queue.put(symbol)
# キューがすべてのタスクの処理を終了するのをメインスレッドが待機
queue.join()
multiprocessing の場合は、Poolクラスのおかげで、ずっとスッキリ記述することができます。
code: 62_download_stock_multiprocs.py
from multiprocessing.pool import Pool
from custom_logging import logging
from stock_downloader import stock_download
logging.getLogger().setLevel(logging.INFO)
stock_symbols = 'AAPL', 'NVDA', 'MSFT', 'JNJ'
def main():
with Pool(4) as p:
p.map(stock_download, stock_symbols)
if __name__ == '__main__':
main()
code: python
In 3: %time %run 62_download_stock_multiprocs.py
CPU times: user 16 ms, sys: 18.8 ms, total: 34.7 ms
Wall time: 1.89 s
ただし、このコードは複数のCPUをもつ1つのホストで有効です。
より規模を多くしたいような場合ではマルチノードで動作させたくなります。
この場合は、プロセス間通信での通信速度とレイテンシーが重要になります。
サンプル3: CPUバウンドの例
円周率($ \pi)の近似値を計算してみましょう。
まず、$ \pi を求める漸化式は次のものです。
$ \pi = \int ^{1}_{0}\dfrac{4}{1+x^{2}}dx
補足説明:Gregory–Leibniz の方法
J. Gregory (1638 - 1675, 英),G.W.F. Leibniz (1646 - 1716, 独)
シリアル・バージョン
上記の式を使用すると、多数の点にわたる数値積分を介して$ \pi の値を計算できます。 たとえば、1,000万ポイントを使用することを選択できます。 シリアルコードを以下に示します。
code:70_pi_sirial.py
def calc_pi(nsteps):
pi = 0.0
dx = 1.0 / nsteps
for i in range(nsteps+1):
x = (i + 0.5) * dx
pi += 4.0 / (1.0 + x * x)
return pi * dx
nsteps = 10000000
pi = calc_pi(nsteps)
print(pi)
マルチプロセス・バージョン
このシリアル・バージョンのコードを並列化するためには、forループをサブタスクに分割し、それらをワーカープロセスに配布する必要があります。 つまり、1,000万ポイントをタスクへ均等に分散する必要があります。 これは、組み込みのrange()関数で簡単に処理できます。 range()関数の最初の整数はシーケンスの開始位置で、プロセスのインデックスまたはランクとして設定する必要があります。 2番目の整数は、積分点の数、つまりシーケンスの終わりです。 3番目の整数は、シーケンス内の隣接する要素間のステップであり、二重カウントを回避するためにプロセスの数として設定されます。
次のcalc_partial_pi()関数は、ワーカープロセスのサブタスクにrange()を使用します。
code: python
def calc_partial_pi(rank, nprocs, nsteps, dx):
partial_pi = 0.0
for i in range(rank, nsteps, nprocs):
x = (i + 0.5) * dx
partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx
return partial_pi
サブタスクの入力引数を準備し、Poolクラスのstarmap()メソッドに、
calc_partial_pi()関数と入力引数を与えると$ \piの近似値を計算できます。
code: 71_pi_multiprocs.py
import multiprocessing as mp
def calc_partial_pi(rank, nprocs, nsteps, dx):
partial_pi = 0.0
for i in range(rank, nsteps, nprocs):
x = (i + 0.5) * dx
partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx
return partial_pi
nsteps = 10000000
dx = 1.0 / nsteps
nprocs = mp.cpu_count()
inputs = (rank, nprocs, nsteps, dx) for rank in range(nprocs)
pool = mp.Pool(processes=nprocs)
result = pool.starmap(calc_partial_pi, inputs)
pi = sum(result)
print(pi)
非同期並列計算は、Poolクラスのapply_async()メソッドを使用して実行できます。 starmap()とapply_async()の両方が複数の引数をサポートしているため、calc_partial_pi()関数と入力リストを利用できます。 2つのメソッドの違いは、starmap()はすべてのプロセスからの結果を返すのに対し、apply_async()は単一のプロセスからの結果を返すことです。
次のコードは、apply_async()メソッドを使用したバージョンです。
code: 72_pi_multiprocs_async.py
import multiprocessing as mp
def calc_partial_pi(rank, nprocs, nsteps, dx):
partial_pi = 0.0
for i in range(rank, nsteps, nprocs):
x = (i + 0.5) * dx
partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx
return partial_pi
nsteps = 10000000
dx = 1.0 / nsteps
nprocs = mp.cpu_count()
inputs = (rank, nprocs, nsteps, dx) for rank in range(nprocs)
pool = mp.Pool(processes=nprocs)
multi_result = pool.apply_async(calc_partial_pi, inp) for inp in inputs
result = p.get() for p in multi_result
pi = sum(result)
print(pi)
code: ipython
In 1: %time %run 70_pi_sirial.py
3.141592853589721
CPU times: user 2.34 s, sys: 4.61 ms, total: 2.34 s
Wall time: 2.34 s
In 2: %time %run 71_pi_multiprocs.py
3.141592653589731
CPU times: user 10.5 ms, sys: 11.5 ms, total: 22.1 ms
Wall time: 899 ms
In 3: %time %run 72_pi_multiprocs_async.py
3.141592653589731
CPU times: user 5.8 ms, sys: 11 ms, total: 16.8 ms
Wall time: 897 ms
計算結果について
70_pi_sirial.py と 71_pi_multiprocs.py で求まる近似値は厳密には同じではありません。
小数点以下12桁目までは同じ結果ですが、13桁目以降で違ってしまいます。
これはなぜでしょうか?
理由は「丸め誤差」という浮動小数点特有の悩ましい問題があるためです。
浮動小数点数つまり有限桁数の2進数によって計算を行うと,どうしても誤差が生じてしまいます。これを丸め誤差(Round-Off Error/Rounding Error) と言います。
これには、値が近い2つの数を減算することで発生する「桁落ち」と、絶対値が大きい2つの数を加算することで発生する「情報落ち」の2つの原因があります。
例えば、浮動小数点を持つ配列を総和するとき、インデックスの昇順/降順では、計算結果が違うことがあります。
サンプル4: MapReduceの実装
Poolクラスを使用して、単純な単一サーバーのMapReduce実装を作成できます。 分散処理のすべての利点を提供するわけではありませんが、いくつかの問題を分散可能なタスク単位に分解することがいかに簡単であるかを示しています。
MapReduceベースのシステムでは、入力データはさまざまなWorkerインスタンスで処理するためにチャンクに分割されます。 入力データの各チャンクは、単純な変換を使用して中間状態にマップされます。 次に、中間データが一緒に収集され、キー値に基づいてパーティション化されるため、関連するすべての値が一緒になります。 最後に、パーティション化されたデータは結果セットに縮小されます。
code:simple_mapreduce.py
import multiprocessing as mp
class SimpleMapReduce:
def __init__(self, map_func, reduce_func, num_workers=None):
"""
map_func
入力を中間データにマップする関数
引数として1つの入力値を取り、タプルを返す
引数1つの入力値を持ち、タプルを返します。
キーと縮約(Reduce)する値。
reduce_func
パーティション化された中間データを最終出力に縮約する関数
map_func によって生成されたキーと、
そのキーに関連付けられた値のシーケンスを引数として受け取る
num_workers
プールで作成するワーカーの数
デフォルトは、現在のホストで使用可能なCPUの数
"""
self.map_func = map_func
self.reduce_func = reduce_func
self.pool = mp.Pool(num_workers)
def partition(self, mapped_values):
""" マップされた値をキーで整理する
キーと値のシーケンスを持つタプルの
ソートされていないシーケンスを返す
"""
partitioned_data = collections.defaultdict(list)
for key, value in mapped_values:
partitioned_datakey.append(value)
return partitioned_data.items()
def __call__(self, inputs, chunksize=1):
"""マップを介して入力を処理し、指定された関数を縮約する
inputs
処理される入力データを含む反復可能オブジェクト
chunksize=1
各ワーカーに渡す入力データの部分
マッピングフェーズ中にパフォーマンスを調整するために使用する
"""
map_responses = self.pool.map(
self.map_func,
inputs,
chunksize=chunksize,
)
partitioned_data = self.partition(
itertools.chain(*map_responses)
)
reduced_values = self.pool.map(
self.reduce_func,
partitioned_data,
)
return reduced_values
次のサンプルスクリプトは、SimpleMapReduceを使用して、カレントディレクトリにあるのPytonスクリプトの単語をカウントします。Pythonの予約後の一部は無視します。
code: 80_wordcount.py
import string
import multiprocessing as mp
from simple_mapreduce import SimpleMapReduce
def file_to_words(filename):
"""ファイルを読み取り (word, occurences) のリストを返す
"""
STOP_WORDS = set([
'import', 'if', 'for', 'print', 'try', 'except',
'def', 'self',
])
TR = str.maketrans({
p: ' '
for p in string.punctuation
})
print(f'{mp.current_process().name} reading {filename}')
output = []
with open(filename, 'rt') as f:
for line in f:
# Skip comment lines.
if line.lstrip().startswith('..'):
continue
line = line.translate(TR) # 句読点を削除する
for word in line.split():
word = word.lower()
if word.isalpha() and word not in STOP_WORDS:
output.append((word, 1))
return output
def count_words(item):
"""単語の分割データを単語と出現回数を含むタプルに変換
"""
word, occurences = item
return (word, sum(occurences))
if __name__ == '__main__':
import operator
import glob
input_files = glob.glob('*.py')
mapper = SimpleMapReduce(file_to_words, count_words)
word_counts = mapper(input_files)
word_counts.sort(key=operator.itemgetter(1))
word_counts.reverse()
print('\nTOP 20 WORDS BY FREQUENCY\n')
top20 = word_counts:20
longest = max(len(word) for word, count in top20)
for word, count in top20:
print('{word:<{len}}: {count:5}'.format(
len=longest + 1,
word=word,
count=count)
)
file_to_words()関数は、各入力ファイルを、単語と数字の1(1回の出現を表す)を含むタプルのシーケンスに変換します。 データは、単語をキーとして使用してpartition()によって分割されるため、結果の構造は、キーと、単語の各出現を表す1つの値のシーケンスで構成されます。 分割されたデータは、削減フェーズ中にcount_words()によって、単語とその単語のカウントを含むタプルのセットに変換されます。
code: bash
$ python3 -u 80_wordcount.py
ForkPoolWorker-2 reading process_condition.py
ForkPoolWorker-1 reading worker.py
ForkPoolWorker-4 reading process_terminate.py
(中略)
ForkPoolWorker-2 reading process_mapreduce.py
ForkPoolWorker-4 reading process_import_target.py
TOP 20 WORDS BY FREQUENCY
process : 83
running : 45
multiprocessing : 44
worker : 40
starting : 37
now : 35
after : 34
processes : 31
start : 29
header : 27
pymotw : 27
caption : 27
end : 27
daemon : 22
can : 22
exiting : 21
forkpoolworker : 21
consumer : 20
main : 18
event : 16
python に与えている -uオプションは、stdout および stderr ストリームをブロックバッファリングをさせないためのものです。このオプションはstdinには影響しません。このオプションは環境変数 PYTHONUNBUFFERED=1 としても同じ挙動になります。1に限らず、空でない文字列を設定すればOKです。
マルチノード
multiprocessingモジュールを使ってマルチノードで計算を行うことはできます。しかし、別のノードでのプロセスの起動と、それらとのプロセス間通信についてプログラマが管理する必要があるため現実的ではありません。
マルチノードを使って並列処理を行いたい場合は、この後で説明する mpi4py などの別のモジュールを利用するか、"Pythonセミナー メッセージブローカー編"で説明しているメッセージブローカーを検討してみてください。
multiprocessing モジュールの制約
multiprocessingモジュールは、IPythonで使用する場合、次のような大きな制約があります。
注釈 このパッケージに含まれる機能を使用するためには、子プロセスから __main__ モジュールをインポートできる必要があります。このことについては プログラミングガイドライン で触れていますが、ここであらためて強調しておきます。なぜかというと、いくつかのサンプルコード、例えば multiprocessing.pool.Pool のサンプルはインタラクティブシェル上では動作しないからです。Python 公式ドキュメント multiprocessing
幸いなことに、multiprocess という multiprocessing モジュールのフォークがあり、これはシリアライズに pickle の代わりに dill を使い、この問題を都合よく克服しています。
multiprocess をインストールし、import で multiprocessing を multiprocess に置き換えることでこの問題を回避できます。
これも良い具合に、multiprocessing は as mp とエイリアスでインポートされることが多く、簡単に対処することができます。
code: python
import multiprocess as mp
def f(x):
return x*x
with mp.Pool(5) as pool:
print(pool.map(f, 1, 2, 3, 4, 5))
まとめ
タスクがIOバウンドの場合、Pythonのマルチプロセッシングとマルチスレッドの両方がうまく機能します。
multiprocessing は、threading よりも簡単に実装することができます。これは、threadingモジュールではThreadクラスをサブクラス化し、スレッドが作業を監視するためのキューを作成する必要があるからです。multiprocessingでは、オリジナルのプロセスは、GILをバイパスして複数の子プロセスにフォークされます。 各子プロセスには、プログラム全体のメモリのコピーがあります。そのため、multiprocessingでは、メモリのオーバーヘッドが高くなることに注意が必要です。 コードがCPUにバインドされている場合、特にターゲットマシンに複数のコアまたはCPUがある場合は、multiprocessing の方が適している可能性があります。
実際のところ、新規開発のプロジェクトでは multiprocessing モジュールを使うことはあまりないかもしれません。理由は、次のようなものがあります。
Python 3.2 で導入された concurrent.futures モジュールの方が便利で、簡単に記述できる
プロセス間通信ではデータをPickle化して送るためパフォーマンスで問題になりやすい
マルチノードでの並列実行が簡単ではない
conncurrent.futures については、"concurrent.futures を使ってみよう" を参照してください
参考
Python 公式ドキュメント - multiprocessing --- プロセスベースの並列処理