placによる並列処理
placモジュールについて
plac は、もともとコマンドラインのオプション引数解析のための拡張ライブリです。 plac が提供する機能には、プロセスベース、スレッドベースのインタプリタを使って並列処理を行うことができるというものがあります。
インストール
拡張モジュールなので次のようにインストールします。
code: bash condaの場合
$ conda install plac
code: bash pipの場合
$ pip install plac
オプション解析の例
次のようなデコレートした関数についてコマンドラインオプションや引数の解析処理を行ってくれます。
code: 01_plac_demo.py
from pathlib import Path
import plac
@plac.opt('output_dir', "Optional output directory", type=Path)
@plac.opt('n_iter', "Number of training iterations", type=int)
@plac.flg('debug', "Enable debug mode")
def main(model, output_dir='.', n_iter=100, debug=False):
"""A script for machine learning"""
if __name__ == '__main__':
plac.call(main)
code: bash
$ python 01_plac_demo.py --help
A script for machine learning
positional arguments:
{A,B,C} Model name
optional arguments:
-h, --help show this help message and exit
-o ., --output-dir . Optional output directory
-n 100, --n-iter 100 Number of training iterations
-d, --debug Enable debug mode
引数をリストで渡して関数を連続実行する
Python セミナー実践編の "演習3:正規表現を使って文字列をパースしてみよう" で例示していますが、plac.call() に実行する関数と、その引数をリストとして渡すことで、連続して実行してくれます。これは、プログラムの自己テストなどのときに便利です。 code: python
import re
solutions= [
['EX0', r'^A-Z', 'Reject'], ]
tests = [
]
def exercise(testcase, text, pattern, flag='Accept'):
result={
}
if re.search(pattern, text):
else:
if __name__ == '__main__':
import plac
for id, pattern, flag in solutions:
print(f'--{id}:"{pattern}" -----')
for testcase in tests:
x = plac.call(exercise, cmdargs)
print(f'testcase: {testcase1:2}, result: {x}') del testcase
plac.Interpreter() でスレッドベースやプロセスベースの並列実行
plac はスレッドベースやプロセスベースで関数を実行することができます。
次のプログラムはモンテカルロ法による円周率の近似値を求めるものです。
code: 03_calc_pi.py
from random import random
def calc_pi(N):
inside = 0
for j in range(N):
x, y = random(), random()
if x*x + y*y < 1:
inside += 1
return (4.0 * inside) / N
if __name__ == '__main__':
import sys
import time
start_time = time.time()
pi = calc_pi(10_000_000)
wall_time = time.time() - start_time
print(f'{pi} in %f seconds {wall_time}')
このプログラムを plac.Interpreter()を使って並列実行できるようにしたものです。
与えた試行回数をプラットフォームのCPU数で分割して並列処理させています。
code: python
import math
from random import random
import multiprocessing
import plac
class PiCalculator(object):
@plac.annotations(
npoints=('number of integration points', 'positional', None, int),
mode=('sequential|parallel|threaded', 'option', 'm', str, 'SPT'))
def __init__(self, npoints, mode='S'):
self.npoints = npoints
if mode == 'P':
elif mode == 'T':
elif mode == 'S':
self.n_cpu = multiprocessing.cpu_count()
def submit_tasks(self):
npoints = math.ceil(self.npoints / self.n_cpu)
self.i = plac.Interpreter(self).__enter__()
return [self.i.submit('calc_pi %d' % npoints)
for _ in range(self.n_cpu)]
def close(self):
self.i.close()
@plac.annotations(npoints=('npoints', 'positional', None, int))
def calc_pi(self, npoints):
counts = 0
for j in range(npoints):
n, r = divmod(j, 1_000_000)
if r == 0:
yield '%dM iterations' % n
x, y = random(), random()
if x*x + y*y < 1:
counts += 1
yield (4.0 * counts) / npoints
def run(self):
tasks = self.i.tasks()
for t in tasks:
t.run()
try:
total = 0
for task in tasks:
total += task.result
except: # the task was killed
print(tasks)
return
return total / self.n_cpu
if __name__ == '__main__':
pc = plac.call(PiCalculator)
pc.submit_tasks()
try:
import time
start_time = time.time()
pi = pc.run()
wall_time = time.time() - start_time
print(f'{pi} in {wall_time} seconds')
finally:
pc.close()
code: bash
$ python 04_plac_calc_pi.py --help
usage: plac_pi_parallel.py -h -m S npoints positional arguments:
npoints number of integration points
optional arguments:
-h, --help show this help message and exit
-m S, --mode S sequential|parallel|threaded
code: bash
$ python 04_plac_calc_pi.py -m S 100000000
3.14177604 in 45.14822578430176 seconds
$ python 04_plac_calc_pi.py -m T 100000000
3.14149352 in 40.492534160614014 seconds
$ python 04_plac_calc_pi.py -m P 100000000
3.1415856799999995 in 27.34751796722412 seconds
-mオプションで実行モードを与えます。シーケンシャル(S)、スレッド(T)、プロセス(P)
シミュレーションようなCPU計算処理が主な処理となる問題では、スレッドベースでの並列化による大幅な性能向上は期待できません。しかし、プロセスベースでの並列化では、約1.5倍速くなったことが確認できます。
まとめ
plac を使って関数を簡単に並列実行できることは注目すべきで、覚えていて損はないでしょう。
参考