mpi4pyを使ってみよう
MPIについて
MPI(Message Passing Interface)は、さまざまな並列コンピューターで機能するように設計された、標準化されたポータブルなメッセージパッシングシステムです。 この標準は、ライブラリルーチンの構文とセマンティクスを定義し、ユーザーがプログラミング言語(Fortran、C、またはC++など)でプログラムを記述できるようにします。
MPI標準のリリース履歴
この標準に準拠する実装は多數あります。
mpi4py について
MPI for Python (mpi4py) は、標準のMPI-2 C ++バインディングに基づいたメッセージパッシングへのオブジェクト指向アプローチを提供します。 このインターフェースは、C++用の標準MPI-2バインディングのMPI構文とセマンティクスをPythonに変換することに重点を置いて設計されました。 標準のC/C++ MPIバインディングを使用していたユーザーは、新しいインターフェイスの学習コストは非常に低く、この mpi4py モジュールを使用できるはずです。逆にいうと、MPIプログラムの手続きや方法論についての理解が乏しいようなPython ユーザには、なによりもまずMPIの知識の理解が重要になります。
インストールと設定
mpi4py は次のようにインストールすることができます。
code: bash
$ pip install mpi4py
pipでインストールする場合、コンパイル環境が必要になる場合があります。
conda でインストールする方が楽です。
code: bash
$ conda install mpi4py
次の形式のホストファイルを用意しておきます。
code: hostfile
localhost slots=4
このホストファイルは4CPUのローカルマシンで実行する場合です。
mpi4py の概要
MPIプログラムの基本は次のものがあります。
MPI_ ではじまるMPI関数やMPI定数を利用する
MPI_Init() と MPI_Finalize() で囲んだ中でMPI関数を呼び出せる
MPIプログラムで生成される各プロセスはランク(Rank)という値が設定される
ランク0はマスタープロセスとなる
C/C++ と mpi4py でのコードの記述の違いを見てましょう。
code: C/C++
int main(int argc, char **argv)
{
int n, myid, numproc, i;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_COMM_rank(MPI_COMM_WORLD, &myid);
printf("Hello World %d\n", myid);
MPI_Finalize();
}
code: mpi4py_hello.py
import sys
from mpi4py import MPI
size = MPI.COMM_WORLD.Get_size() # SPMD環境の規模を取得
rank = MPI.COMM_WORLD.Get_rank() # このプロセスのランク(番号)
name = MPI.Get_processor_name() # このプロセスのプロセス名
sys.stdout.write(
f"Hello, World! I am process {rank} of {size} on {name}\n"
)
mpi4py ではMPI_Init() と MPI_Finalize() を省略できることに注目してください。
mpi4py を理解するには、MPIのいくつかの概念を知る必要があります。 MPIを使用した並列プログラミングでは、相互に通信できるプロセスのグループである、いわゆるコミュニケーターが必要です。そのグループのプロセスを識別するために、各プロセスにはコミュニケーター内で一意の値となるランク(rank)が割り当てられます。また、プロセスの総数を知ることも理にかなっています。これは、コミュニケーターのサイズと呼ばれることがよくあります。
mpi4pyのMPI.COMM_WORLDは、Cで記述されたMPIプログラムのMPI_COMM_WORLDに対応していることがわかります。各プロセスのランクはコミュニケーターのget_rank()メソッドをよって取得され、コミュニケーターのサイズはget_size()メソッドによって取得されます。生成されるプロセスの名前はMPI.Get_processor_name()で取得しています。ランクとサイズ、名前を使って、各プロセスから "Hello World"のメッセージを出力できるようになります。
mpi4pyコードを並行して実行する通常の方法は、mpirunコマンドとpython3コマンドを使用することです。たとえば、次のようにコマンドラインで実行します。
code: bash
$ mpirun --host=hostfile -n 4 python3 hello.py
これは、"python3 hello.py" のコマンドを4つのプロセスで実行するということになります。
Pythonオブジェクトを送受信
Pythonオブジェクトを各プロセスで送受信するためには、Commクラスのsend()、recv()、bcast()など、すべて小文字のメソッドを使用します。 送信されるオブジェクトは送信のためのメソッドにパラメーターとして渡され、受信されたオブジェクトは単に戻り値となります。
isend()メソッドとirecv()メソッドはRequestインスタンスを返します。 これらのメソッドの完了は、Requestクラスのtest()メソッドとwait()メソッドを使用して管理できます。
recv()メソッドとirecv()メソッドには、内部メモリの割り当てを回避してメッセージを受信するために繰り返し使用できるバッファオブジェクトを渡すことができます。 このバッファは、送信されたメッセージを収容するのに十分な大きさである必要があります。 したがって、recv()またはirecv()に渡されるバッファーは、少なくとも、Pickle化されたデータがレシーバーに送信されるのと同じ長さである必要があります。
scatter()、gather()、allgather()、alltoall()のような集合的な呼び出しは、ルート要素、すべてのプロセスで単一の値、または一連のComm.sizeアトリビュートを期待しています。 戻り値は、単一の値、Comm.size要素のリスト、またはNoneを返します。
バッファオブジェクトの送受信
この場合、CommクラスのSend()、Recv()、Bcast()、Scatter()、Gather()のように、大文字で始まるメソッド名を使用する必要があります。
一般に、これらの呼び出しへのバッファー引数では、[data、MPI.DOUBLE]や[data、count、MPI.DOUBLE]のような、要素数が2,もしくは3の、リストかタプルで明示的に指定する必要があります。
前者は、データのバイトサイズをdataで、後者は、加えてcountでMPIデータ型を定義しています。
Scatterv()やGatherv()のようなベクトル集合通信操作の場合、バッファーの引数は[data、count、displ、datatype]として指定されます。ここで、countとdisplは整数値のシーケンスです。
NumPy配列とPEP-3118 バッファーの自動MPIデータ型検出がサポートされていますが、ベースとなるMPIの実装がサポートする、基本的なC型(すべてのC / C99ネイティブの符号付き/符号なし整数型および単一/倍精度の実数/複素数浮動小数点型)と一致するデータ型の可用性に限定されています。この場合、バッファを提供するオブジェクトをバッファの引数として直接渡すことができ、countとMPIデータ型は推定されます。 補足説明:C型配列とF型配列
科学技術計算で良く使用されるプログラム言語に、FOTRANがあります。
このFOTRANの配列が実際のメモリ上にデータを配置する方法が、C言語とは異なります。FOTRAN形式(F型)配列、C形式(C型)配列と言われます。
https://gyazo.com/64068d1d10c612f26b30b1c508700241
出典: 東京大学理学部地球惑星物理学科 Fortran演習 (地球惑星物理学演習)
MPIプログラムでの通信パターン
MPIプログラムでの通信パターンにはつぎのものがあります。
send/recv
broadcast
scatter/gather/reduce
Send/Recv通信
Send/Recv通信は、ポイント・ツー・ポイント(Point-to-Point: P2P)通信とも言われ、2端点間で通信を行うことを表しています。
Pythonオブジェクト間のP2P通信のために、mpi4pyはMPIのメソッドと同様のsend()メソッドとrecv()メソッドを提供します。 マスタープロセス(Rank=0)とワーカープロセス(すべてRank> 0)の間でPython辞書オブジェクトを渡す(通常は「通信」と呼ばれます)ためのコードの例を以下に示します。
code: mpi4py_send_rev.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# マスタープロセス
if rank == 0:
data = {'x': 1, 'y': 2.0}
# マスタープロセスは、すべてのワーカープロセスのランクを通して、
# ワーカープロセスにデータを送信する
for i in range(1, size):
comm.send(data, dest=i, tag=i)
print('Process {} sent data:'.format(rank), data)
# ワーカープロセス
else:
# 各ワーカープロセスはマスタープロセスからデータを受け取る
data = comm.recv(source=0, tag=rank)
print(f'Process {rank} received data: {data}')
この例では、辞書オブジェクトのデータがマスタープロセスから各ワーカープロセスに送信されます。 ランクの値をチェックして、プロセスがマスタープロセスであるかどうかを判断しておます。 forループはマスタープロセスに対してのみ実行され、1から開始するiはワーカープロセスのランクを通過します。 また、send()メソッドとrecv()メソッドでtag引数を使用して、2つのプロセス間に複数の通信がある場合にメッセージを区別することもできます。 4つのプロセスで例を実行した場合の出力は次のとおりです。
ノンブロッキング通信
ただし、上記の例ではブロッキング通信方式を使用しているため、通信が完了するまでコードの実行は続行されないことに注意してください。 このブロッキング動作は、並列プログラミングでは必ずしも望ましいとは限りません。 場合によっては、ノンブロッキング通信方式を使用することが有益です。 以下のコードに示すように、isend()とirecv()メソッドは、Requestオブジェクトをすぐに返す非ブロッキングメソッドであり、wait()メソッドを使用して通信の完了を管理できます。 必要に応じて、comm.isend()とreq.wait()の間でいくつかの計算を実行して、通信と計算をオーバーラップさせることで効率を上げることができます。
code:mpi4py_send_recv_nonblocking.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# マスタープロセス
if rank == 0:
data = {'x': 1, 'y': 2.0}
# マスタープロセスは、すべてのワーカープロセスのランクを通して、
# ワーカープロセスにデータを送信する
for i in range(1, size):
req = comm.isend(data, dest=i, tag=i)
req.wait()
print('Process {} sent data:'.format(rank), data)
# ワーカープロセス
else:
# 各ワーカープロセスはマスタープロセスからデータを受け取る
req = comm.recv(source=0, tag=rank)
data = req.wait()
print(f'Process {rank} received data: {data}')
集合通信
bcast(): ブロードキャスト
並列プログラミングでは、集合通信(collective communication)と呼ばれるものを実行すると便利なことがよくあります。bcast()は変数を受け取り、その正確なコピーをコミュニケーター上のすべてのプロセスに送信します。 次図はMPI TUtorial からのもので、bcast()の機能を理解しやすく図示したものです。 https://gyazo.com/27877a2d4ec418bf345e3573cc00c3f9
たとえば、Pythonオブジェクトをマスタープロセスからすべてのワーカープロセスにブロードキャストします。 以下のサンプルコードは、bcast()メソッドを使用してNumpy配列をブロードキャストします。
code: mpi4py_broadcast.py
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
data = np.arange(4.0)
else:
data = None
data = comm.bcast(data, root=0)
if rank == 0:
print(f'Process {rank} broadcast data: {data}')
else:
print(f'Process {rank} received data: {data}')
code: bash
% mpirun --hostfile hostfile -n 4 python mpi4py_broadcasst.py
NumPy array のブロードキャスト
code: mpi4py_numpy_bcast.py
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = np.arange(100, dtype='i')
else:
data = np.empty(100, dtype='i')
comm.Bcast(data, root=0)
for i in range(100):
scatter(): 分散
Scatterは配列を取得し、その連続するセクションをコミュニケーターのランク全体に分散します。
つまり、タスクを分割してサブタスクをプロセスに分散するすることができます。 ただし、プロセッサの数より多くの要素を配布することはできないことに注意してください。
https://gyazo.com/5c5f6805b625bb6f610c9c04704393e3
大きなリストまたは配列がある場合は、scatter()メソッドを呼び出す前にリストまたは配列のスライスを作成する必要があります。 以下のコードは、Numpy配列を分散する例です。これは、scatter()メソッドが呼び出される前に配列スライスのリストに変換されます。
code: mpi4py_scatter.py
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
if rank == 0:
data = np.arange(15.0)
# 各サブタスクのサイズを決定
ave, res = divmod(data.size, nprocs)
# 各サブタスクの開始インデックスと終了インデックスを決定
starts = [sum(counts:p) for p in range(nprocs)] ends = [sum(counts:p+1) for p in range(nprocs)] # データを配列のリストに変換
data = [data[startsp:endsp] for p in range(nprocs)] else:
data = None
data = comm.scatter(data, root=0)
print(f'Process {rank} has data: {data}')
code: bash
$ mpirun --hostfile hostfile -n 4 python mpi4py_scatter.py
合計15個の要素があるため、最初の3つのプロセスのそれぞれが4つの要素を受け取り、最後のプロセスが3つの要素を受け取ったことがわかります。
NumPy array を 分散
code: mpi4py_numpy_scatter.py
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
sendbuf = None
if rank == 0:
sendbuf.T:,: = range(size) recvbuf = np.empty(100, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
assert np.allclose(recvbuf, rank)
gather(): 収集
gather()は、scatter()とは逆になります。 各プロセスに要素がある場合は、gather()を使用して、マスタープロセスの要素のリストにそれらを収集できます。
https://gyazo.com/d782e16cd108f1ad7e14a754a3ab069f
以下のサンプルコードは、スキャッターとギャザーを使用して$ \pi を並列で計算します。(参考: "プロセスベースの並列処理" サンプル3: CPUバウンドの例 pi_multi.py) code: mpi4py_gather.py
from mpi4py import MPI
import time
import math
t0 = time.time()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
nsteps = 10000000
dx = 1.0 / nsteps
if rank == 0:
# 各サブタスクのサイズを決定
ave, res = divmod(nsteps, nprocs)
# 各サブタスクの開始インデックスと終了インデックスを決定
starts = [sum(counts:p) for p in range(nprocs)] ends = [sum(counts:p+1) for p in range(nprocs)] # 開始インデックスと終了インデックスをデータに保存
data = [(startsp, endsp) for p in range(nprocs)] else:
data = None
data = comm.scatter(data, root=0)
# 各プロセスに分配された円周率の計算を実行
partial_pi = 0.0
for i in range(data0, data1): x = (i + 0.5) * dx
partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx
partial_pi = comm.gather(partial_pi, root=0)
if rank == 0:
pi = sum(partial_pi)
walltime = time.time() - t0
print(f'pi computed in {walltime:.3f} sec')
err_pi = abs(pi - math.pi)
print(f'{pi}: error is {err_pi}')
code: bash
$ mpirun --hostfile hostfile -n 4 python mpi4py_gather.py
pi computed in 1.437 sec
3.1415926535896697: error is 1.234568003383174e-13
reduce(): 縮約
MPI reduce 操作は、各プロセスの配列から値を取り込み、マスタープロセスでそれらを単一の結果に削減します。 これは基本的に、各プロセスからマスタープロセスへのやや複雑な送信コマンドを実行し、マスタープロセスにreduce 操作を実行させるようなものです。 MPI reduceは、これらすべてを1つの簡潔なコマンドで実行することができます。
https://gyazo.com/1e915529fd4f2fadaae4e087545f4876
要素をリストに収集するgather()メソッドに加えて、reduce()メソッドを使用して結果を収集することもできます。
code: mpi4py_reduce.py
from mpi4py import MPI
import time
import math
t0 = time.time()
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
nsteps = 10000000
dx = 1.0 / nsteps
if rank == 0:
# 各サブタスクのサイズを決定
ave, res = divmod(nsteps, nprocs)
# 各サブタスクの開始インデックスと終了インデックスを決定
starts = [sum(counts:p) for p in range(nprocs)] ends = [sum(counts:p+1) for p in range(nprocs)] # 開始インデックスと終了インデックスをデータに保存
data = [(startsp, endsp) for p in range(nprocs)] else:
data = None
data = comm.scatter(data, root=0)
# 各プロセスに分配された円周率の計算を実行
partial_pi = 0.0
for i in range(data0, data1): x = (i + 0.5) * dx
partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx
pi = comm.reduce(partial_pi, op=MPI.SUM, root=0)
if rank == 0:
walltime = time.time() - t0
print(f'pi computed in {walltime:.3f} sec')
err_pi = abs(pi - math.pi)
print(f'{pi}: error is {err_pi}')
ここで、op=MPI.SUM を使って、すべてのプロセスからpartial_piの合計を取得しています。
NumPy array を収集
code: mpi4py_numpy_gather.py
rom mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
sendbuf = np.zeros(100, dtype='i') + rank
recvbuf = None
if rank == 0:
comm.Gather(sendbuf, recvbuf, root=0)
if rank == 0:
for i in range(size):
assert np.allclose(recvbufi,:, i) 並列での行列 x 配列
code: mpi4py_numpy_matrix.py
from mpi4py import MPI
import numpy
def matvec(comm, A, x):
m = A.shape0 # local rows p = comm.Get_size()
xg = numpy.zeros(m*p, dtype='d')
y = numpy.dot(A, xg)
return y
MPI-IO
MPIプログラムで複数プロセスからファイルに書き出す場合は、これまでの単純な方法では、I/Oが集中してしまうことでI/O待ちが発生して、せっかく苦労して並列化を行っているのに性能を引き出すことができなくなります。特にHPC計算で性能を出すためには、バックエンドにある並列ファイルシステムの特性について理解しておかなければなりません。
MPI 標準に含まれるファイル I/O のための APIで、これを使用すると、それぞれのプロセスがファイルのどの位置からどの位置まで書き込むかを与えて、分散してI/Oを行うことができます。
現在、主なMPI-IOはROMIO と呼ばれる実装がベースになっています。 HDF5や Parallel netCDF などのアプリケーション向けのI/Oライブラリから、MPI-IOで並列I/Oを行うこともできます。
MPI-IOの基本コンセプト
ファイルハンドル(fh): ファイルへのアクセスに使用されるデータ構造
ファイルポインタ(fp): 読み取りまたは書き込みを行うファイル内の位置
すべてのプロセスで個別にすることも、プロセス間で共有することもできます
ファイルハンドルを介してアクセス
ファイルビュー(fv): 処理するために表示される並列ファイルの一部
ファイルへの効率的な非連続アクセスを可能にします
集団的I/O と独立I/O:ファイルI/Oの際のデータ書き出しパターン
集団的I/O(Collective I/O): MPIはプロセスのリード/ライトを調整します
独立I/O(Independent I/O): MPIはファイルI/Oに何も調整はしない
基本の操作
ファイルのオープン/クローズ/削除
ファイルハンドラは MPI.File.Open()で返されるオブジェクトです。
クローズは
fh = MPI.File.Open(comm, filename, amode=MODE_RDONLY, info=INFO_NULL)
comm: コミュニケータ
filename: オープンするファイル名/ファイルパス
amode: アクセスモード (後述)
info: I/O最適化のためのヒント デフォルトは MPI.INFO_NULL
戻り値: ファイルハンドル
fh.Close()
fh.Delete()
ファイルポインタ
ファイルポインタは、それぞれのMPIプロセスは個別に保持します。
共有ファイルポインタは、それぞれのMPIプロセスで共有されます。
fh.Seek(offset, whence=SEEK_SET)
fh.Seek_share(offset, whence=SEEK_SET)
offset: ポインタを移動するバイトサイズ
whence:
MPI.SEEK_SET:ポインターをオフセットの位置にセット
MPI.SEEK_CUR: ポインターを現在の位置からoffsetだけ移動する
MPI.SEEK_END: ポインターをファイルの末尾からoffsetの位置にセットする
リード
実行後はファイルポインタは移動します。スレッドセーフではありません。
fh.Read(buf, status)
buf: データを格納するバッファ(配列など)
status: MPI.Recv()のステータスと同じ
読み取るデータ量はMPI.Get_count()によって決まります
他にも多数のリード関連メソッドがあります。
Read_all()、Read_all_begin()、Read_all_end()、Read_at()、Read_at_all()、Read_at_all_begin()、
Read_at_all_end()、Read_ordered()、Read_ordered_begin()、Read_ordered_end()、Read_shared()
ライト
実行後はファイルポインタは移動します。スレッドセーフではありません。
fh.Write(buf, status)
buf: データを格納するバッファ(配列など)
status: MPI.Recv()のステータスと同じ
読み取るデータ量はMPI.Get_count()によって決まります
他にも多数のWrite関連メソッドがあります。
Write_all()、Write_all_begin()、Write_all_end()、Write_at()、Write_at_all()、Write_at_all_begin()、
Write_at_all_end()、Write_ordered()、Write_ordered_begin()、Write_ordered_end()、Write_shared()
モード
MPI.MODE_RDONLY: 読込のみ可能
MPI.MODE_RDWR: 読込みと書き込みの両方可能
MPI.MODE_WRONLY: 書込みのみ可能
MPI.MODE_CREATE: ファイルが無い場合、新規作成
MPI.MODE_EXCL: 既にファイルがある場合にエラーを返す
MPI.MODE_DELETE_ON_CLOSE: ファイルを閉じる際に消去
MPI.MODE_UNIQUE_OPEN: 同時にファイルをオープンしない
MPI.MODE_SEQUENTIAL: 逐次的なファイルのオープン
MPI.MODE_APPEND: 全てのファイルポインタをファイル終端にセット
ファイルビュー
デフォルトでは、ファイルはバイトで構成されているものとして扱われ、プロセスはファイル内の任意のバイトにアクセス(リード/ライト)できます。ファイルビューは、ファイルのどの部分をI/O対象にするかを定義します。
fh.Set_view(disp=0, etype=None, filetype=None, datarep=None, info=INFO_NULL)
displacement:ファイルの最初からスキップするバイト数
etype:アクセスされるデータのタイプ、オフセットの単位を定義
filetype:プロセスがI/O対象にするファイルの一部
datarep: データ表現(移植性のために調整可能)
デフォルトはNoneで、メモリと同じ形式で保存
info: I/O最適化のためのヒント デフォルトは MPI.INFO_NULL
サンプル
NumPy array を並列で書き出す
code: mpi4py_numpy_io.py
from mpi4py import MPI
import numpy as np
amode = MPI.MODE_WRONLY|MPI.MODE_CREATE
comm = MPI.COMM_WORLD
fh = MPI.File.Open(comm, "./datafile.contig", amode)
buffer = np.empty(10, dtype=np.int)
buffer: = comm.Get_rank() offset = comm.Get_rank()*buffer.nbytes
fh.Write_at_all(offset, buffer)
fh.Close()
NumPy array とデータを非連続に並列で書き出す
code: mpi4py_numpay_noncontiguous_io.py
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
amode = MPI.MODE_WRONLY|MPI.MODE_CREATE
fh = MPI.File.Open(comm, "./datafile.noncontig", amode)
item_count = 10
buffer = np.empty(item_count, dtype='i')
filetype = MPI.INT.Create_vector(item_count, 1, size)
filetype.Commit()
displacement = MPI.INT.Get_size()*rank
fh.Set_view(displacement, filetype=filetype)
fh.Write_all(buffer)
filetype.Free()
fh.Close()
SWIGとの連携
SWIG は C/C++、FORTRAN、Ruby、Perl など他の言語で記述したコードを相互に取り込むための仕組みを提供しています。よく使われるケースは、数値計算などの求解するときに速度の問題でアルゴリズムをC/C++、FORTRANで実装したり、過去の資産としてC/C++、FORTRANで記述されたライブラリがあり、それを呼び出したいといったものがあります。
今、C言語で記述されたコードがあるとします。
code: helloworld.c
/* file: helloworld.c */
void sayhello(MPI_Comm comm)
{
int size, rank;
MPI_Comm_size(comm, &size);
MPI_Comm_rank(comm, &rank);
printf("Hello, World! "
"I am process %d of %d.\n",
rank, size);
}
SWIGのインタフェースファイルを次のように記述します。
code: helloworld.i
// file: helloworld.i
%module helloworld
%{
}%
%include mpi4py/mpi4py.i
%mpi4py_typemap(Comm, MPI_Comm);
void sayhello(MPI_Comm comm);
次のようにして、Pythonから利用することができます。
code: python
>> from mpi4py import MPI
>> import helloworld
>> helloworld.sayhello(MPI.COMM_WORLD)
Hello, World! I am process 0 of 1.
mpi4py.futures
mpi4py.futures は、プロセス間通信にMPIを使用して、ワーカープロセスのプールで呼び出し可能オブジェクトを非同期的に実行するための高レベルのインターフェイスを提供します。
mpi4py.futuresパッケージは、Python標準ライブラリのconcurrent.futures に基づいています。 より正確には、mpi4py.futuresは、抽象クラスExecutorの具体的な実装としてMPIPoolExecutorクラスを提供します。 submit()インターフェースは、呼び出し可能オブジェクトが非同期で実行されるようにスケジュールし、呼び出し可能オブジェクトの実行を表すFutureオブジェクトを返します。 Futureインスタンスは、呼び出し結果または例外について照会できます。 Futureインスタンスのセットは、wait()関数とas_completed()関数に渡すことができます。 MPIPoolExecutor
MPIPoolExecutorクラスは、MPIプロセスのプールを使用して、呼び出しを非同期で実行します。 別々のプロセスで計算を実行することにより、GILを回避できますが、Pickle化されたオブジェクトのみを実行して返すことができます。 __main__モジュールはワーカープロセスによってインポート可能である必要があるため、MPIPoolExecutorインスタンスはインタラクティブインタープリターで機能しない可能性があります。
MPIPoolExecutorは、MPI-2標準で導入された動的プロセス管理機能を利用します。 特に、MPI.COMM_SELF()のMPI.Intracomm.Spawn()メソッドは、Pythonインタープリターを実行する新しいワーカー(または子)プロセスを生成するためにマスタープロセスで使用されます。 マスタープロセスは、個別のスレッド(MPIPoolExecutorインスタンスごとに1つ)を使用して、ワーカーと通信します。 ワーカープロセスは、完了のシグナルが送信されるまで、メインスレッド(および唯一のスレッド)でタスクの実行を提供します。
ワーカープロセスは、__main__モジュールで定義され、マスタープロセスから送信された呼び出し可能オブジェクトを非Pickle化するために、メインスクリプトをインポートする必要があります。 さらに、呼び出し可能オブジェクトは他のグローバル変数にアクセスする必要がある場合があります。 ワーカープロセスで、mpi4py.futures は__worker__名前空間の下で(runpyモジュールを使用して)メインスクリプトコードを実行し、__main__モジュールを定義します。 __main__モジュールと__worker__モジュールがsys.modules(マスタープロセスとワーカープロセスの両方)に追加され、適切なPickle化/非Pickle化が確保されます。
注意
ワーカーでの最初のインポートフェーズでは、メインスクリプトは新しいMPIPoolExecutorインスタンスを作成して使用することはできません。 そうしないと、各ワーカーが新しいワーカープールを生成しようとし、無限の再帰が発生するためです。 mpi4py.futuresは、新しいワーカーを生成するこのような再帰的な試行を検出し、MPI実行環境を中止します。
メインスクリプトコードは__worker__名前空間で実行されるため、新しいワーカー生成の再帰を回避する最も簡単な方法は、メインスクリプトでif __name__ == '__main__':のイディオムを使用するようにします。
MPICommExecutor
レガシーなMPI-1実装(および一部のベンダーMPI-2実装)は、MPI-2標準で導入された動的プロセス管理機能をサポートしていません。さらに、スーパーコンピューティング機能のジョブスケジューラとバッチシステムは、MPI_Comm_spawn()を使用するアプリケーションで別の問題を引き起こす可能性があります。
これらの問題を念頭に置いて、mpi4py.futuresは、MPI-1呼び出しのみを必要とする、より伝統的なSPMDのような使用パターンをサポートします。 Pythonアプリケーションは、通常の方法で起動されます。たとえば、mpiexecコマンドを使用します。 Pythonコードは、MPICommExecutorコンテキストマネージャーをまとめて呼び出して、MPIコミュニケーター内のMPIプロセスのセットを1つのマスタープロセスと多くのワーカープロセスに分割する必要があります。マスタープロセスは、MPIPoolExecutorインスタンスにアクセスしてタスクを送信します。一方、ワーカープロセスは、マスターから送信されたタスクを実行するために、異なる実行パスとチームアップに従います。
従来のMPI-1または部分的なMPI-2実装における動的プロセス管理機能の欠如を軽減することに加えて、
MPICommExecutorコンテキストマネージャーは、mpi4py.futuresパッケージで利用可能なシンプルなタスクベースのマスター/ワーカーアプローチを利用することをいとわない古典的なMPIベースのPythonアプリケーションで役立つ場合があります。
MPICommExecutorコンテキストマネージャーは、従来のMPI-1または部分的なMPI-2実装における動的プロセス管理機能の欠如を軽減することに加えて、mpi4py.futuresパッケージで利用可能なシンプルなタスクベースのマスター/ワーカーアプローチを利用する旧方式のMPIベースのPythonアプリケーションで役立つ場合があります。
mpiexecコマンド
MPI実装での動的プロセス管理機能のサポートの欠如に関連する問題を想定して、mpi4py.futuresは、Pythonコードがmpi4pyのコマンドラインから実行される代替の使用パターンをサポートします。
もっと簡単に言うと、mpi4py.futures は次の方法で呼び出すことができます。
code: bash
$ mpiexec -n numprocs python -m mpi4py.futures pyfile arg..。 $ mpiexec -n numprocs python -m mpi4py.futures -m mod arg..。 $ mpiexec -n numprocs python -m mpi4py.futures -c cmd arg .. .. $ mpiexec -n numprocs python -m mpi4py.futures- arg..。 メインスクリプトの実行を開始する前に、mpi4py.futuresはMPI.COMM_WORLDを1つのマスター(MPI.COMM_WORLDのランク0のプロセス)と16のワーカーに分割し、MPIインターコミュニケーターを介してそれらを接続します。 その後、マスタープロセスはユーザースクリプトコードの実行を続行し、最終的にはタスクを送信するためのMPIPoolExecutorインスタンスを作成します。 一方、ワーカープロセスは、マスターにサービスを提供するために別の実行パスに従います。 マスターでメインスクリプトが正常に終了すると、MPI実行環境全体が正常に存在するようになります。 メインスクリプトで未処理の例外が発生した場合、マスタープロセスはMPI.COMM_WORLD.Abort(1)を呼び出して、デッドロックを防ぎ、MPI実行環境全体を強制的に終了します。
mpi4py.futures の例
次のmpi4py_julia.pyスクリプトは、ジュリア集合を計算し、PGM形式でイメージをディスクに書き出しします。 コードは、mpi4py.futuresパッケージからMPIPoolExecutorをインポートすることから始まります。 次に、いくつかのグローバル定数と関数は、ジュリア集合の計算を実装します。 __name__ == '__main__':イディオムの場合、計算は標準で保護されます。 画像は、マップメソッドを使用してこれらすべてのタスクを一度に送信するスキャンライン全体によって計算されます。 結果のイテレータは、タスクが完了すると、スキャンラインを順番に生成します。 最後に、各スキャンラインがディスクにダンプされます。
code: mpi4py_julia.py
from mpi4py.futures import MPIPoolExecutor
x0, x1, w = -2.0, +2.0, 640*2
y0, y1, h = -1.5, +1.5, 480*2
dx = (x1 - x0) / w
dy = (y1 - y0) / h
c = complex(0, 0.65)
def julia(x, y):
z = complex(x, y)
n = 255
while abs(z) < 3 and n > 1:
z = z**2 + c
n -= 1
return n
def julia_line(k):
line = bytearray(w)
y = y1 - k * dy
for j in range(w):
x = x0 + j * dx
return line
if __name__ == '__main__':
with MPIPoolExecutor() as executor:
image = executor.map(julia_line, range(h))
with open('julia.pgm', 'wb') as f:
f.write(b'P5 %d %d %d\n' % (w, h, 255))
for line in image:
f.write(line)
mpiexec コマンドを使って、このスクリプトを実行します。
code: bash
$ mpiexec -n 1 -usize 17 python mpi4py_julia.py
mpiexecコマンドは、Pythonインタープリターを実行してメインスクリプトを実行する単一のMPIプロセス(マスター)を起動します。 必要に応じて、mpi4py.futuresは16個の追加のMPIプロセス(子プロセス)を生成して、ワーカーのプールを動的に割り当てます。 マスターは子プロセスにタスクを提出し、結果を待ちます。 子プロセスはタスクを受け取り、それらを実行して、結果をマスターに送り返します。
オプション -usizeを使用したこのmpiexec呼び出しの例は、バックエンドMPI実装がHydra プロセスマネージャーを使用したMPICH派生物であることを前提としています。 オプションの代わりに環境変数MPIEXEC_UNIVERSE_SIZEを設定しても、MPIユニバースサイズを指定できます。 OpenMPIの実装では、OMPI_UNIVERSE_SIZE環境変数を正の整数に設定することでMPIユニバースサイズを指定できます。 目的のMPIユニバースサイズを指定する方法については、実際のMPI実装および/またはバッチシステムのドキュメントを確認してください。
あるいは、ユーザーは、より伝統的な方法でスクリプトを実行することを決定する場合があります。つまり、すべてのMPIプロセスが一度に開始されます。 ユーザースクリプトは、mpi4py.futuresのコマンドライン制御の下で実行され、Python実行可能ファイルに-mオプションを渡します。
code: bash
$ mpiexec -n 17 python -m mpi4py.futures mpi4py_julia.py
前に説明したように、17のプロセスは1つのマスターと16のワーカーに分割されます。 マスタープロセスはメインスクリプトを実行し、ワーカーはマスターから送信されたタスクを実行します。
mpi4py.run
バージョン3.0.0の新機能。
インポート時に、mpi4pyはMPI_Init_thread()を呼び出してMPI実行環境を初期化し、Pythonプロセスが終了する直前にMPI_Finalize()を自動的に呼び出すための終了フックをインストールします。さらに、mpi4pyは、デフォルトのMPI.ERRORS_ARE_FATALエラーハンドラーをオーバーライドして、Python例外でMPIエラーを変換できるMPI.ERRORS_RETURNを優先します。標準のMPI動作から逸脱してはいますが、動的なPythonプログラミング環境では非常に便利です。 mpi4pyを使用するサードパーティのコードは、mpi4pyからMPIをインポートし、面倒な初期化/終了処理なしでMPI呼び出しを実行できます。 MPIエラーは、Python例外に自動的に変換されると、一般的な
try…except…finallyで処理できます。未処理のMPI例外は、ソースコード内の問題の特定に役立つトレースバックを出力します。
残念ながら、自動MPIファイナライズと未処理の例外の相互作用により、デッドロックが発生する可能性があります。バッチ処理で実行すると、これらのデッドロックによってラップトップのバッテリーが消耗したり、スーパーコンピューティング施設で貴重な割り当て時間が消費されたりします。
次のPythonコードのスニペットについて考えてみます。このコードが標準のPythonスクリプトファイルに保存され、2つ以上のプロセスでmpiexecを使用して実行されると想定します。
code: python
from mpi4py import MPI
assert MPI.COMM_WORLD.Get_size() > 1
rank = MPI.COMM_WORLD.Get_rank()
if rank == 0:
1/0
MPI.COMM_WORLD.send(None, dest=1, tag=42)
elif rank == 1:
MPI.COMM_WORLD.recv(source=0, tag=42)
プロセス0は、プロセス1への送信呼び出しを実行する前にZeroDivisionError例外を発生させます。例外が処理されないため、プロセス0で実行されているPythonインタープリターはゼロ以外のステータスで終了します。ただし、mpi4pyが終了前にMPI_Finalize()を呼び出すファイナライザーフックをインストールしたため、プロセス0は、他のプロセスもMPI_Finalize()呼び出しに入るのを待つことをブロックします。一方、プロセス1は、プロセス0からメッセージが到着するのを待つことをブロックするため、MPI_Finalize()に到達することはありません。 MPI実行環境全体が修復不可能な状態でデッドロック状態になっています。
この問題を軽減するために、mpi4pyは、-mフラグの使用して、runpyモジュールで実装された単純な代替コマンドライン実行メカニズムを提供します。この機能を使用するには、Pythonインタープリターを呼び出すコマンドラインで-mmpi4pyを渡してPythonコードを実行する必要があります。未処理の例外の場合、ファイナライザフックはMPI_COMM_WORLDコミュニケータでMPI_Abort()を呼び出すため、MPI実行環境を効果的に中止します。
プロセスが強制的に中止されると、オープン中のファイルなどのリソースはクリーンアップされず、登録されたファイナライザー(atexitモジュール、Python C/API関数Py_AtExit()、またはC標準ライブラリ関数atexit()のいずれかを使用)は実行されません。したがって、実行の中止は、プロセスの終了を保証する非常に強引な方法です。 ただし、MPIには、デッドロック状態から回復するための他のメカニズムはありません。
インターフェイスオプション
コマンドラインでPythonコードを実行するためのオプション-m mpi4pyの使用は、Pythonインタープリターでの使用に似ています。
code: bash
$ mpiexec -n numprocs python -m mpi4py pyfile arg... $ mpiexec -n numprocs python -m mpi4py -m mod arg... $ mpiexec -n numprocs python -m mpi4py -c cmd arg... $ mpiexec -n numprocs python -m mpi4py- arg... <pyfile>: pyfileに含まれているPythonコードを実行します。
これは、Pythonファイル、__main__.py ファイルを含むディレクトリ、または__main__.pyファイルを含むzipファイルのいずれかを参照するファイルシステムパスです。
-m <mod>: <mod>で与えたモジュールをsys.path から検索して、その内容を実行します。
-c <cmd>: <cmd>で与えたPythonコードをコマンドとして実行します。
-: ハイフォン記号(-)は、標準入力(sys.stdin)からコマンドを読み込みます。
まとめ
mpi4py を利用するとMPIを利用した並列計算をPython で簡単に記述、実行できるようになります。
ただし、mpi4py は MPI をラップしているだけなので、実際のプログラミングではMPIの知識が必要になってしまいます。また、実際の動作ではバックエンドのMPI実装の知識も必要になります。
それでも、マルチノードでの並列処理を簡単にできることは特筆するべきだと考えています。
参考
ROMIO: A High-Performance, Portable MPI-IO Implementation