joblibを使ってみよう
https://gyazo.com/0b29c28bab26d0302b01de863091c93a
joblib について
joblib は、Pythonで軽量のパイプライン処理を提供するためのツールセットです。 次のような機能があります。
簡単でシンプルな並列化処理
実行処理のトラッキングとロギング
効率的に配列バッファ処理がされるため大きなnumpy配列が高速
タスク出力のディスクキャッシュを再遅延評価
zlib圧縮してのオブジェクトのpickle/非pickle化
オブジェクト階層をバイトストリームへ簡単に変換/復元
インストール
joblib は次のようにインストールします。
code: bash
$ pip install joblib
Joblib が提供しているクラスとヘルパー関数
Joblib が提供しているクラスおよびヘルパー関数には次のものがあります。
dump()、load():多くの計算結果を圧縮して永続化して再利用する、
Memory:計算結果を一時保存することで重複した処理を削減する
Parallel:同時に複数の処理を実行する
データの永続化
joblib.dump()およびjoblib.load()は、大きなデータを含む任意のPythonオブジェクト、特に大きなnumpy配列を効率的に処理するためのpickleの代替手段を提供します。この機能が必要になるケースは次のものです。
アウトオブコア計算:すべての配列データをメモリ上に保持できないときにディスクに書き出して、部分体に処理する
チェックポイント&リスタート:与えられている計算時間内で処理しきれないときに、途中結果をディスクに書き出しておく。再開するときはディスクからチェックポイントデータを読み出して途中から継続して計算する。
警告
joblib.dump()とjoblib.load()はPythonのpickleシリアライズモデルに基づいています。つまり、joblib.load()でシリアライズされたオブジェクトをロードするときに、任意のPythonコードが実行できることになります。つまり,joblib.load()でシリアライズされたオブジェクトをロードすると,任意のPythonコードが実行される可能性があります。したがって,joblib.load() は信頼できないソースからオブジェクトをロードするためには決して使用してはいけません。
注意点
Python 3.8以降およびnumpy 1.16以降では、PEP 574で導入されたpickleプロトコル5が、標準ライブラリを使用して大規模なデータバッファの効率的なシリアライズおよびデシリアライズをネイティブにサポートしています。
code: python
pickle.dump(large_object, fileobj, protocol=5)
観覧な使用方法
joblib の dumo()とload()の簡単な使用方法を次に示します。
code: python
n 2: # %load 01_persistent.py ...: import numpy as np
...: import joblib
...:
...: filename = '/tmp/test.joblib'
...:
...: orig_data = [('a', 1, 2, 3), ('b', np.arange(10))] ...:
...: joblib.dump(orig_data, filename)
...:
...: load_data = joblib.load(filename)
...:
...: print(f'Orig: {orig_data}')
...: print(f'Load: {load_data}')
...:
Orig: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] Load: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] ファイルオブジェクトの永続性
ファイル名の代わりに、joblib.dump() と joblib.load() 関数はファイル・オブジェクトを受け入れます。
code: python
In 2: # %load 02_file_obj.py ...: import numpy as np
...: import joblib
...:
...: filename = '/tmp/test.joblib'
...:
...: orig_data = [('a', 1, 2, 3), ('b', np.arange(10))] ...:
...: with open(filename, 'wb') as fo:
...: joblib.dump(orig_data, fo)
...:
...: with open(filename, 'rb') as fo:
...: load_data = joblib.load(fo)
...:
...: print(f'Orig: {orig_data}')
...: print(f'Load: {load_data}')
...:
Orig: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] Load: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] 圧縮されたjoblibのピクル化データ
joblib.dump()の引数にcompress=True を与えると、ディスク上のスペースを節約することができます。
code: python
In 2: # %load 03_compress.py ...: import numpy as np
...: import joblib
...:
...: filename = '/tmp/test.joblib'
...:
...: orig_data = [('a', 1, 2, 3), ('b', np.arange(10))] ...:
...: joblib.dump(orig_data, filename + '.compressed', compress=True)
...:
...: load_data = joblib.load(filename)
...:
...: print(f'Orig: {orig_data}')
...: print(f'Load: {load_data}')
...:
Orig: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] Load: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] ファイル名の拡張子が、サポートされている圧縮方法のいずれかに対応している場合、その圧縮方法が自動的に使用されます。
code: python
In 2: # %load 04_auto_compress.py ...: import numpy as np
...: import joblib
...:
...: filename = '/tmp/test.joblib.z'
...:
...: orig_data = [('a', 1, 2, 3), ('b', np.arange(10))] ...:
...: joblib.dump(orig_data, filename)
...:
...: load_data = joblib.load(filename)
...:
...: print(f'Orig: {orig_data}')
...: print(f'Load: {load_data}')
...:
Orig: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] Load: [('a', 1, 2, 3), ('b', array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))] code: bash
$ file /tmp/test.joblib.z
/tmp/test.joblib.z: zlib compressed data
デフォルトでは、joblib.dump()は、速度とディスクスペースのトレードオフが最も良いとされるzlib圧縮方式を使用します。他にサポートされている圧縮方法は、'gzip'、'bz2'、'lzma'、'xz'です。
また、joblib.dump() 関数の compress 引数には、使用する圧縮方法の名前に対応する文字列を入力します。これを使用すると、デフォルトの圧縮レベルがコンプレッサーで使用されます。
code: python
In 2: # %load 05_supported_compression.py ...: import numpy as np
...: import joblib
...:
...: filename = '/tmp/test.joblib'
...:
...: orig_data = [('a', 1, 2, 3), ('b', np.arange(10))] ...:
...: # 圧縮レベル 3 の gzip
...: joblib.dump(orig_data, filename + '.gz', compress=('gzip', 3))
...: joblib.load(filename + '.gz')
...:
...: # 圧縮レベル 3 の bzip2
...: joblib.dump(orig_data, filename + '.bz2', compress=('bz2', 3))
...: joblib.load(filename + '.bz2')
...:
...: # 圧縮レベルはデフォルトの xz : Python 3.3以降
...: joblib.dump(orig_data, filename + '.xz', compress='xz')
...: joblib.load(filename + '.xz')
...:
...: # lz4 パッケージがインストールされていれば使える
...: try:
...: joblib.dump(orig_data, filename + '.lz4')
...: joblib.load(filename + '.lz4')
...: except ValueError as e:
...: print(e)
...: print('You should install lz4: pip install lz4')
...:
BZIP2、LZMAのほうが圧縮率が高い(より小さいサイズに圧縮できる)けれど、圧縮にかかる時間が長くなります。
追加コンプレッサーの登録
Joblibでは、利用可能なデフォルトのコンプレッサーのリストを拡張するために、joblib.register_compressor() を提供しています。joblib.load() や joblib.Memory のような Joblib 内部の実装や機能に合わせるために、登録されたコンプレッサーは Python file object インターフェースを実装しなければなりません。
Memoryクラス
Memoryクラスは、結果をストアに配置し、デフォルトではディスクを使用し、同じ引数に対して関数を2回再実行しないことにより、関数の遅延評価のコンテキストを定義します。
これは、出力をファイルに明示的に保存することで機能し、ハッシュ化できず、numpy配列などの潜在的に大きな入力および出力データ型で機能するように設計されています。
まず、指定したキャッシュディレクトリを使用するMemoryコンテキストをインスタンス化します。
code: joblib_config.py
from joblib import Memory
cachedir = '/tmp/joblib_sample'
memory = Memory(cachedir, verbose=0)
このとき、指定したディレクトリが存在しない場合は、自動的に作成されます。
また、cachedirとして/dev/shm/joblib_sampleなどのように、Linuxの共有メモリパーティション以下を指示した場合、キャッシュファイルはメモリ上に作成されることに注意してください。
システムが再起動されると/dev/shm以下は削除されます。
共有メモリパーティションはディスクよりも高速にアクセスすることができますが、メモリを消費していることを忘れないようにしてください。
OSXの場合、デフォルトでは/dev/shmのような共有メモリパーティションはありませんが、
次の手順でRAMディスクを作成することができます。
code: macos_ramdisk.sh
# メモリ上にディスク領域を作成する
SIZE_IN_MB=100
DISK_DEV=$( hdiutil attach -nomount ram://$((2 * 1024 * $SIZE_IN_MB)))
# ディスクをフォーマットしてマウントする
diskutil eraseVolume HFS+ RAMDisk $DISK_DEV
code: bash
$ bash macos_ramdisk.sh
この例の場合、RAMディスクは /Voulumes/RAMDisk としてマウントされます。
キャッシュディレクトリの準備が終わった後、@memory.cacheで関数をデコレートすると出力がキャッシュされるようになります。
code: python
...: from joblib_config import memory
...:
...: @memory.cache
...: def func(x):
...: print(f'Running func(x)')
...: return x
...:
...: # func(1)
...: # func(1)
...: # func(2)
...:
Running func(x)
Running func(x)
この関数を同じ引数で2回呼び出しても、2回目は実行されません。出力は、キャッシュディレクトリ内のpickleファイルから再ロードされるだけです。
ただし、別のパラメーターを使用して関数を呼び出すと、関数が実行され出力が再計算されます。
キャッシュディレクトリ
この時のキャッシュディレクトリは次のようになっています。
code: bash
$ tree -L 5 /tmp/joblib_sample/joblib
/tmp/joblib_sample/joblib
└── __main__--home-iisaka-Class
└── Parallel-03_04_Joblib-<ipython-input-96435af30dc6>
└── func
├── d3ffa92536e9b2aeb96c6d0e11ccd857
│ ├── metadata.json
│ └── output.pkl
├── fb65b1dace3932d1e66549411e3310b6
│ ├── metadata.json
│ └── output.pkl
└── func_code.py
5 directories, 5 files
$ cat /tmp/joblib_sample/joblib/__main__--home-iisaka-Class/Parallel-03_04_Joblib-\<ipython-input-96435af30dc6\>/func/d3ffa92536e9b2aeb96c6d0e11ccd857/metadata.json
{"duration": 0.0004916191101074219, "input_args": {"x": "1"}}(class_parallel)
lru_cacheとの比較
Python 3.2 から導入された functools.lru_cache を使っても関数をメモ化することができます。
lru_cacheはデコレートした関数を、与えた直近の呼び出し最大回数まで保存します。時間のかかる関数や I/O処理の多い関数を定期的に同じ引数で呼び出すときに、時間を節約できます。
結果のキャッシュには辞書が使われるので、関数の位置引数およびキーワード引数はハッシュ可能でなくてはなりません。Joblibとは違いメモリ上にだけキャッシュするため永続性はありません。
memorizeとの比較
memorizeデコレータ は、関数呼び出しのすべての入力と出力をメモリにキャッシュします。したがって、非常に小さなオーバーヘッドで、同じ関数を2回実行することを回避できます。ただし、各呼び出しで入力オブジェクトをキャッシュ内のオブジェクトと比較します。その結果、大きなオブジェクトの場合、大きなオーバーヘッドが発生します。さらに、このアプローチは、numpy配列や、重要でない変動の影響を受ける他のオブジェクトでは機能しません。最後に、大きなオブジェクトでmemorizeを使用すると、すべてのメモリが消費されてしまう可能性があります。joblib.Memoryでは、速度とメモリ使用量に最適化されたjoblib.dump()を使用して、オブジェクトがディスクに永続化されます。 つまり、memorize や lru_cache は小さな入力オブジェクトと出力オブジェクトを持つ関数に適切ですが、joblib.Memoryは複雑な入力オブジェクトと出力オブジェクトを持つ関数に適用でき、積極的にディスクへ永続化します。
numpyで使用する
Memoryコンテキストの背後にある元々の動機は、numpy配列にメモ化のようなパターンを持たせることでした。メモリは、入力引数の高速暗号化ハッシュを使用して、入力引数が計算されているかどうかを確認します。
2つの関数を定義します。最初の関数は引数として数値を持ち、配列を出力し、2番目の関数で使用されます。 どちらの関数もMemory.cacheでデコレートされています。
code: python
In 2: # %load 11_with_numpy.py ...: import numpy as np
...: from joblib_config import memory
...:
...: @memory.cache
...: def g(x):
...: print(f'パラメータ:{x}を使用した計算')
...: return np.hamming(x)
...:
...: @memory.cache
...: def h(x):
...: print(f'パラメータ:{x}を使用した計算')
...: return np.vander(x)
...:
...: # a = g(3)
...: # a
...: # g(3)
...: # b1 = h(a)
...: # b2 = h(a)
...: # b2
...: # np.allclose(b1, b2)
...:
パラメータ:3を使用した計算
In 9: np.allclose(b1, b2) 同じg()の呼び出しによって作成された配列を使用して関数h()が呼び出された場合、h()は再実行されません。
メモリマッピングの使用
メモリマッピングは、大きなnumpy配列をリロードするときにキャッシュの検索を高速化します。
code: python
In 2: # %load 12_using_memmap.py ...: import numpy as np
...: from joblib import Memory
...:
...: cachedir = '/tmp/cache_dir'
...: memory = Memory(cachedir, mmap_mode='r')
...: square = memory.cache(np.square)
...:
...: a = np.vander(np.arange(3)).astype(np.float64)
...:
...: a = square(a)
...: b = square(a)
...:
...: print(repr(b))
...:
...: # del a, b # for Windows
...:
________________________________________________________________________________
___________________________________________________________square - 0.0s, 0.0min
________________________________________________________________________________
___________________________________________________________square - 0.0s, 0.0min
注意
上記の例で使用されているデバッグモードに注意してください。
これは、何が再実行されているのか、どこで時間が費やされているのかを
追跡するのに役立ちます。
同じ入力引数を使用してsquare関数が呼び出された場合、その戻り値はメモリマッピングを使用してディスクからロードされます。
Windowsでのファイルロックを回避するには、memmapファイルを閉じる必要があります。 numpy.memmapオブジェクトを閉じるには、ディスクへの変更をフラッシュするdelを使用します。
注意
上記の例のように、使用されたメモリマッピングモード(mmap_mode)が 'r'の場合、
配列は読み取り専用になり、その場で変更することはできません。
一方、'r+'または'w+'を使用すると、配列の変更が有効になりますが、
これらの変更がディスクに伝播されるため、キャッシュが破損します。
メモリ内の配列を変更する場合は、'c'モード(コピーオンライト)を使用するようにします。
遅延参照:キャッシュされた値への参照情報を利用
場合によっては、結果自体を取得する代わりに、キャッシュされた結果への参照情報を取得すると便利な場合があります。 この典型的な例は、多数の大きなnumpy配列を複数のワーカーにディスパッチする必要がある場合です。データ自体をネットワーク経由で送信する代わりに、joblibキャッシュへの参照情報を送信し、ワーカーにネットワークファイルシステムからデータを読み取らせます。 システムレベルのキャッシュも利用する可能性があります。
キャッシュへの参照の取得は、ラップされた関数のcall_and_shelve()メソッドを使用して実行できます。
code: python
In 2: # %load 13_call_and_shelve.py ...: import numpy as np
...: from joblib_config import memory
...:
...: @memory.cache
...: def g(x):
...: print(f'パラメータ:{x}を使用した計算')
...: return np.hamming(x)
...:
...: @memory.cache
...: def h(x):
...: print(f'パラメータ:{x}を使用した計算')
...: return np.vander(x)
...:
...: a = g(4)
...: print(f'a: {a}')
...:
...: b = g.call_and_shelve(4)
...: print(f'repr(b): {repr(b)}')
...: print(f'b: {b.get()}')
...:
...: b.clear()
...: try:
...: print(f'b: {b.get()}')
...: except KeyError as e:
...: print(e)
...:
パラメータ:4を使用した計算
repr(b): MemorizedResult(location="/tmp/joblib_cache/joblib", func="__main__--home-iisaka-Class/Parallel-03_04_Joblib-<ipython-input-480b0cabd13b>/g", args_id="a68ee4c43ce0704fec19bc7fc7993e48")
'Non-existing item (may have been cleared).\nFile /tmp/joblib_cache/joblib/__main__--home-iisaka-Class/Parallel-03_04_Joblib-<ipython-input-480b0cabd13b>/g/a68ee4c43ce0704fec19bc7fc7993e48/output.pkl does not exist'
計算されると、gの出力はディスクに保存され、メモリから削除されます。 次に、関連する値の読み取りをget()メソッドで実行できます。
この特定の値のキャッシュは、clear()メソッドを使用してクリアできます。 その呼び出しにより、保存された値がディスクから消去されます。 その後get()を呼び出すと、KeyError例外が発生します。
MemorizedResultインスタンスには、キャッシュされた値を読み取るために必要なものがすべて含まれています。 送信または保存のためにPickle化することができ、内部表現をprint()で出力して、別のPythonインタープリターにコピー&ペーストすることもできます。
キャッシュが無効になっている場合の遅延参照
キャッシュが無効になっている場合(例:Memory(None))、call_and_shelve()メソッドはNotMemorizedResultインスタンスを返します。このインスタンスは、参照情報の代わりに、完全な関数出力を格納します。 コピー&ペーストの項目を除いて、上記は遅延参照の機能はすべて有効です。
名前の衝突
セッション全体で、関数キャッシュは関数名で識別されます。 したがって、同じ名前を異なる関数に割り当てると、それらのキャッシュは相互にオーバーライドされ、名前の衝突(name collisions) が発生してしまい、不要な再実行が発生します。
code: python
In 2: # %load 14_name_collision.py ...: import numpy as np
...: from joblib_config import memory
...:
...: @memory.cache
...: def func(x):
...: print(f'func({x})を実行')
...:
...: func2 = func
...:
...: @memory.cache
...: def func(x):
...: print(f'違うfunc({x})を実行')
...:
...: print('Calling func(1):')
...: func(1)
...:
...: print('Calling func2(1):')
...: func2(1)
...:
...: print('Calling func(1):')
...: func(1)
...:
...: print('Calling func2(1):')
...: func2(1)
...:
...: # func(1)
...: # %run 05_name_collision.py
...:
Calling func(1):
違うfunc(1)を実行
Calling func2(1):
func(1)を実行
Calling func(1):
Calling func2(1):
同じセッションが使用されている限り衝突は発生しないため、不必要な再計算はありません。
ただし、インタプリタを終了してから再起動すると、joblibは危険なことをしていることを警告します。
キャッシュが正しく識別されず、関数が再実行されます。
code: python
In 4: %run 14_name_collision.py Calling func(1):
/home/iisaka/Class.Parallel/03_04_Joblib/05_name_collision.py:18: JobLibCollisionWarning: Possible name collisions between functions 'func' (/home/iisaka/Class.Parallel/03_04_Joblib/05_name_collision.py:7) and 'func' (/home/iisaka/Class.Parallel/03_04_Joblib/05_name_collision.py:13)
func(1)
違うfunc(1)を実行
Calling func2(1):
/home/iisaka/Class.Parallel/03_04_Joblib/05_name_collision.py:21: JobLibCollisionWarning: Possible name collisions between functions 'func' (/home/iisaka/Class.Parallel/03_04_Joblib/05_name_collision.py:13) and 'func' (/home/iisaka/Class.Parallel/03_04_Joblib/05_name_collision.py:7)
func2(1)
func(1)を実行
Calling func(1):
Calling func2(1):
ラムダ関数
ラムダ関数の出力は区別できないことに注意してください。
code: python
In 2: # %load 15_lambda_function.py ...: from joblib_config import memory
...:
...: def my_print(x):
...: print(x)
...:
...: f = memory.cache(lambda : my_print(1))
...: g = memory.cache(lambda : my_print(2))
...:
...: print('1st Call f()')
...: f()
...: print('2nd Call f()')
...: f()
...: print('1st Call g()')
...: g()
...: print('2nd Call g()')
...: g()
...: print('3rd Call f()')
...: f()
...:
1st Call f()
1
2nd Call f()
1st Call g()
<ipython-input-2-d1c6e51a0e27>:15: JobLibCollisionWarning: Cannot detect name collisions for function '<lambda> (<ipython-input-2-d1c6e51a0e27>:1)'
g()
2
2nd Call g()
3rd Call f()
<ipython-input-2-d1c6e51a0e27>:19: JobLibCollisionWarning: Cannot detect name collisions for function '<lambda> (<ipython-input-2-d1c6e51a0e27>:1)'
f()
1
__call__()メソッドを持つ呼び出し可能なオブジェクトなどの、一部の複雑なオブジェクトではメモリを使用できません。
ただし、numpy.ufuncsでは機能します。
code: python
In 2: # %load 16_numpy_ufuncs.py ...: import numpy as np
...: from joblib_config import memory
...:
...: sin = memory.cache(np.sin)
...: print(sin(0))
...:
0.0
code: bash
$ tree -L 2 /tmp/joblib_cache/joblib/sin/
/tmp/joblib_cache/joblib/sin/
├── b09ed3b4f43f553add9ca907ee381d4a
│ ├── metadata.json
│ └── output.pkl
└── func_code.py
1 directory, 3 files
メソッドのキャッシュ
Memoryは純粋な関数(引数のみを使って計算する関数)へ適用するように設計されているため、メソッドに使用することはお勧めしません。 クラス内でキャッシュを使用する場合、推奨されるパターンは、純粋関数をキャッシュし、クラス内でキャッシュされた関数を使用することです。つまり、次のようになります。
code: python
In 2: # %load 17_caching_methods.py ...: from joblib_config import memory
...: import time
...:
...: @memory.cache
...: def compute_func(x, y, z):
...: print('Sleeping 5sec')
...: time.sleep(5)
...: return x * y * z
...:
...:
...: class Foo(object):
...: def __init__(self, x, y):
...: self.x = x
...: self.y = y
...:
...: def compute(self):
...: return compute_func(self.x, self.y, 40)
...:
...: foo = Foo(2,3)
...:
...: print('1st call foo.compute()')
...: a = foo.compute()
...:
...: print('2nd call foo.compute()')
...: a = foo.compute()
...:
1st call foo.compute()
Sleeping 5sec
2nd call foo.compute()
Memoryをメソッドに使用することは推奨されておらず、ソフトウェアのバージョンが進むにつれてこの警告を忘れがちなため、メンテナンス性が非常に脆弱になる恐れがあります。
クラスがインスタンス化されると、最初の引数(self)がバインドされ、Memoryオブジェクトにアクセスできなくなるため、クラス定義でメソッドをデコレートすることはできません。 そのため次のコードは機能しません。
code: python
class Foo(object):
@memory.cache # 機能しない
def method(self, args):
pass
これを行う正しい方法は、インスタンス化時にデコレートすることです。
code: python
In 2: # %load 18_caching_method_construct.py ...: from joblib_config import memory
...: import time
...:
...: def compute_func(x, y, z):
...: print('Sleeping 5sec')
...: time.sleep(5)
...: return x * y * z
...:
...: class Foo(object):
...: def __init__(self, x, y):
...: self.x = x
...: self.y = y
...: self.z = None
...: self.compute = memory.cache(self.compute)
...:
...: def compute(self):
...: return compute_func(self.x, self.y, 40)
...:
...: foo = Foo(2,3)
...:
...: print('1st call foo.compute()')
...: a = foo.compute()
...:
...: print('2nd call foo.compute()')
...: a = foo.compute()
...:
...: print('change foo.z')
...: foo.z = 3
...:
...: print('3rd call foo.compute()')
...: a = foo.compute()
...:
1st call foo.compute()
Sleeping 5sec
2nd call foo.compute()
change foo.z
3rd call foo.compute()
Sleeping 5sec
キャッシュされたメソッドは、引数の1つとしてselfを持ちます。 つまり、自己が変化した場合、結果が再計算されます。
この例では、self.zが変更された場合、self.compute()を呼び出すと、self.compute()が本体でself.zを使用していなくても、結果が再計算されます。
self.compute()の結果がselfに依存しないことがわかっている場合は、emory.cache()にignore引数で与えることができます。
code: python
In 2: # %load 19_caching_method_self.py ...: from joblib_config import memory
...: import time
...:
...: def compute_func(x, y, z):
...: print('Sleeping 5sec')
...: time.sleep(5)
...: return x * y * z
...:
...: class Foo(object):
...: def __init__(self):
...: self.z = None
...: self.compute = memory.cache(self.compute, ignore='self') ...:
...: def compute(self, x, y):
...: return compute_func(x, y, 40)
...:
...: foo = Foo()
...:
...: print('1st call foo.compute()')
...: a = foo.compute(2, 3)
...:
...: print('2nd call foo.compute()')
...: a = foo.compute(2, 3)
...:
...: print('change foo.z')
...: foo.z = 3
...:
...: print('3rd call foo.compute()')
...: a = foo.compute(2, 3)
...:
...: # a = foo.compute(1, 2)
...:
1st call foo.compute()
Sleeping 5sec
2nd call foo.compute()
change foo.z
3rd call foo.compute()
In 3: a = foo.compute(1,2) Sleeping 5sec
この例の場合、.compute()メソッドは Foo.zプロパティに依存していません。self以外の引数が変わったときに再計算されます。
注意
joblibキャッシュエントリは、実行環境が変わると無効になる場合があります。 joblib.hash によって返される値は、joblibバージョン間で一定であることが保証されていません。 これは、joblib.Memoryキャッシュのすべてのエントリがjoblibのアップグレード時に無効になる可能性があることを意味します。 無効化は、numpyなどサードパーティのライブラリをアップグレードするときにも発生する可能性があります。このような場合、アップグレードされたライブラリで定義された構造(または構造への参照を含む)であるパラメータを持つキャッシュされた関数呼び出しのみが、アップグレード後に無効になる可能性があります。
いくつかの引数を無視する
デバッグフラグなど、特定の引数が変更されたときに関数を再計算しないと便利な場合があります。 メモリはignoreリストを提供します。
code: python
In 2: # %load 20_ignore_args.py ...: from joblib_config import memory
...:
...: def my_func(x, debug=True):
...: print('Called with x = %s' % x)
...:
...: print('call my_func(0)')
...: my_func(0)
...:
...: print('call my_func(0)')
...: my_func(0)
...:
...: print('call my_func(0, debug=False)')
...: my_func(0, debug=False)
...:
...: print('call my_func(0, debug=True)')
...: my_func(0, debug=True)
...:
call my_func(0)
Called with x = 0
call my_func(0)
call my_func(0, debug=False)
call my_func(0, debug=True)
並列ループ
一般的な使用法
Joblibは、multiprocessing を使用して並列forループを作成するための単純なヘルパークラスを提供します。 中心的なアイデアは、ジェネレーター式として実行されるコードを記述し、それを並列コンピューティングに変換することです。
code: python
In 2: # %load 30_loop_as_generator.py ...: from math import sqrt
...:
...:
このコードは joblib を使って次のようにコードすると、2つのCPUに分散することができます。
code: python
In 2: # %load 31_loop_joblib.py ...: from math import sqrt
...: from joblib import Parallel, delayed
...:
...: a = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
...:
Parallel() の使用方法に違和感があるかもしれませんが、次のように考えると理解しやすいでしょう。
まず、何かを処理する関数 function() があり、これは引数として data を受け取り、結果を返すものです。
code: python
def function(data):
# ... 何かの処理
return value
これをjoblib の Parallel()で処理させるためには、次のようにコードします。
code: python
この行全体を見るとわかりにくいはずなので、次のように部分的に考えます。
code: python
Parallel(n_jobs=-1)(...)
Parallelクラスのインスタンスを作成しています。並列で処理する数を n_jobs 引数で与えます。-1で実行したプラットフォームが持つコア数を使うようになります。内部的には multiprocessing.cpu_count()が呼び出されてこの数を求めています。Parallel()には他にも多数の引数があります。
このインスタンスが呼び出し可能であり、(...)の部分はその引数になっているわけです。
これに続く、次のコードはよく見るとリスト内包表記になっていることがわかります。
code: python
data から要素1つ取り出して datum として扱い、function()にわたして処理した結果をリストにしているだけです。この例での function() は引数を1つ受け取るものですが、複数受け取るような関数の場合は次のようになるだけです。
code: python
また、delayed()の機能について理解するために、次のようにdalayed()を除外してみます。
code: python
dataで与えられる各要素に対して functioon()で処理した結果がリストになるわけです。
このリストが Parallel()に渡されることになりますが、このときすでに処理がされてしまっているわけですからParallel()で必要のないな処理を再度行うことになってしまいます。
つまり、delayed()はどの関数をどのような引数で呼び出したいかを、実際に呼び出すことなくPythonに伝えること、つまり名前どおりの遅延処理を行ってくれています。
遅延させたい呼び出しが次のものだとします。
code: python
function(1, a=2)
これを delayed() を使って次のようにコードします。
code: python
delayed(function)(1, a=2)
これは次のタプルを生成します。
code: python
delayed()は単にタプルを生成するだけなので、実際の呼び出しはしません。
このタプルのリストがParalle()に渡されることで並列にうまく処理されるわけです。
前述の 31_loop_joblib.py をもっと判読性を考慮してコードを書きなおすと次のようになります。
code: python
In 2: # %load 31_loop_joblib_pythonic.py ...: from math import sqrt
...: from joblib import Parallel, delayed
...:
...: p = Parallel(n_jobs=2)
...: a = p(tasks)
...:
...: # tasks
...: # a
...:
[(<function math.sqrt(x, /)>, (0,), {}),
(<function math.sqrt(x, /)>, (1,), {}),
(<function math.sqrt(x, /)>, (4,), {}),
(<function math.sqrt(x, /)>, (9,), {}),
(<function math.sqrt(x, /)>, (16,), {}),
(<function math.sqrt(x, /)>, (25,), {}),
(<function math.sqrt(x, /)>, (36,), {}),
(<function math.sqrt(x, /)>, (49,), {}),
(<function math.sqrt(x, /)>, (64,), {}),
(<function math.sqrt(x, /)>, (81,), {})]
こちらの方が分かりやすいですよね。では、なぜ joblib のドキュメントでは一見すると分かり難いように1行で書いているのでしょうか? 実は、これにも意味があります。詳しくは後述しますが joblib ではデフォルトではプロセスベースの並列処理を行います。データを変数におくと、その分のメモリが消費されてしまいます。プロセスベースでの並列化では Python インタプリタを含むメモリ内容がプロセスごとにコピーされるため、データが大量になればメモリコピーのオーバーヘッドも大きくなり無視できなくなってゆくからです。
複数の戻り値
関数に複数の戻り値がある場合の出力の再形成します。
code: python
n 2: # %load 32_loop_with_return_values.py ...: from math import modf
...: from joblib import Parallel, delayed
...:
...:
...: # modf(): 数値の整数部と小数部を同時に取得
...: r = Parallel(n_jobs=1)(delayed(modf)(d) for d in data)
...: res, i = zip(*r)
...:
...: # r
...: # res
...: # i
...:
[(0.0, 0.0),
(0.5, 0.0),
(0.0, 1.0),
(0.5, 1.0),
(0.0, 2.0),
(0.5, 2.0),
(0.0, 3.0),
(0.5, 3.0),
(0.0, 4.0),
(0.5, 4.0)]
Out5: (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5) Out6: (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0) 途中経過を知る
Prallel()に verbose 引数で0~10の値を与えることができます。0ではなにも表示せず、値が大きいほど、メッセージが多くなりります
code: python
In 2: # %load 33_verbose.py ...: from time import sleep
...: from joblib import Parallel, delayed
...:
...: print('verbose=1')
...: r = Parallel(n_jobs=2, verbose=1)(delayed(sleep)(.2) for _ in range(10))
...:
...:
...: print('verbose=10')
...: r = Parallel(n_jobs=2, verbose=10)(delayed(sleep)(.2) for _ in range(10)
...: )
...:
verbose=1
verbose=10
トレースバックの例では、子プロセスでトレースバックが発生した場合でも、エラーの行がどのように示されるか、および例外をトリガーした関数に渡されたパラメーターの値に注意してください。
code: python
In 2: # %load 34_traceback.py ...: from heapq import nlargest
...: from joblib import Parallel, delayed
...:
...: a = Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in data)
...:
---------------------------------------------------------------------------
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 436, in _process_worker
r = call_item()
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 288, in __call__
return self.fn(*self.args, **self.kwargs)
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/site-packages/joblib/_parallel_backends.py", line 595, in __call__
return self.func(*args, **kwargs)
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/site-packages/joblib/parallel.py", line 262, in __call__
return [func(*args, **kwargs)
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/site-packages/joblib/parallel.py", line 262, in <listcomp>
return [func(*args, **kwargs)
File "/home/iisaka/.conda/envs/class_parallel/lib/python3.9/heapq.py", line 545, in nlargest
it = iter(iterable)
TypeError: 'int' object is not iterable
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<ipython-input-2-e28872872511> in <module>
4
----> 6 a = Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in data)
(以下略)
もし、friendly_traceback モジュールがインストールされているのであれば、次のようにトレースバックを整形表示してくれます。
code: bash
$ python -m friendly_traceback 34_traceback.py
The above exception was the direct cause of the following exception:
File "34_traceback.py", line 5, in <module>
a = Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in data)
File "LOCAL:/joblib/parallel.py", line 1056, in __call__
self.retrieve()
File "LOCAL:/joblib/parallel.py", line 935, in retrieve
self._output.extend(job.get(timeout=self.timeout))
File "LOCAL:/joblib/_parallel_backends.py", line 542, in wrap_future_result
return future.result(timeout=timeout)
TypeError: 'int' object is not iterable
A TypeError is usually caused by trying
to combine two incompatible types of objects,
by calling a function with the wrong type of object,
or by trying to do an operation not allowed on a given type of object.
An iterable is an object capable of returning its members one at a time.
Python containers (list, tuple, dict, etc.) are iterables.
An iterable is required here.
Execution stopped on line 5 of file 34_traceback.py.
1: from heapq import nlargest
2: from joblib import Parallel, delayed
-->5: a = Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Parallel: <class joblib.parallel.Parallel>
delayed: <function delayed>
nlargest: <function nlargest>
Exception raised on line 542 of file LOCAL:/joblib/_parallel_backends.py.
537: @staticmethod
538: def wrap_future_result(future, timeout=None):
539: """Wrapper for Future.result to implement the same behaviour as
540: AsyncResults.get from multiprocessing."""
541: try:
-->542: return future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
543: except CfTimeoutError as e:
future: <Future>
timeout: None
future.result: <bound method Future.result> of <Future>
データが動的に生成されるプロデューサー/コンシューマーのプログラム構成では、pre_dispatch引数を使用すると、並列ループが開始される前にプロデューサーが最初に呼び出されてから、次にその場で新しいデータを生成するために呼び出されるようになります。
code: python
In 2: # %load 35_pre_dispatch.py ...: from math import sqrt
...: from joblib import Parallel, delayed
...:
...: def producer():
...: for i in range(6):
...: print('Produced %s' % i)
...: yield i
...:
...: out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
...: delayed(sqrt)(i) for i in producer())
...:
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Produced 5
Parallelオブジェクトは、ワーカーを使用して、多くの異なる引数への関数の適用を並行して計算します。 multiprocessingモジュールまたはconcurrent.futures モジュールのAPIの使用に加えて、それらの主な機能は次のとおりです。
特に引数のリストの作成を回避するため、より読みやすいコード。
より簡単なデバッグ:
クライアント側でエラーが発生した場合でも、有益なトレースバック
n_jobs=1を使用することで、コードを変更せずにデバッグ用の並列計算ができる
Pickle化時のエラーを早期発見できる
オプションのプログレスメーター。
キーボードからCtrl-C押下によるマルチプロセスジョブの中断
ワーカープロセスとの間の通信のための柔軟なPickle化制御。
大規模なnumpyベースのデータ構造のワーカープロセスで共有メモリを効率的に使用する機能。
スレッドベースの並列処理とプロセスベースの並列処理
デフォルトでは、joblib.Parallelは lokyモジュールを使用して、Pythonのワーカープロセスを起動し、別々のCPU上でタスクを同時実行します。これは一般的なPythonプログラムにとっては妥当なデフォルトですが、ワーカープロセスとの通信のために入出力データをキューにシリアライズする必要があるため、かなりのオーバーヘッドが発生します。
呼び出している関数が、その計算のほとんどで Python Global Interpreter Lock (GIL) を解放するコンパイル済みの拡張機能に基づいていることがわかっている場合、Python プロセスの代わりにスレッドを同時実行ワーカーとして使用する方が効率的です。例えば、Cython 関数の with nogil ブロックの中に CPU を集中させる部分を書いている場合がそうです。
コードが効率的にスレッドを使用できることを示唆するには、joblib.Parallelコンストラクタのパラメータにprefer="threads "を渡します。この場合、joblib はデフォルトの loky モジュールの代わりに threading モジュールを使用するようになります。
code: python
In 2: # %load 36_loop_joblib_threading.py ...: from math import sqrt
...: from joblib import Parallel, delayed
...:
...: a = Parallel(n_jobs=2, prefer="threads")(
...: delayed(sqrt)(i ** 2) for i in range(10))
...:
...:
また、コンテキストマネージャーを使って、特定のバックエンドの実装を手動で選択することも可能です。
code: python
In 2: # %load 37_parallel_backend.py ...: from math import sqrt
...: from joblib import Parallel, delayed, parallel_backend
...:
...: with parallel_backend('threading', n_jobs=2):
...: a = Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
...:
parallel_backend()は、joblib.Parallelを内部で使用しているけれど、バックエンドの選択のためのAPIを公開していないようなライブラリを呼び出す場合に特に有効です
組み込みのjoblibバックエンド以外にも、Joblib Apache Spark Backendを使って、Sparkクラスタ上でjoblibタスクを分散させることもできます。
シリアライズとプロセス
複数のpythonプロセスで関数定義を共有するためには、シリアライズプロトコルに依存する必要があります。pythonの標準プロトコルはpickleですが、標準ライブラリ のデフォルト実装にはいくつかの制限があります。例えば、対話的に定義された関数や __main__ モジュールで定義された関数をシリアライズすることはできません。
この制限を避けるために、loky バックエンドはpythonオブジェクトのシリアライズにcloudpickleを利用しています。cloudpickleはpickleプロトコルの代替実装で、より多くのオブジェクト、特に対話的に定義された関数のシリアライズを可能にします。そのため、ほとんどの用途では、lokyのバックエンドはシームレスに動作するはずです。
cloudpickleの主な欠点は、標準ライブラリのpickleモジュールよりも遅くなる可能性があることです。特に、大きなpythonの辞書やリストでは、シリアライズ時間が最大で100倍も遅くなることがあります。この問題を解決するために、joblibのシリアライズ処理を変更する方法が2つあります。
UNIXシステムを使用している場合は、古典的な multiprocessing バックエンドに戻すことができます。このバックエンドでは、対話的に定義された関数を、高速な pickle を使ってワーカープロセスと共有することができます。このソリューションの主な問題点は、標準のPOSIXを破りプロセスの開始にforkを使用すると、numpyやopenblasなどのサードパーティのライブラリとの相互作用がおかしくなることです。
lokyバックエンドを別のシリアライズライブラリで使用したい場合は、LOKY_PICKLER=mod_pickle 環境変数を設定することで、mod_pickle を loky のシリアライズライブラリとして使用することができます。引数として渡されるmod_pickleモジュールは、import mod_pickleとしてインポート可能で、オブジェクトへのシリアライズに使用されるPicklerオブジェクトを含んでいる必要があります。LOKY_PICKLER=pickle とすることで、標準ライブラリの pickleモジュールを使用することができます。LOKY_PICKLER=pickle の主な欠点は、対話的に定義された関数がシリアライズできなくなることです。この問題に対処するには、joblib.wrap_non_picklable_objects() ラッパーを併用するとよいでしょう。のラッパーは、特定のオブジェクトに対してcloudpickleの使用をローカルに有効にするデコレーターとして使用できます。このようにして、全てのpythonオブジェクトを高速にピックルし、インタラクティブな関数に対してはローカルに低速なピックルを有効にすることができます。その例が loky_wrapper にあります。 共有メモリの効果
joblibのデフォルトのバックエンドは、各関数呼び出しを独立したPythonプロセスで実行し、そのためメインプログラムで定義された共通のPythonオブジェクトを変更することができません。
しかし、並列関数が本当にスレッドの共有メモリセマンティクスに依存する必要がある場合は、例えば require='sharedmem' のように明示的に指定する必要があります。
code: python
In 2: # %load 38_shared_memory.py ...: from math import sqrt
...: from joblib import Parallel, delayed
...:
...: shared_set = set()
...:
...: def collect(x):
...: shared_set.add(x)
...:
...: a = Parallel(n_jobs=2, require='sharedmem')(
...: delayed(collect)(i) for i in range(5))
...:
...: b = sorted(shared_set)
...:
共有メモリのセマンティクスに依存することは、Pythonの共有オブジェクトへの同時アクセスがロック競合に悩まされるため、パフォーマンスの観点からはおそらく最適ではないことに留意してください。
ワーカーのプールの再利用
アルゴリズムの中には,中間結果の処理を挟んで並列関数を複数回連続して呼び出す必要があるものがあります.joblib.Parallel をループ内で何度も呼び出すことは最適ではありません。なぜなら、ワーカー(スレッドやプロセス)のプールを何度も生成・破棄することになり、大きなオーバーヘッドが発生する可能性があるからです。
この場合、joblib.Parallel クラスのコンテキストマネージャー を使用して、joblib.Parallel オブジェクトへの複数回の呼び出しに同じワーカーのプールを再利用する方が効率的です。
code: python
In 2: # %load 39_worker_pool.py ...: from math import sqrt
...: from joblib import Parallel, delayed
...:
...: with Parallel(n_jobs=2) as parallel:
...: accumulator = 0.
...: n_iter = 0
...: while accumulator < 1000:
...: results = parallel(delayed(sqrt)(accumulator + i ** 2)
...: for i in range(5))
...: accumulator += sum(results)
...: n_iter += 1
...:
...: a = (accumulator, n_iter)
...:
Out3: (1136.5969161564717, 14) この資料作成時での joblib 1.1.0 では,プロセスベースの並列処理のためにデフォルトで使用されている loky バックエンドは,コンテキストマネージャーを使用しない呼び出しであっても,自動的にワーカーのプールを維持し再利用しようとします。
共有メモリ上の数値データの処理(memmapping
デフォルトでは,プールのワーカーは,n_jobs !=1 のときに Python 標準ライブラリの multiprocessing モジュールを使ってフォークされた本物の Python プロセスです.Parallel 呼び出しの入力として渡された引数はシリアライズされ,各ワーカープロセスのメモリに再配置されます.
これは,ワーカーによってn_jobs に与えた数だけ再割り当てされるため,大きな引数の場合には問題となります.
この問題はnumpyベースのデータ構造を持つ科学技術計算でしばしば発生するため、joblib.Parallelは大きな配列のための特別な処理を提供し、自動的にファイルシステム上にダンプし、numpy.ndarrayのnumpy.memmapサブクラスを使用してそのファイル上のメモリマップとして開くようにワーカーに参照を渡します。これにより、すべてのワーカープロセス間でデータのセグメントを共有することが可能になります。
注意
バックエンドが、loky および multiprocessing プであるとおきは、GILを解放できるコードであれば,prefer='threads' を指定してスレッドベースのバックエンドを使用すると,プロセスベースの並列処理に伴う通信のオーバーヘッドを回避できるため,さらに効率的です。
numpy、scipy、pandas、scikit-learn などのサイエンス領域のPythonライブラリは、重要なコードパスでGILを解放することがよくあります。そのため、常にスレッドベースの並列処理の速度を測定し、スケーラビリティがGILによって制限されない場合に使用することをお勧めします。
配列からメモリマップへの自動変換
配列からメモリマップへの自動変換は、配列のサイズに関する設定可能な閾値がトリガーとなります。
code: python
In 2: # %load 40_auto_memmap.py ...: import numpy as np
...: from joblib import Parallel, delayed
...:
...: def is_memmap(obj):
...: return isinstance(obj, np.memmap)
...:
...: a = Parallel(n_jobs=2, max_nbytes=1e6)(
...: delayed(is_memmap)(np.ones(int(i)))
...:
デフォルトでは、データは/dev/shm 共有メモリ・パーティションが存在し、書き込み可能であれば、そこにダンプされます(通常、Linuxの場合)。それ以外の場合は、オペレーティングシステムの一時フォルダが使用されます。Parallelのコンストラクタに temp_folder 引数を渡すことで、一時データファイルの場所を変更することができます。
max_nbytes=None を渡すと、配列からメモリマップへの自動変換を無効にすることができます。
メモリマップ化された入力データの手動管理
メモリ使用量をさらに細かく調整するために、親プロセスから直接配列をメンマップとしてダンプし、ワーカープロセスを分岐させる前にメモリを解放することも可能です。例えば、親プロセスのメモリに大きな配列を確保するとします。
code: python
large_array = np.ones(int(1e6)
これをダンプ、ロードするための MemMap クラスを用意します。
code: joblib_memmap.py
mport os
import shutil
import tempfile
from joblib import load, dump
class MemMap(object):
def __init__(self, filename='joblib.mmap'):
self.temp_folder = tempfile.mkdtemp()
self.filename = os.path.join(self.temp_folder, filename)
if os.path.exists(self.filename):
os.unlink(self.filename)
def __del__(self):
try:
shutil.rmtree(self.temp_folder)
except OSError:
pass
def dump(self, array):
_ = dump(array, self.filename)
def load(self, mmap_mode='r+'):
return load(self.filename, mmap_mode)
code: python
In 2: # %load 41_large_memmap.py ...: import numpy as np
...: from joblib import Parallel, delayed
...: from joblib_memmap import MemMap
...:
...: def is_memmap(obj):
...: return isinstance(obj, np.memmap)
...:
...: large_array = np.ones(int(1e6))
...:
...: mmap = MemMap()
...: mmap.dump(large_array)
...: large_memmap = mmap.load()
...:
...: assert is_memmap(large_memmap) == True
...: assert large_memmap.__class__.__name__ == 'memmap'
...: assert large_memmap.nbytes == large_array.nbytes
...: assert large_memmap.shape == large_array.shape
...:
...: # 2つのNumPy配列ndarrayを要素ごとに比較する
...: d = np.allclose(large_array, large_memmap)
...: assert d == True
...:
...: # オリジナルの配列を削除
...: del large_array
...: import gc
...: _ = gc.collect()
...:
...: # 部分配列を抜き出す
...: small_memmap = large_memmap2:5 ...: assert is_memmap(small_memmap) == True
...: assert small_memmap.__class__.__name__ == 'memmap'
...:
...: # NumPy配列ndarryの形式にすることもOK
...: small_array = np.asarray(small_memmap)
...: assert small_array.__class__.__name__ == 'ndarray'
...:
...: a = Parallel(n_jobs=2, max_nbytes=None)(
...: delayed(is_memmap)(a)
...:
...:
large_memmapは numpy.memma のインスタンスオブジェクトを指していて、内容は同じです。オリジナルの large_arrayを削除しても、問題なくアクセスもできます。
これらの3つのデータストラクチャーはすべて同じメモリバッファを指しており、この同じバッファはParallelコールのワーカープロセスによっても直接再利用されます。
この例では、Parallelの自動ダンプ機能を無効にするために max_nbytes=None を与えていることに注目してください。small_array は、親プロセスで既に共有メモリにバックアップされていたため、ワーカープロセスではまだ共有メモリにあります。Parallelのマルチプロセッシングキューのシリアル化は、この状況を検知し、メモリコピーの数を制限するためにその場で最適化することができます。
並列計算の結果を共有メモリに書き込む
メインプログラムでデータが w+ または r+ モードで開かれている場合、ワーカーは r+ モードでアクセスします。これにより、ワーカーは元のデータに直接計算結果を書き込むことができ、親プロセスに計算結果を送り返すシリアル化の必要性を軽減することができます。
注意
numpy.memmapインスタンスのインプレイス演算子や代入を使用するなどして、同時作業者が重複する共有メモリデータセグメントに書き込みを行うと、numpyはアトミック操作を提供していないため、データが破損する可能性があります。前述の例では、各タスクが共有結果配列の排他的なセグメントを更新しているため、この問題のリスクはありません。
いくつかのC/C++コンパイラはadd-and-fetchやcompare-and-swapのようなロックフリーのアトミックプリミティブを提供しており、例えばCFFIを介してPythonに公開することができます。しかし、numpyを意識したアトミックな構造を提供することは、joblibプロジェクトの範囲外です。
最後の注意:計算が終わったら、一時的なフォルダをきれいにすることを忘れないでください。OutOfCoreクラスでは、デストラクタにこの処理をさせています。
CPU資源の過剰使用の回避
計算の並列性は、複数のCPUを使って同時に処理を行うことに依存しています。マシンに搭載されているCPUの数よりも多くのプロセスを使用すると、各プロセスが使用できる計算能力が低下するため、各プロセスのパフォーマンスが低下します。また、多くのプロセスが動作している場合、OSのスケジューラーがプロセスを切り替えるのに時間がかかるため、計算のパフォーマンスがさらに低下します。一般的には、マシンのCPU数を大幅に上回るプロセスやスレッドを使用することは避けた方がよいでしょう。
numpyで使用されているBLASライブラリなど、一部のサードパーティライブラリでは、計算を実行するためにスレッドプールを内部で管理しています。デフォルトでは、利用可能なCPUの数と同じ数のスレッドを使用します。これらのライブラリを joblib.Parallel と一緒に使用すると、各ワーカーが独自のスレッドプールを生成するため、リソースが大量にオーバーサブスクリプションされ、シーケンシャルな計算に比べて計算速度が低下します。この問題に対処するために、joblib はサポートしているサードパーティライブラリに、loky' バックエンドで管理されるワーカーで限られた数のスレッドを使用するように指示します。デフォルトでは、各ワーカープロセスは、すべてのワーカーで使用されるスレッドの合計数がホストの CPU 数を超えないように、最大 cpu_count() // n_jobs を許可するように環境変数が設定されます。
この動作は、適切な環境変数に必要なスレッド数を設定することでオーバーライドできます。このオーバーライドは以下のライブラリでサポートされています。
環境変数OMP_NUM_THREADSを設定したOpenMP、
環境変数OPENBLAS_NUM_THREADSを設定したOpenBLAS、
環境変数 MKL_NUM_THREADSを設定したMKL、
環境変数VECLIB_MAXIMUM_THREADSで設定したOpenBLASで高速化(Linux)
環境変数 NUMEXPR_NUM_THREADSを持つNumexpr。
joblib 0.14以降では、parallel_backend 関数の inner_max_num_threads 引数を使って、以下のようにプログラムでデフォルトのスレッド数をオーバーライドすることもできます。
code: python
from joblib import Parallel, delayed, parallel_backend
with parallel_backend("loky", inner_max_num_threads=2):
results = Parallel(n_jobs=4)(delayed(func)(x, y) for x, y in data)
この例では、4つのPythonワーカープロセスがそれぞれ2つのスレッドを使用できるため、このプログラムは最大で8つのCPUを同時に使用できることになります。
カスタムバックエンドAPI(実験的)
バージョン0.10の新機能。
警告
カスタムバックエンドAPIは実験的なものであり、非推奨のサイクルを経ることなく変更される可能性があります。
ユーザーは、デフォルトで提供される、loky、threading、multiprocessingバックエンドに加えて、並列処理バックエンドの独自の実装を提供できます。 バックエンドは、名前とバックエンドファクトリを渡すことにより、joblib.register_parallel_backend()関数に登録されます。
バックエンドファクトリは、ParallelBackendBaseのインスタンスを返す任意の呼び出し可能オブジェクトにすることができます。 独自のカスタムバックエンドを実装する場合は、デフォルトのバックエンドソースコードを参照してください。
リモートクラスターコンピューティングサービスのネットワークアドレスや接続資格情報など、いくつかの必須コンストラクターパラメーターを持つバックエンドクラスを登録できることに注意してください。
code: python
class MyCustomBackend(ParallelBackendBase):
def __init__(self, endpoint, api_key):
self.endpoint = endpoint
self.api_key = api_key
...
# クラスメソッドの1つでself.endpointとself.api_keyを使用して何かを実行します
register_parallel_backend('custom', MyCustomBackend)
次に、接続パラメーターをjoblib.parallel_backend()コンテキストマネージャーに渡すことができます。
code: python
Parallel()(delayed(some_function)(i) for i in range(10))
コンテキストマネージャーの使用は、独自のAPIでバックエンド引数を公開せずに、内部でjoblib.Parallelを使用するサードパーティライブラリを使用する場合に役立ちます。
新しい並列バックエンドを登録する外部パッケージを、joblibで識別できるように、明示的にインポートする必要があるという問題があります。
code: python
>> import joblib
>> with joblib.parallel_backend('custom'):
... ... # this fails
KeyError: 'custom'
# Import library to register external backend
>> import my_custom_backend_library
>> with joblib.parallel_backend('custom'):
... ... # this works
これは、ユーザーを混乱させる可能性があります。 これを解決するために、外部パッケージは、バックエンドを登録する小さな関数を作成し、この関数をjoblib.parallel.EXTERNAL_PACKAGESディクショナリに含めることで、バックエンドをjoblibコードベース内に直接安全に登録できます。
code: python
def _register_custom():
try:
import my_custom_library
except ImportError:
raise ImportError("an informative error message")
EXTERNAL_BACKENDS'custom' = _register_custom これはコミュニティレビューの対象ですが、外部パッケージインポートの副作用に依存する場合のユーザーの混乱を減らすことができます。
古いマルチプロセッシングバックエンド
バージョン0.12より前では、joblibはlokyではなくmultiprocessingバックエンドをデフォルトのバックエンドとして使用していました。
このバックエンドは、複数のプロセスでPythonインタープリターをフォークしてリストの各項目を実行するmultiprocessing.Poolのインスタンスを作成します。 遅延関数は、関数呼び出し構文を使用してタプル`(function, args, kwargs) を作成できるようにするための簡単なトリックです。
警告
Windowsでは、multiprocessing.Poolを使用するには、joblib.Parallelを使用するときにサブプロセスが再帰的に生成されないように、コードのメインループを保護する必要があります。 つまり、「マルチプロセッシング」バックエンドを使用する場合は、次のようなコードを作成する必要があります。
code: python
import ....
def function1(...):
...
def function2(...):
...
...
if __name__ == '__main__':
# インポートと関数で定義された何かをする
...
__main__ブロックの外でコードを実行することはできません。インポートと定義のみを実行してください。
joblib 0.12以降でデフォルトで使用されるlokyバックエンドは、これを強制しなくなりました。
マルチプロセッシングとサードパーティライブラリの悪い相互作用
multiprocessingバックエンドを使用すると、ライブラリが最初にメインプロセスで使用され、その後ワーカープロセス(joblib.Parallel呼び出し内)で再度呼び出された場合に、独自のネイティブスレッドプールを管理するサードパーティライブラリを使用するとクラッシュが発生する可能性があります。
Joblibバージョン0.12以降では、プロセスベースの並列処理の新しいデフォルトバックエンドとしてlokyが使用されているため、この問題は発生しなくなりました。
Python 3.4より前のバージョンでは、joblibのmultiprocessingバックエンドは、Windows以外のシステムではforkを使用したワーカープロセスの作成しかできませんでした。このため、一部のサードパーティライブラリがクラッシュまたはフリーズする可能性があります。このようなライブラリには、Apple vecLib / Accelerate(OSXでNumPyによって使用される)、OpenBLASの古いバージョン(0.2.10より以前)、またはXGBoost、spaCy、OpenCVなどのサードパーティライブラリによって内部的に使用されるGCCのOpenMPランタイム実装が含まれます。
この問題を回避する最善の方法は、multiprocessingバックエンドの代わりにlokyバックエンドを使用することです。 joblib 0.12より以前では、Python3.4以降でforkserverのstart()メソッドを使用するようにjoblib.Parallelを構成することもできます。 start()メソッドは、環境変数JOBLIB_START_METHODをデフォルトのfork,start()メソッドではなくforkserverに設定して構成する必要があります。ただし、ユーザーは、forkserver()メソッドを使用すると、joblib.Parallelがシェルセッションでインタラクティブに定義された関数を呼び出すことができないことに注意する必要があります。
Windowsでは、forkシステムコールはまったく存在しないため、この問題は発生しません(ただし、multiprocessing にはより多くのオーバーヘッドがあります)。
サンプルプログラム
これらを一度査読しておくことをお勧めします。
参考
loky - Reusable Process Pool Executor cloudpickle - pickleモジュールでサポートされていないPythonコンストラクトのシリアル化