lokyを使ってみよう
lokyについて
loky は、concurrent.futuresのProcessPoolExecutorクラスと良く似たAPI提供する、堅牢なクロスプラットフォームの実装です。 次のような特徴があります。
一貫性のある堅牢なプロセス生成:
POSIXシステムでは、すべてのプロセスはシステムコール fork()とexec()を使用して生成されます。これにより、サードパーティライブラリとのより安全な相互作用が保証されます。multiprocessing.Poolクラスはデフォルトでは、exec()を呼ばずに fork()を使用するため、サードパーティのランタイムがクラッシュすることがあります(OpenMP、macOS Accelerateなど)。
再利用可能なExecutor:
毎回完全なエグゼキュータを再生成しないようにする戦略を採用しています。ひとつだけ生成したエグゼキュータインスタンスは、必要に応じて動的にサイズ変更し、連続する呼び出し全体で再利用して、プロセスの生成と停止のオーバーヘッドを制限することができます。システムリソースを解放するための構成可能なアイドリングタイムアウト後に、ワーカープロセスを自動的にシャットダウンできます。
透過的なcloudpickle統合:
インタラクティブに定義された関数とラムダ式を並行して呼び出すため。プロセス間通信を処理するためのカスタムPicklerの実装を登録することも可能です。
"if __name__ == '__main __':" が不要:
スクリプト内の__main__モジュールで定義された関数を呼び出すためにcloudpickleを使用しているため、Windowsで並列関数を呼び出すコードを保護する必要はありません。 デッドロックのない実装:
標準のmultiprocessingおよびconcurrent.futuresモジュールのPool / Executorには、ワーカープロセスのクラッシュを引き起こす場合があるという懸念事項があります。lokyは、これらの考えられるデッドロックを修正し、意味のあるエラーを送り返します。 Python 3.7以降に付属するconcurrent.futures.ProcessPoolExecutor の実装は、lokyのエグゼキューターと同じくらい堅牢ですが、lokyは古いバージョンのPythonでも機能するということに留意してください。
インストール
loky のインストールは次のように行います。
code: bash
$ pip install loky
psutil がインストールされていると、lokyはメモリリークの早期検出ができるようになります。 使用方法
lokyの基本的な使用法は、get_reusable_executorでエグゼキュータインスタンスを生成することです。これは、コンテキストに応じて再利用または再生成されるカスタムProcessPoolExecutorオブジェクトを内部的に管理します。
code: 01_loky_demo.py
import os
from time import sleep
from loky import get_reusable_executor
def say_hello(k):
pid = os.getpid()
print(f'Hello from {pid} with arg {k}')
sleep(.01)
return pid
# 4つのワーカープロセスを持つエグゼキュータを作成
# ワーカーは2秒間アイドリングした後に自動的にシャットダウンする
executor = get_reusable_executor(max_workers=4, timeout=2)
res = executor.submit(say_hello, 1)
print("Got results:", res.result())
results = executor.map(say_hello, range(50))
n_workers = len(set(results))
print("Number of used processes:", n_workers)
assert n_workers == 4
code: bash
$ python 01_loky_demo.py
Got results: 37412
Number of used processes: 4
Hello from 37415 with arg 5
Hello from 37415 with arg 10
(中略)
Hello from 37476 with arg 48
Hello from 37477 with arg 49
Number of used processes: 4
エグゼキュータを再利用
次のコードは、ReusableProcessPoolを再利用するためのloky APIの使用方法の例です。
get_reusable_executorはエグゼキュータを生成します。 このエグゼキュータが壊れない限り、すべての計算に再利用されます。
code: 02_loky_resusable_executor.py
import os
import argparse
from loky import get_reusable_executor
def func_async(i):
import os
pid = os.getpid()
return (2*i, pid)
def test_1():
executor = get_reusable_executor(max_workers=1)
return executor.submit(func_async, 1)
def test_2():
executor = get_reusable_executor(max_workers=1)
return executor.submit(func_async, 2)
def test_3():
executor = get_reusable_executor(max_workers=1)
return executor.submit(func_async, 3)
f1 = test_1()
f2 = test_2()
f3 = test_3()
main_pid = os.getpid()
pids = [v1 for v in results] for i, (val, pid) in enumerate(results):
assert val == 2 * (i + 1)
assert pid != main_pid
print("All the jobs were run in a process different from main process")
assert len(set(pids)) == 1
print("All the computation where run in a single"
f" ProcessPoolExecutor with worker pid={pids0}") エグゼキュータの設定
セットアップが複雑な場合でも、エグゼキュータを再利用することができます。reuse=True引数を使用すると、必要に応じてエグゼキュータのサイズが変更されますが、引数は同じままです。
code: 03_loky_executor_setup.py
from time import sleep
import multiprocessing as mp
from loky import get_reusable_executor
INITIALIZER_STATUS = "uninitialized"
def initializer(x):
global INITIALIZER_STATUS
print("{} init".format(mp.current_process().name)) INITIALIZER_STATUS = x
def return_initializer_status(delay=0):
sleep(delay)
global INITIALIZER_STATUS
return INITIALIZER_STATUS
executor = get_reusable_executor(
max_workers=2,
initializer=initializer,
initargs=('initialized',),
context="loky",
timeout=1000)
assert INITIALIZER_STATUS == "uninitialized"
executor.submit(return_initializer_status).result()
assert executor.submit(return_initializer_status).result() == 'initialized'
# reuse=True 場合はエグゼキュータは同じイニシャライザを使用する
executor = get_reusable_executor(max_workers=4, reuse=True)
for x in executor.map(return_initializer_status, .5 * 4): assert x == 'initialized'
# reuse='auto' 場合は新しいエグゼキュータが作成されるため、
# イニシャライザは使用されなくなる
executor = get_reusable_executor(max_workers=4)
for x in executor.map(return_initializer_status, .1 * 4): assert x == 'uninitialized'
Pickle化できないオブジェクトのシリアル化
この例では、lokyのシリアル化処理の例を示しています。
code: 04_non_pickable_object.py
mport sys
import time
from loky import set_loky_pickler
from loky import get_reusable_executor
from loky import wrap_non_picklable_objects
def func_async(i, *args):
return 2 * i
# cloudpickle でシリアル化(lokyデフォルト)して実行
executor = get_reusable_executor(max_workers=1)
print(executor.submit(func_async, 21).result())
# 辞書やリストなどの大きなPythonオブジェクトでは
# cloudpickle は pickle よりシリアル化に時間がかかる場合がある
large_list = list(range(1000000))
t_start = time.time()
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async, 21, large_list).result()
print("With cloudpickle serialization: {:.3f}s".format(time.time() - t_start))
# pickle でシリアル化するように指示
# Pickle化できないため目的の関数 func() を渡せないため、
# デモンストレーションの目的で id() に置き換えられています。
set_loky_pickler('pickle')
t_start = time.time()
executor = get_reusable_executor(max_workers=1)
executor.submit(id, large_list).result()
print("With pickle serialization: {:.3f}s".format(time.time() - t_start))
try:
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async, 21, large_list).result()
except Exception as e:
print(e)
まず、標準ライブラリのpickleプロトコルではpickle化できない関数を定義しています。 これらは__main__モジュールで定義されているため、pickleでシリアル化することはできません。 ただし、cloudpickleを使用するとシリアル化できます。
code: python
def func_async(i, *args):
return 2 * i
デフォルトでは、lokyはcloudpickleを使用して、ワーカーに送信されるオブジェクトをシリアル化します。
code: python
executor = get_reusable_executor(max_workers=1)
print(executor.submit(func_async, 21).result())
この結果は、 42が出力されます。
ほとんどの場合では、cloudpickleを使用するだけで十分効率的です。 ただし、cloudpickleは、標準のpickleモジュールでのシリアル化と比較して、辞書やリストなどの大きなPythonオブジェクトのシリアル化に非常に時間がかかる場合があります。
これを軽減するために、メインプロセスとワーカー間のすべての通信をシリアル化するためにpickleを使用させることが可能です。 これは、スクリプトが起動される前に、環境変数LOKY_PICKLER=pickleを設定するか、loky APIで提供されるset_loky_pickler()関数を呼び出して設定することで、シリアル化処理に pickle モジュールを使用するようになります。
この出力は、次のようになります。
code: bash
$ python 04_non_pickable_object.py
42
With cloudpickle serialization: 0.106s
With pickle serialization: 0.082s
A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
cloudpickle でシリアル化に要した時間は、pickle の場合と比較して5倍になっています。
ただし、__main__で定義された関数とオブジェクトは、pickleを使用してシリアル化できなくなることに注意してください。
lokyは、ラッパー関数wrap_non_picklable_objects()を使って、pickle化できない関数をラップし、この特定の関数をcloudpickleを使用してシリアル化する必要があることをシリアル化処理に示します。 これにより、この関数のシリアル化処理のみが変更され、他のすべてのオブジェクトには引き続きpickleが使用されます。 この方法の欠点は、オブジェクトを変更してしまうことです。 これにより、関数で多くの問題が発生することはありませんが、オブジェクトインスタンスで副作用が発生する可能性があります。
code: p05_wrap_non_pickable_obj.py
import sys
import time
from loky import set_loky_pickler
from loky import get_reusable_executor
from loky import wrap_non_picklable_objects
@wrap_non_picklable_objects
def func_async(i, *args):
return 2 * i
large_list = list(range(1000000))
# pickle でシリアル化するように指示
set_loky_pickler('pickle')
t_start = time.time()
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async, 21, large_list).result()
print("With default and wrapper: {:.3f}s".format(time.time() - t_start))
同じラッパーを、Pickle化できないクラスにも使用することができます。 オブジェクトに対するwrap_non_picklable_objects()の副作用は、__add__などのマジックメソッドを壊し、isinstance()関数とissubclass()関数を台無しにする可能性があることに注意してください。
APIリファレンス
lokyモジュールは、時間をかけて再利用できるワーカーのプールを管理します。これは、ProcessPoolExecutorおよび関数get_reusable_executor()の堅牢で動的な実装を提供し、内部でプール管理を非表示にします。
loky.get_reusable_executor(max_workers=None, context=None, timeout=10, kill_workers=False,
reuse='auto', job_reducers=None, result_reducers=None,
initializer=None, initargs=() ) ソースコード max_workers: ワーカープロセスで並行して実行できるタスクの最大数を与えます。
デフォルトでは、これはホスト上のCPUの数に設定されています。
context:
timeout: タイムアウト時間を秒単位で与えます。
アイドル状態のワーカーが自動的にシャットダウンして、システムリソースが解放されます。新しいワーカーは、新しいタスクの送信時にリスポーンされるため、max_workersは新しく送信されたタスクを受け入れることができます。timeoutを、新しいプロセスを生成してパッケージをインポートするのに必要な時間の約100倍(100ミリ秒程度)に設定すると、生成するワーカーのオーバーヘッドが無視できるようになります。
kill_workers: ワーカーを強制的に中断するかどうか
Trueを設定すると、以前に生成されたジョブを強制的に中断して、新しいコンストラクター引数値を持つ再利用可能なエグゼキューターの新しいインスタンスを取得できます。
job_reducers: エグゼキュータに送信されるタスクのPickle化をカスタマイズで使用されます。
result_reducers:エグゼキュータに送信される結果のPickle化をカスタマイズで使用されます。
initializer: 新しく生成されたプロセスを初期化するために実行されます。
initargs: initializerに与えた関数の引数
戻り値: 現在のReusableExectutorインスタンスを返します。
新しいインスタンスがまだ開始されていない場合、または前のインスタンスが壊れた状態のままになっている場合は、新しいインスタンスを開始します。
前のインスタンスに要求された数のワーカーがない場合、エグゼキュータは動的にサイズ変更され、戻る前にワーカーの数を調整します。
単一のインスタンスを再利用することで、新しいワーカープロセスを開始し、毎回一般的なPythonパッケージをインポートするオーバーヘッドを節約できます。
タスクと結果のシリアル化
複数のPythonプロセス間で関数定義を共有するには、シリアル化プロトコルに依存する必要があります。 Pythonの標準プロトコルはpickleですが、標準ライブラリでのデフォルトの実装にはいくつかの制限があります。たとえば、インタラクティブに、または__main__モジュールで定義されている関数をシリアル化することはできません。
lokyはcloudpickleがインストールされていれば、それを利用してこの制限を回避します。cloudpickleは、より多くのオブジェクトのシリアル化を可能にするpickleプロトコルの派生モジュールであり、pip install cloudpickleを 使用してインストールできます。このライブラリは標準ライブラリのpickleモジュールよりも遅いため、デフォルトでは、lokyは__main__モジュール内にあることが検出されたオブジェクトをシリアル化するためにのみ使用します。
lokyでのシリアル化を調整するには、次の3つの方法があります。
引数job_reducersおよびresult_reducersを使用して、シリアル化プロセスのカスタムレデューサーを登録することができます。
環境変数LOKY_PICKLERを使用可能で有効なシリアル化モジュールを設定します。
このモジュールは、有効なPicklerオブジェクトを提示する必要があります。環境変数LOKY_PICKER=cloudpickle を設定すると、__main__モジュール内にあることが検出されたオブジェクトをシリアル化するだけでなく、lokyがすべてをcloudpickleでシリアル化するようになります。
loky.wrap_non_picklable_objectsデコレータを使用して、Pickle化できないオブジェクトをラップすることができます。この場合、他のすべてのオブジェクトはデフォルトの動作と同様にpickleでシリアル化され、ラップされたオブジェクトはcloudpickleを使用してPickle化されます。
この例では、各方法の利点と欠点が強調されています。
loky.wrap_non_picklable_objects(obj, keep_wrapper=True) ソースコード ピクルスできないオブジェクトのラッパーで、cloudpickleを使用してシリアル化します。
このラッパーは、通常、pickleと比較して遅いcloudpickleで実行されるため、シリアル化プロセスが遅くなる傾向があることに注意してください。シリアル化の問題を解決する適切な方法は、メインスクリプトで関数とオブジェクトを定義することを避け、複雑なクラスに__reduce__()関数を実装することです。
loky のプロセス開始方法
lokyのAPIは、プロセスの開始方法を制御するデフォルトのstart_methodを設定するset_start_method()関数を提供します。使用可能な方法は、'loky'、 'loky_int_main'、 'spawn'です。 Unix系プラットフォームでは、'fork'、 'forkserver'も使用できます。 lokyはmultiprocessing.set_start_method()関数とは互換性がないことに注意してください。適切な動作を保証するには、提供されている関数を使用してデフォルトの開始メソッドを設定する必要があります。
メモリリークに対する保護
メモリリークが発生すると、長時間実行されるワーカープロセスのメモリサイズが無期限に増加する可能性があります。これにより、これらのリークが解決されない場合、OSによってプロセスが強制終了される可能性があります。これを防ぐために、lokyはリーク検出、メモリクリーンアップ、およびワーカーのシャットダウンを提供します。
psutil モジュールがインストールされている場合、各ワーカーはタスクの完了後に定期的にメモリ使用量をチェックします(注1)。使用法が異常であることが判明した場合(注2)、追加のgc.collect()イベントがトリガーされ、潜在的な循環参照を持つオブジェクトが削除されます。その後も、プロセスワーカーのメモリ使用量が高すぎる場合は、安全にシャットダウンされ、エグゼキュータによって新しいプロセスが自動的に生成されます。
psutilがインストールされていない場合、ワーカープロセスのメモリ使用量を監視する簡単な方法はありません。 gc.collect()イベントは、引き続き各ワーカー内で定期的に呼び出されます(注1)が、リークが発生していないという保証はありません。
(注1) 毎秒ごと。 この定数は、loky.process_executor._MEMORY_LEAK_CHECK_DELAYで定義されています。
(注2) 前回の参照と比較して100MBの増加。これは、最初のタスクを完了した後のワーカーの残りのメモリ使用量として定義されます。
まとめ
multiprocessing.Poolクラスはデフォルトでは、exec()を呼ばずに fork()を使用するため、サードパーティのランタイムがクラッシュする可能性があります。
loky を利用すると、これを回避することができます。
concurrent.futuresのProcessPoolExecutorクラスでは、Pickle化できないオブジェクトやクラスには使用することができませんが、loky を利用すると、この制限も回避することができます。
参考
cloudpickle - pickleモジュールでサポートされていないPythonコンストラクトのシリアル化 psutil: 実行中のプロセスとシステム使用率(CPU、メモリ、ディスク、ネットワーク、センサー)に関する情報を取得するためのクロスプラットフォームライブラリ。