SCOOPを使ってみよう
https://gyazo.com/d1b9ccbb3e36d01f3da66679c523cee9
Python で分散処理を行うためにはいくつかの方法がありますが、ここでは、SCOOPというライブラリモジュールを使った方法について説明します。
SCOOP について
SCOOP(Scalabe COncurrent Operations in Python) はPythonで分散処理を行うためのライブラリモジュールです。 ssh と python があれば、ヘテロなコンピュータ(同じ構成のコンピュータシステムでなくても構わないということ)で実行が可能です。
制約条件としては次のものがあります。
worker が実行されるホストには SSH の鍵認証でアクセスできるようになっていること
Pythonやライブラリモジュールのバージョンやパスが各ノードで同じになっていること
入力ファイルがある場合も各ノードで同じパスに存在していること
分散処理させるオブジェクトはPickle化が可能なこと
SCOOP のインストール
SCOOPはpipでインストールすることができます。
code: bashh
$ pip install scoop
クラスタシステム
HPC領域で利用されるクラスタシステムでは次のような構成になっていることが多いです。
https://gyazo.com/1505b3c79b274259d28b1c0f217a57bf
ヘッドノードにログインしてジョブ投入を行い、実際の計算はそれぞれの計算ノードで実行されるような形態となります。
通常であれば、ヘッドノードにホームディレクトリがあり、計算ノードでは同じファイルパスとなるように構成されています。ホームディレクトリはヘッドノードではなく別に用意されているファイルサーバに配置されている構成もあります。
また、ヘッドノードから各計算ノードにはパスワード認証なしでログインできるようになっています。
このようなシステム構成で、SCOOPがpip install --user scoop のようにホームディレクトリ以下にインストールされていれば、SCOOPは何も問題なく簡単に利用することができます。
ヘテロな構成での注意
クラウドなどでユーザ名が異なるようなシステム構成の場合は、そのままではうまく計算させることができません。最低限、同じユーザ名でパスワード認証なしでログインでき、python のインストール環境が同じである必要があります。
この場合、Anaconda Python を使って同じ Conda環境を構築してから、 python と SCOOP をインストールするなど、ひと手間が必要になります。
SCOOP の起動オプション
SCOOP の起動は次は次のように行います。
code: bash
$ python -m scoop --hostfile hosts -vv -n 6 myprog.py arguments -m scoop: SCOOPモジュールの指定
--hostfile: ホストファイル名をオプション引数で与える
-vv: 冗長度(vが多いほど詳細な情報が出力される)
-n :Worker数(プロセス数)の指定
ホストファイルの書式
オプション --hostfile で与えるホストファイルには、ホスト名もしくはIPアドレスとWoker数(プロセス数) を記述します。
hostname_or_ip 4 other_hostname third_hostname 2
ホストリストを与える
オプション --host につづけて Workerを実行するホスト名のリストを与えることができます。
SCOOP の使い方
SCOOP のコンポーネントを説明します。
Future(s): 呼び出し可能な非同期実行オブジェクトを抽象化したクラス
Broker: Futureをディスパッチする
Worker: Futureを実行する
Root: メインプログラムを実行している root Future
これらは次図のような構成となり、SCOOPのロゴはこのアーキテクチャを表現しています。
https://gyazo.com/e183bc3b953646fca662781a25a312a5
Map API
SCOOP では submit() と map() を主に使っていくことになります。
map()関数
Ptyhon の map() は複数のパラメタを1つの関数に適用するものです。
code: python
import random
# Mapを使わない場合
code: python
result = []
for i in data:
result.append(abs(i))
# Mapを使った場合
code: python
result = list(map(abs, data))
SCOOP の map() は入力と同じ順序で結果を返し続けるジェネレータです。
SCOOP でのスクリプトの実行方法
code: bash
$ python -m scoop scriptName.py
code: python
import random
from scoop import futures
if __name__ == '__main__':
# root worker だけがこのブロックを実行する
# 通常のシリアル処理
dataSerial = list(map(abs, data))
# SCOOPでの並列処理
dataParallel = list(futures.map(abs, data))
assert dataSerial == dataParallel
注意:
Worker はそれぞれ独立したプロセスであるため、オブジェクトはWorker間で共有されません。
オブジェクトに加えられた変更は、他のWorkerには伝わらないことに注意してください。
map_as_completed()関数
map() とほとんど同じですが、map_as_completed() はyield されて実行可能になったときに結果が返されます。
submit() 関数
submit() はFutureのインスタンスを返します。
MapReduce API
MapReduceは、クラスターシステムでの巨大なデータセットに対する分散コンピューティングを支援する目的で、Googleによって2004年に導入されたプログラミングモデルです。
https://gyazo.com/86227a59a8bc9f1883445a73e1de2a39
Map ステップ
マスターノードは、入力データを受け取り、それをより細かい単位に分割し、複数のワーカーノードに配置する。
受け取ったワーカーノードが、更に細かい単位に分割し、他の複数のワーカーノードに配置するという、より深い階層構造の分割を行うこともある。
そして、各ワーカーノードは、その細かい単位のデータを処理し、処理結果を、マスターノードへと返す。
Reduce ステップ
続いて、マスターノードが、Mapステップでの処理結果を集約し、目的としていた問題に対する答え(結果)を何らかの方法によって出力する。
mapreduce() 関数
Python の標準ライブラリ functool に reduce()があります。
map()の結果を累積的に適用します。 例えば次のコードは、"abcd" を返します。
code: pyton
import functools
code: pytohn
mport random
import operator
from scoop import futures
if __name__ == '__main__':
# root worker だけがこのブロックを実行する
# Python 標準のシリアル処理
serialSum = sum(map(abs, data))
# SCOOP での分散処理
parallelSum = futures.mapReduce(abs, operator.add, data)
assert serialSum == parallelSum
mapReduce()では、演算子だけでなく任意のreduce関数を渡すことができます。
ユーティリティー
shared モジュール
Worker間でオブジェクトを共有するための sharedモジュールがあり、setConst() と getConst() を使ってオブジェクトをやり取りします。
code: python
from scoop import futures, shared
def myParallelFunc(inValue):
myValue = shared.getConst('myValue')
return inValue + myValue
if __name__ == '__main__':
shared.setConst(myValue=5)
print(list(futures.map(myParallelFunc, range(10))))
注意
定数は、Worker全体のPoolで一度しか定義できません
loggerモジュール
scoop.logger を使用すると、時刻、メッセージを発行したWorker名、メッセージが発行されたモジュールなど、ログメッセージとともに有用な情報を出力することができます。
code: python
import scoop
scoop.logger.warn("This is a warning!")
SCOOPでは picklable でなければ動作しない
SCOOP はWorkerとの通信には pickle 化された情報を送信しているため、オブジェクトは picklable である必要があります。
プログラムの最上位レベルで宣言されている関数またはクラスのみがpicklableのオブジェクトです。 これはPythonのpickleモジュールの制限です。
次に、動作しいないmap()の呼び出しの例を示します。
code: python
from scoop import futures
class myClass(object):
@staticmethod
def myFunction(x):
return x
if __name__ == '__main__':
def mySecondFunction(x):
return x
# picklable でないので、どちらも動作しません。
モンテカルロ法による円周率の近似値計算
SCOOP の例題として、モンテカルロ法を使って円周率の近似値を求めてみます。
モンテカルロ法というのは、乱数を用いて何らかの値を推定するシミュレーションの方法です。
このモンテカルロ法を使って円周率の近似値を求めるアルゴリズムは次のようになります。
1. $ 1 \times 1 の正方形内にランダムに点を打つ(一様分布にしたがった乱数を使う)
2. 原点から距離が 1 以下なら 1 ポイント,1 より大きいなら 0 ポイント追加
3. これらの操作を N 回繰り返して総獲得ポイントを P とする
4. $ \dfrac {4P}{N}を円周率$ \piの近似値として得る
https://gyazo.com/06c82b60d21abf3afcffd2a773d55e30
code: python
from random import random
from math import sqrt
def pi_estimate(total_random_points=1000):
inside_circle = 0 # 内側のポイント数
# すべてのポイント数だけ繰り返す
for i in range(0, total_random_points):
# (0,0) 〜 (1,1) のランダムな座標を生成
x_squared, y_squared = random()**2, random()**2
# ポイントが内側のあれば inside_circle を増加させる
if sqrt(x_squared + y_squared) < 1.0:
inside_circle += 1
# inside / total_random_points = pi / 4
pi = (float(inside_circle) / total_random_points) * 4
return (pi)
code: python
print(pi_estimate(total))
3.052
3.128
3.1352
3.1572
試行回数が多くなれば、$ \pi に近似していくのがわかります。
numpy の rand() を使って書き直し、合わせてnumpyに定義されているπの定数値(numpy.pi) との誤差を計算させものが次になります。
実際には、統計的な手法でどの程度確からしいのかという評価が必要になります。
code: python
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import time as time
total_random_points = int(input("\nトータルのポイントの数を入力してください?\n>"))
# 計算開始時間を保存
start_time = time.time()
# 内側のポイント数をリセット
inside_circle = 0
# 座標 (x,y)の array を初期化
x_plot_array = np.empty(shape=(1,total_random_points))
y_plot_array = np.empty(shape=(1,total_random_points))
# ランダムな座標を生成して、内側にあるものをカウントする
for i in range(0, total_random_points):
# (0,0) 〜 (1,1) のランダムな座標を生成
x = np.random.rand()
x_plot_array = np.append(x_plot_array, x) y = np.random.rand()
y_plot_array = np.append(y_plot_array, y) # 2乗の値を計算
x_squared = x**2
y_squared = y**2
# その座標が内側にあれば inside_circle を増加させる
if np.sqrt(x_squared + y_squared) < 1.0:
inside_circle += 1
# πの推定値を計算
pi_approx = inside_circle / (i+1) * 4
print ("\n--------------")
print (f"\nπの推定値: {pi_approx}")
print (f"πの値との誤差: {pi_approx - np.pi}")
print (f"誤差率: (approx - exact)/exact*100: {(pi_approx - np.pi)/np.pi*100}%")
print (f"計算時間: {time.time() - start_time} seconds\n")
# 1/4 円を表示
random_points_plot = plt.scatter(x_plot_array,
y_plot_array,
color='blue', s=.1)
circle_plot = plt.Circle( ( 0, 0 ), 1,
color='red',
linewidth=2,
fill=False)
ax = plt.gca()
ax.cla()
ax.add_artist(random_points_plot)
ax.add_artist(circle_plot)
plt.show()
トータルのポイントの数を入力してください?
5000
--------------
πの推定値: 3.1432
πの値との誤差: 0.0016073464102071
誤差率: (approx - exact)/exact*100: 0.051163425289094656%
計算時間: 1.7082107067108154 seconds
SCOOPでの実装
code: python
from math import hypot
from random import random
from scoop import futures
from time import time
def test(tries):
return sum(hypot(random(), random()) < 1 for _ in range(tries))
def calcPi(workers, tries):
bt = time()
expr = futures.map(test, tries * workers) piValue = 4. * sum(expr) / float(workers * tries)
totalTime = time() - bt
print("pi = " + str(piValue))
print("total time: " + str(totalTime))
return piValue
if __name__ == "__main__":
dataPi = calcPi(5, 5000)
まとめ
SCOOPでは通信のためにオブジェクトがPickle化できる必要があることから、トップレベルに記述したコードしか分散処理できないという欠点があります。
それでも、SCOOPを使うと簡単に分散処理をすることができるのは有用です。
参考