Rayを使ってみよう
https://gyazo.com/387a3959139f28499eb9d73bdbcb6b5d
Rayについて
Ray は、分散アプリケーションを構築するためのシンプルで汎用的なAPIを提供します。 Rayには次の特徴があります。
分散アプリケーションを構築および実行するための単純なプリミティブを提供
コードをほとんど修正せずに、コードを並列処理できる
Ray.Core ではアプリケーション、ライブラリ、ツールを含むエコシステム
複雑なアプリケーションの作成も容易になる’
Ray Coreはアプリケーション構築のための単純なプリミティブを提供します。
https://gyazo.com/2bb24170ff073a30069167dd9cead390
(Ray1.0 White Paperから引用)
Ray は Python と Java をサポートしています。
ここでは、Python について説明することにします。
RayCore
Ray Coreには次の手順があります。
Rayの開始
リモート関数(タスク:task)の使用
結果のフェッチ(オブジェクト参照)
リモートクラス(アクター:actors)の使用
Rayを使用すると、コードはシングルノードから大規模なクラスターまで、簡単にスケーリングさせることができます。
ここでのサンプルを実行するためには、次のコマンドで Ray をインストールしておきます。
code: zsh
% pip install -U ray
Ray の起動
次のコードをアプリケーションに追加することで、シングルノードでRayを起動できます。
code: python
import ray
# Rayを起動
ray.init()
# 既存のクラスターに接続している場合は次のようにRayを起動
# ray.init(address=<cluster-address>)
# ...
マルチノードRayクラスターを開始することは後述しています。
リモート関数(タスク)
Rayを使用すると、任意の関数を非同期で実行することができます。 これらのRay非同期関数はリモート関数(Ray remote function)と呼ばれます。
code: python
# Python の通常の関数
def my_function():
return 1
# @ray.remote デコレータを追加して、リモート関数にする
@ray.remote
def my_function():
return 1
# このリモート関数を呼び出すには、 remote()メソッドを使用します。
# obj_refがすぐに返され、ワーカープロセスで実行されるタスクが作成さる
obj_ref = my_function.remote()
# 結果は ray.get()で取得できます。
assert ray.get(obj_ref) == 1
@ray.remote
def slow_function():
time.sleep(10)
return 1
# Rayリモート関数の呼び出しは並列で実行される
# すべての計算は、レイの内部イベントループによって駆動されるバックグラウンドで実行される
for _ in range(4):
# This doesn't block.
slow_function.remote()
オブジェクト参照をリモート関数に渡す
オブジェクト参照をリモート関数に渡すこともできます。 関数が実際に実行されると、引数は通常のオブジェクトとして取得されます。
code: python
@ray.remote
def function_with_an_argument(value):
return value + 1
obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1
# オブジェクトrefを引数として別のRayリモート関数に渡すことができる
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2
このときには、次の動作に注意してください。
2番目のタスクは最初のタスクの出力に依存するため、最初のタスクの実行が終了するまで実行されません。
2つのタスクが異なるマシンでスケジュールされている場合、最初のタスクの出力(obj_ref1/objRef1に対応する値)は、ネットワークを介して2番目のタスクがスケジュールされているマシンに送信されます。
必要なリソースの指定
1つのタスクにGPUが必要な場合など、多くの場合で、タスクのリソース要件を指定したい場合があります。 Rayは、マシンで使用可能なGPUとCPUを自動的に検出します。 ただし、特定のリソースを渡すことで、このデフォルトの動作をオーバーライドできます。
code: python
ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2})`
Rayでは、CPU、GPU、カスタムリソースなどの、タスクのリソース要件を指定することもできます。 タスクを実行するのに十分なリソースがある場合にのみ、タスクはノード上で実行されます。
code: python
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
return 1
注意
リソースを指定しない場合、デフォルトは1 CPUリソースだけで、他のリソースはありません。
CPUを指定する場合、Rayは分離を強制しません(つまり、タスクはその要求を受け入れることが期待されます)。
GPUを指定する場合、RayはCUDA_VISIBLE_DEVICESを設定することで、デバイス形式の分離を提供します。しかし、実際にGPUを使用するかどうかはタスクの責任です。
タスクのリソース要件は、Rayスケジューリングの同時実行に影響を与えます。 特に、特定のノードで同時に実行されているすべてのタスクのリソース要件の合計は、ノードの合計リソースを超えることはできません。
次のコードは、リソース要求の例です。
code: python
# Rayは部分的リソースの要求をサポート
@ray.remote(num_gpus=0.5)
def h():
return 1
# Rayはカスタムリソースの要求もサポート
@ray.remote(resources={'Custom': 1})
def f():
return 1
複数の戻り値
Pythonリモート関数は、複数のオブジェクト参照を返すことができます。
code: python
@ray.remote(num_returns=3)
def return_multiple():
return 1, 2, 3
a, b, c = return_multiple.remote()
タスクのキャンセル
リモート関数は、返されたオブジェクトを ray.cancel()に与えて呼び出すとキャンセルすることができます。 リモートアクター関数は、ray.kill()を使用してアクターを強制終了することで停止できます。
code: python
@ray.remote
def blocking_operation():
time.sleep(10e6)
obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)
from ray.exceptions import TaskCancelledError
try:
ray.get(obj_ref)
except TaskCancelledError:
print("Object reference was cancelled.")
Rayのオブジェクト
Rayでは、オブジェクトを作成して計算を実行します。 これらのオブジェクトをリモートオブジェクトと呼び、リモートオブジェクトを使用してそれらを参照します。 リモートオブジェクトは共有メモリオブジェクトストアに格納され、クラスター内のノードごとに1つのオブジェクトストアがあります。 クラスタ設定では、各オブジェクトがどのマシンにあるかが実際にはわからない場合があります。
object_refは、基本的に、リモートオブジェクトを参照するために使用できる一意のIDです。 先物に精通している場合、オブジェクト参照は概念的に類似しています。
オブジェクト参照は、複数の方法で作成できます。
それらは put()メソッドによって配置されます。
code: python
# Rayのオブジェクトストアにオブジェクトを配置
y = 1
object_ref = ray.put(y)
注意
リモートオブジェクトは不変(Immutable)です。
つまり、ray.put()で取得した値は変更することはできません。
これにより、コピーを同期することなく、リモートオブジェクトを複数のオブジェクトストアに
複製することができます。
結果の取得
get()メソッドを使用して、オブジェクト参照からリモートオブジェクトの結果をフェッチできます。 現在のノードのオブジェクトストアにオブジェクトが含まれていない場合、オブジェクトがダウンロードされます。
オブジェクトがnumpy配列またはnumpy配列のコレクションである場合、get()メソッドの呼び出しはゼロコピーであり、共有オブジェクトストアメモリにある配列を返します。 それ以外の場合は、オブジェクトデータをPythonオブジェクトに逆シリアル化します。
code: python
# 1つのオブジェクト参照の値を取得します。
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1
# 複数のオブジェクト参照の値を並行して取得します。
# 長時間ブロックしているget()から早く戻るようにタイムアウトを設定することもできる
from ray.exceptions import GetTimeoutError
@ray.remote
def long_running_function():
time.sleep(8)
obj_ref = long_running_function.remote()
try:
ray.get(obj_ref, timeout=4)
except GetTimeoutError:
print("get timed out.")
いくつかのタスクを起動した後、どのタスクが実行を終了したかを知りたい場合があります。 これは、wait(ray.wait)で実行できます。 この機能は次のように機能します。
code: python
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
オブジェクトの退避
オブジェクトストアがいっぱいになると、オブジェクトは削除され、新しいオブジェクト用のスペースが確保されます。 これは、おおよそのLRUオーダー(Least Recently Used Order: 最近使用されていない順序)で発生します。 オブジェクトが削除されないようにするには、代わりにget()メソッドを呼び出してその値を格納します。 Numpy配列オブジェクトは、Pythonプロセスでマップされている間は削除できません。 また、メモリ制限を構成して、アクターによるオブジェクトストアの使用を制御することもできます。
注意
put()メソッドで作成されたオブジェクトはメモリに固定されますが、put()メソッドによって返されるオブジェクトrefへのPython参照は存在します。 これは、put()メソッドによって返される特定の参照にのみ適用され、一般的な参照またはその参照のコピーには適用されません。
リモートクラス(アクター)
アクター(Actor)は、Ray APIを関数(タスク)からクラスに拡張します。 アクターは本質的にステートフルワーカーです。
@ray.remoteデコレータは、Counterクラスのインスタンスがアクターになることを示します。 各アクターは、独自のPythonプロセスで実行されます。
code: python
@ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
# このクラスからアクターを生成する
counter = Counter.remote()
必要なリソースの指定
アクターでリソース要件を指定することもできます
code: python
# アクターに必要なリソースを指定する
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor(object):
pass
アクターを呼び出す
remote()メソッドを呼び出すことにより、アクターと対話することができます。 次に、オブジェクトrefでget()メソッドを呼び出して、実際の値を取得できます。
code: python
# アクターの呼び出し
obj_ref = counter.increment.remote()
assert ray.get(obj_ref) == 1
異なるアクターで呼び出されたメソッドは並行して実行でき、同じアクターで呼び出されたメソッドは、呼び出された順序で順番に実行されます。 以下に示すように、同じアクターのメソッドは互いに状態を共有します。
code: python
# 10個のCounterアクターを作成する
# 各カウンターを1回インクリメントして、結果を取得する
# これらのタスクはすべて並行して行われます。
# Increment the first Counter five times. These tasks are executed serially
# and share state.
# 最初のカウンターを5回インクリメントする
# これらのタスクはシリアルに実行され、状態を共有します。
results = ray.get([counters0.increment.remote() for _ in range(5)]) Rayタスクを使用したPython 関数の並列化
まず、rayをインポートし、Rayサービスを初期化します。 次に、リモートで実行する関数を@ray.remoteでデコレートします。 関数は、直接呼び出すのではなく、.remote() を使用してその関数を呼び出します。 このリモート呼び出しにより、ray.get() でフェッチできるfutureまたはObjectRefが生成されます。
code: python
import ray
ray.init()
@ray.remote
def f(x):
return x * x
RayActorsを使用したPythonクラスの並列化
Rayは、Python のクラスインスタンスを並列化できるようにするアクターを提供します。 Rayアクターであるクラスをインスタンス化すると、Rayはクラスター内でそのクラスのリモートインスタンスを開始します。 このアクターは、リモートメソッド呼び出しを実行し、独自の内部状態を維持できます。
code: python
import ray
ray.init() # これを一度だけ呼び出す
@ray.remote
class Counter(object):
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
def read(self):
return self.n
サブミット
Rayプログラムは、シングルノードで実行することができ、大規模なクラスターにシームレスに拡張することもできます。 この例のRayスクリプトをクラウドで実行するには、次のコマンドのように構成ファイルを与えてサブミットします。
code: zsh
Rayクラスタ
Rayの強みの1つは、同じプログラムで複数のマシンを活用できることです。もちろん、Rayはシングルノードで実行できますが、Rayの本当の実力はクラスターを使用するときに発揮されます。
Ray クラスタには次の重要な概念があります。
Rayノード:
Rayクラスターは、ヘッドノードと一連のワーカーノードで構成されます。ヘッドノードを最初に起動する必要があり、ワーカーノードにはクラスターを形成するためのヘッドノードのアドレスが与えられます。 Rayクラスター自体も自動スケーリング(Auto Scaling)できます。つまり、クラウドプロバイダーと対話して、アプリケーションのワークロードに応じてインスタンスを要求または解放できます。
ポート:
RayプロセスはTCPポートを介して通信します。オンプレミスまたはクラウドのいずれかでRayクラスターを開始するときは、Rayが正しく機能するように、適切なポートを開くことが重要です。
Rayクラスタランチャー(Ray Cluster Launcher):
Ray Cluster Launcherは、マシンを自動的にプロビジョニングし、マルチノードRayクラスターを起動するシンプルなツールです。クラスターランチャーは、GCP、Amazon EC2、Azure、Kubernetesをサポートしています。
Rayクラスタの開始
Ray ClusterLauncher 利用するか手動でRayクラスターを開始します。
Kubernetes、YARN、SLURMなどの標準のクラスターマネージャーを使用してRayクラスターを作成することもできます。
クラスターが開始されたら、ray start を実行したのと同じノードでドライバープロセスを開始して、プログラムをRayクラスターに接続する必要があります。
code: python
# これは必須
import ray
ray.init(address='auto')
そして、スクリプトの残りの部分は、分散フレームワークとしてRayを活用できるはずです。
クラスターランチャーの使用
ray upコマンドは、Ray Cluster Launcherを使用してクラウド上でクラスターを開始し、指定された「ヘッドノード」とワーカーノードを作成します。 クラスターノードのいずれかでray.init(address= ...) を実行するPythonプロセスは、Rayクラスターに接続します。
重要
ラップトップはヘッドノードではないため、レイアップを使用している場合、
ラップトップでは ray.initを呼び出すことはできません。
AWSでクラスターランチャーを使用する例を次に示します。
code: zsh
% ray up ray/python/ray/autoscaler/aws/example-full.yaml
Rayクラスターのステータスを監視
code: zsh
% ray monitor cluster.yaml
ヘッドノードにSSHで接続する
code: zsh
% ray attach cluster.yaml
Rayクラスターを手動でセットアップ
Rayクラスターを実行する最も好ましい方法は、Ray ClusterLauncherを使用することです。
ただし、以下の手順、手動でRayクラスターを開始することもできます。
この例では、マシンのリストがあり、クラスター内のノードが相互に通信できることと、Rayが各マシンにインストールされていることを前提としています。
各マシンでRayを起動する
ヘッドノードで(ヘッドノードとしていくつかのノードを選択するだけです)、以下を実行します。 引数--portを省略すると、Rayはポート6379を選択し、ランダムなポートにフォールバックします。
code: zsh
% ray start --head --port=6379
次のステップは、別のノードからこのRayランタイムに接続するには、次のコマンドを実行します。
code: zsh
% ray start --address='<ip address>:6379' --redis-password='<password>'
接続に失敗した場合は、ファイアウォールの設定とネットワーク構成を確認してください。
このコマンドは、起動されたRedisサーバーのアドレス(ローカルノードのIPアドレスと指定したポート番号)を出力します。
次に、他の各ノードで、以下を実行します。<address> は、必ずヘッドノードのコマンドで出力された値に置き換えてください(123.45.67.89:6379 のようになります)。
計算ノードがネットワークアドレス変換を使用して独自のサブネットワーク上にある場合、そのサブネットワーク外の通常のマシンから接続するには、ヘッドノードによって出力されるコマンドは機能しないことに注意してください。 2番目のマシンからヘッドノードに到達するアドレスを見つける必要があります。 ヘッドノードが compute04.berkeley.edu のようなFQDNがDNSに登録されている場合は、IPアドレスの代わりにそれを使用することができます。
code: zsh
% ray start --address=<address> --redis-password='<password>'
Rayクラスタ を停止する場合は、ray stop を実行します。
マシンに10個のCPUと1個のGPUがあることを指定する場合は、オプション --num-cpus=10および--num-gpus=1を使用してこれを行うことができます。
次のメッセージが表示された場合は、指定されたIPアドレスで--port にアクセスできないことを意味します。
Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly
これは、ヘッドノードが実際にRayを実行していないか、 間違ったIPアドレスを与えたような場合に発生します。
Rayランタイムが開始されている場合は、ノードが--port のIPアドレスに正常に接続されています。 これで、ray.init(address='auto') を使用してクラスターに接続できるようになります。
ray.init(address='auto') が 次のメッセージを表示する場合は、ノードはメインポート以外の他のポートへの接続に失敗しています。
redis_context.cc:303: Failed to connect to Redis, retrying.,
接続に失敗した場合、ノードから各ポートに到達できるかどうかを確認するには、nmapやncなどのツールを使用できます。
code: zsh
% nmap -sV --reason -p $PORT $HEAD_ADDRESS
Nmap scan report for compute04.berkeley.edu (123.456.78.910)
Host is up, received echo-reply ttl 60 (0.00087s latency).
rDNS record for 123.456.78.910: compute04.berkeley.edu
PORT STATE SERVICE REASON VERSION
6379/tcp open redis syn-ack ttl 60 Redis key-value store
% nc -vv -z $HEAD_ADDRESS $PORT
Connection to compute04.berkeley.edu 6379 port tcp/* succeeded! ノードがそのIPアドレスでそのポートにアクセスできない場合は、次のように表示されることがあります。
code: zsh
% nmap -sV --reason -p $PORT $HEAD_ADDRESS
Nmap scan report for compute04.berkeley.edu (123.456.78.910)
Host is up (0.0011s latency).
rDNS record for 123.456.78.910: compute04.berkeley.edu
PORT STATE SERVICE REASON VERSION
6379/tcp closed redis reset ttl 60
% nc -vv -z $HEAD_ADDRESS $PORT
nc: connect to compute04.berkeley.edu port 6379 (tcp) failed: Connection refused
RayクラスターでのRayプログラムの実行
分散Rayプログラムを実行するには、ノードの1つと同じマシンでプログラムを実行する必要があります。
プログラム内で、ray.init()を与えて呼び出しするときに、アドレスパラメータを与える必要があります。
次の例は、Rayは既存のクラスターに接続します。
code: python
ray.init(address="auto")
注意
よくある間違いは、ラップトップでスクリプトを実行しているときに、
アドレスをクラスターノードに設定することです。
スクリプトはRayノードの1つで開始/実行する必要があるため、これは機能しません。
正しい数のノードがクラスターに参加していることを確認するには、以下を実行できます。
code: python
import time
@ray.remote
def f():
time.sleep(0.01)
return ray.services.get_node_ip_address()
# クラスターに参加しているノードのIPアドレスのリストを取得する
並列イテレータ
ray.util.iter()は、単純なデータの取り込みと処理のための並列イテレーターAPIを提供します。 これは、Rayアクターとray.wait()ループの周りの構文を別な表記にしたものと考えることができます。
並列イテレータはダラダラとアイテムの無限のシーケンスに対して動作します。 イテレータ変換は、ユーザーがnext()を呼び出して、イテレータから次の出力項目をフェッチする場合にのみ実行されます。
並列イテレータ
並列イテレータ(Parallel Iterators)は、既存のアイテムのセット、数値の範囲、イテレーターのセット、またはワーカーアクターのセットからParallelIteratorオブジェクトを作成できます。 Rayは、イテレーターの各シャードのデータを生成するワーカーアクターを作成します。
code: python
>> it = ray.util.iter.from_items(1, 2, 3, 4, num_shards=2) # range(1_000_000)に32のワーカーを持つイテレータを作成
>> it = ray.util.iter.from_range(1_000_000, num_shards=32)
# 2つのrange(10)ジェネレータにイテレータを作成
ParallelIterator[from_iteratorsshards=2] # 既存のワーカーアクターからイテレーターを作成
# これらのアクターは、ParallelIteratorWorkerインターフェースを実装する必要がある
マッピング、フィルタリング、バッチ処理などの単純な変換をイテレータで連鎖させることができます。 これらは、ワーカーに対して並行して実行されます。
code: python
# イテレータの各要素に変換を適用
>> it = it.for_each(lambda x: x ** 2)
ParallelIterator....for_each() # アイテムをまとめて32個の要素のリストにする
>> it = it.batch(32)
ParallelIterator....for_each().batch(32) # 奇数値のアイテムを除外
>> it = it.filter(lambda x: x % 2 == 0)
ParallelIterator....for_each().batch(32).filter() ローカルイテレータ
ローカルイテレータ(Local Iterators) では、並列イテレータから要素を読み取るには、gather_sync()またはgather_async() を呼び出してLocalIteratorに変換する必要があります。 これらは、それぞれアクター上のray.get() ループとray.wait() ループに対応します。
code: python
# アイテムを同期的に収集します(シャード間で決定論的なラウンドロビンで)
>> it = ray.util.iter.from_range(1000000, 1)
>> it = it.gather_sync()
# ローカルイテレータは、他のPythonイテレータと同じように使用できる
>> it.take(5)
# 変換の連鎖(chaining of transformations)もサポートされる
# ParallelIteratorに適用される変換とは異なり、それらは現在のプロセスで実行される
>> it.filter(lambda x: x % 2 == 0).take(5)
# 非同期収集(Async Gather)はパフォーマンスを向上させるために使用できますが、
# 非決定性(ステップの判断で複数の結果を並列的に引き起こせる)となる
>> it = ray.util.iter.from_range(1000, 4).gather_async()
>> it.take(5)
イテレータをリモート関数に渡す
ParallelIteratorとLocalIteratorはどちらもシリアライズ可能です。 それらは任意のRayリモート関数に渡すことができます。 ただし、各シャードは一度に1つのプロセスでのみ読み取る必要があることに注意してください。
code: python
# # このParallelIteratorのシャードを表すローカルイテレータを取得する
>> it = ray.util.iter.from_range(10000, 3)
# イテレータシャードはリモート関数に渡すことができる
>> @ray.remote
... def do_sum(it):
... return sum(it)
...
セマンティック保証
並列イテレーターAPIは、次のセマンティクスを保証します。
フェッチの順序:
it.gather_sync().foreach(fn)または it.gather_async().foreach(fn)を使用する場合、fn(x_i) は次の前に要素 x_i で呼び出されます 要素 x_ {i +1} はソースアクターからフェッチされます。 これは、イテレータステップ間でソースアクターを更新する必要がある場合に役立ちます。 非同期ギャザーの場合、この順序はシャードごとにのみ適用されることに注意してください。
オペレーターの状態:
オペレーターの状態は、シャードごとに保持されます。 これは、ステートフル呼び出し可能オブジェクトをforeach()メソッドに渡すことができることを意味します。
code: python
class CumulativeSum:
def __init__(self):
self.total = 0
def __call__(self, x):
self.total += x
return (self.total, x)
it = ray.util.iter.from_range(5, 1)
for x in it.for_each(CumulativeSum()).gather_sync():
print(x)
## 出力
実行例:ストリーミングワード頻度カウント
並列イテレータは、ストリーミングgrepなどの単純なデータ処理のユースケースに使用できます。
code: python
import ray
import glob
import gzip
import numpy as np
ray.init()
file_list = glob.glob("/var/log/syslog*.gz")
it = (
ray.util.iter.from_items(file_list, num_shards=4)
.for_each(lambda f: gzip.open(f).readlines())
.flatten()
.for_each(lambda line: line.decode("utf-8"))
.for_each(lambda line: 1 if "cron" in line else 0)
.batch(1024)
.for_each(np.mean)
)
# 1024行のスライディングウィンドウで、"cron"を含むログ行の確率を表示します。
for freq in it.gather_async():
print(freq)
実行例:イテレータシャードをリモート関数に渡す
並列イテレータとローカルイテレータはどちらも完全にシリアル化できるため、それらをRayタスクとアクターに渡すことができます。 これは、分散トレーニングに役立ちます。
code: python
import ray
import numpy as np
ray.init()
@ray.remote
def train(data_shard):
for batch in data_shard:
print("train on", batch) # perform model update with batch
it = (
ray.util.iter.from_range(1000000, num_shards=4, repeat=True)
.batch(1024)
.for_each(np.array)
)
ray.get(work)
小技:
通常、ParallelIteratorの組み込み関数を使用するのが最も効率的です。
例えば、次のようなリスト内包表記を使用している場合
[foo(x) for x in iter.gather_async()]
代わりに、iter.for_each(foo) を使用することもできます。
分散マルチプロセッシングプール
Rayは、ローカルプロセスの代わりにRayActorsを使用するmultiprocessing.PoolAPIでの分散Pythonプログラムの実行をサポートしています。 これにより、マルチプロセッシングを使用する既存のアプリケーションを簡単にスケーリングできます。単一ノードからクラスターにプールします。
開始するには、最初にRayをインストールしてから、multiprocessing.Poolの代わりにray.util.multiprocessing.Poolを使用します。 これにより、初めてプールを作成してタスクをプール全体に分散するときに、ローカルRayクラスターが開始されます。 代わりにマルチノードRayクラスターで実行する手順については、以下の「クラスターでの実行」セクションを参照してください。
code: python
from ray.util.multiprocessing import Pool
def f(index):
return index
pool = Pool()
for result in pool.map(f, range(100)):
print(result)
完全なmultiprocessing.PoolAPIが現在サポートされています。
クラスターで実行
このセクションでは、Rayクラスターが実行されていることを前提としています。 Rayクラスターを開始するには、クラスターのセットアップ手順を参照してください。
プールを実行中のRayクラスターに接続するには、次の2つの方法のいずれかでヘッドノードのアドレスを指定できます。
RAY_ADDRESS環境変数を設定する。
ray_addressキーワード引数をPoolクラスのコンストラクターに渡す。
code: python
from ray.util.multiprocessing import Pool
# 新しいローカルRayクラスターを開始します。
pool = Pool()
# 現在のノードをヘッドノードとして、実行中のRayクラスターに接続します。
# または、環境変数RAY_ADDRESS="auto"を設定します。
pool = Pool(ray_address="auto")
# リモートノードをヘッドノードとして、実行中のRayクラスターに接続します。
# または、環境変数RAY_ADDRESS="<ip_address>:<port>"を設定します
pool = Pool(ray_address="<ip_address>:<port>")
プールを作成する前に、ray.init()にサポートされている構成オプションを使用して呼び出すことで、Rayを手動で開始することもできます。
Ray Core での機械学習
Ray Core には機械学習のためのライブラリも提供されています。
Ray Serve:スケーラブルでプログラム可能なサーバー機能
Tune:スケーラブルなハイパーパラメータを調整する
RLlib:スケーラブルな強化学習
RaySGD:分散トレーニングラッパー
Dask、MARS、Modin、Horovod、Hugging Face、Scikit-learnなど、Rayと連携するコミュニティプロジェクトも数多くあります。 Rayをサポートする分散ライブラリこちらで確認してください。 Ray Serve
スケーラブルでプログラム可能なサービングを提供します。
Ray Serveは、Ray上に構築された使いやすいスケーラブルなモデルサービングライブラリです。
Ray Serve には次の特徴があります。
フレームワークに依存しない:
Ray Serveのツールキットを使用して、PyTorch、Tensorflow、Kerasなどのフレームワークで構築されたディープラーニングモデルから、Scikit-Learnモデル、任意のPythonビジネスロジックまで、あらゆるものを提供します。
依存モジュールが不要:
純粋なPythonコードを使用してモデルを構成します。YAMLやJSONなどは不要です。
Ray ServeはRay上に構築されている
このため、データセンターやクラウドで、多数のサーバーへ簡単に拡張できます。
Ray Serveは、モデルを大規模にデプロイするための、2つの方法で使用できます。
Python関数とクラスをHTTPエンドポイントの背後に自動的に配置する
PythonのServeHandle APIを使用して、既存のPythonWebサーバー内からそれらを呼び出します。
Ray Serve の利用方法
Ray Serveは、Pythonバージョン3.6以降をサポートします。 Ray Serveをインストールするには、次のコマンドを実行します。
code: zsh
Ray Serve を使って関数 say_hello() をクラスタへ送出します。
code: python
import ray
from ray import serve
import requests
ray.init()
client = serve.start()
def say_hello(request):
return "hello " + request.query_params"name" + "!" # 関数からバックエンドを作成し、エンドポイントに接続する
client.create_backend("my_backend", say_hello)
client.create_endpoint("my_endpoint", backend="my_backend", route="/hello")
# HTTPとPythonの2つの異なる方法でエンドポイントをクエリする
# > hello serve!
print(ray.get(client.get_handle("my_endpoint").remote(name="serve")))
# > hello serve!
もしくはステートフルクラスにサービスを提供します。
code: python
import ray
from ray import serve
import requests
ray.init()
client = serve.start()
class Counter:
def __init__(self):
self.count = 0
def __call__(self, request):
self.count += 1
return {"count": self.count}
# クラスからバックエンドを作成し、エンドポイントに接続します。
client.create_backend("my_backend", Counter)
client.create_endpoint("my_endpoint", backend="my_backend", route="/counter")
# HTTPとPythonの2つの異なる方法でエンドポイントをクエリする
# > {"count": 1}
print(ray.get(client.get_handle("my_endpoint").remote()))
# > {"count": 2}
Ray Serve の利点
機械学習アプリケーションを提供するには、一般に次の2つの方法がありますが、どちらも重大な制限があります。
Flaskなどで開発した、従来型のウェブサーバー(独自アプリ)を使用する
クラウドでホストされるソリューションを使用する
最初のアプローチは簡単に始めることができますが、各コンポーネントをスケーリングするのは困難です。 2番目のアプローチでは、ベンダーロックイン(SageMaker)、フレームワーク固有のツール(TFServing)、および一般的な柔軟性の欠如が発生します。
Ray Serveは、本番デプロイメントに必要な複雑なルーティング、スケーリング、およびテストロジックを処理しながら、単純なWebサーバー(および独自のWebサーバーを使用する機能)を提供することで、これらの問題を解決します。
Ray Serveは、複数のレプリカを使用してバックエンドをスケールアップするだけでなく、次のことも可能にします。
ルーティングロジックを応答処理ロジックから切り離すことにより、ダウンタイムがゼロのバックエンド間でトラフィックを分割します。
パフォーマンスを向上させるためのバッチ処理—パフォーマンス目標の達成を支援するために組み込まれています。
リソース管理(CPU、GPU)-各GPUを複数のモデルで完全に飽和させるための部分的なリソース要件を指定します。
Ray Serve をどんなときに使用するか
Ray Serveは、Pythonベースの機械学習モデルをデプロイ、操作、監視するためのシンプルな(ただし柔軟な)ツールです。 Ray Serveは、本番環境でモデルを提供するためにスケールアウトする必要がある場合に優れています。これは、大規模なバッチ処理要件が原因であるか、さまざまなエンドポイントの背後で多数のモデルにサービスを提供する予定であり、A / Bテストを実行したり、さまざまなモデル間のトラフィックを制御したりする必要がある場合があります。
複数のマシンで実行することを計画している場合は、RayServeが役立ちます。
Ray Tune
Ray Tuneは、あらゆる規模のハイパーパラメータを調整するためのライブラリです。 Tuneを使用すると、10行未満のコードでマルチノード分散ハイパーパラメータスイープを起動できます。
ハイパーパラメタ は推論や予測の枠組みの中で決定されないパラメータのことで、機械学習アルゴリズムの挙動を設定するパラメータをさします。 Tuneは、PyTorch、TensorFlow、Kerasなどのディープラーニングフレームワークをサポートしています。
code: zsh
この例では、反復トレーニング関数を使用して小さなグリッド検索を実行します。
code: python
from ray import tune
def objective(step, alpha, beta):
return (0.1 + alpha * step / 100)**(-1) + beta * 0.1
def training_function(config):
# ハイパーパラメータ
for step in range(10):
# 反復トレーニング機能-任意のトレーニング手順にすることができる
intermediate_score = objective(step, alpha, beta)
# スコアをチューンにフィードバックする
tune.report(mean_loss=intermediate_score)
analysis = tune.run(
training_function,
config={
})
print("Best config: ", analysis.get_best_config(
metric="mean_loss", mode="min"))
# 試行結果を分析するためのデータフレームを取得する
df = analysis.results_df
TensorBoardがインストールされている場合は、すべての試行結果を自動的に視覚化します。
code: zsh
% tensorboard --logdir ~/ray_results
Ray RLlib
RLlibは、Rayの上に構築された強化学習用のオープンソースライブラリであり、さまざまなアプリケーションに高いスケーラビリティと統合されたAPIの両方を提供します。
code: zsh
% pip install tensorflow # あるいは tensorflow-gpu
code: python
import gym
from gym.spaces import Discrete, Box
from ray import tune
class SimpleCorridor(gym.Env):
def __init__(self, config):
self.cur_pos = 0
self.action_space = Discrete(2)
self.observation_space = Box(0.0, self.end_pos, shape=(1, ))
def reset(self):
self.cur_pos = 0
def step(self, action):
if action == 0 and self.cur_pos > 0:
self.cur_pos -= 1
elif action == 1:
self.cur_pos += 1
done = self.cur_pos >= self.end_pos
tune.run(
"PPO",
config={
"env": SimpleCorridor,
"num_workers": 4,
"env_config": {"corridor_length": 5}})
RaySGD:分散トレーニングラッパー
RaySGDは、分散型ディープラーニング用の軽量ライブラリであり、データ並列トレーニング用のPyTorchおよびTensorFlowネイティブモジュールの薄いラッパーを提供します。
主な機能は次のとおりです。
使いやすさ:
個々のノードを監視することなく、PyTorchのネイティブDistributedDataParallelとTensorFlowのtf.distribute.MirroredStrategyをスケーリングします。
構成可能性:
RaySGDはRay Actor APIの上に構築されており、RLlib、Tune、Ray.Serveなどの既存のRayアプリケーションとのシームレスな統合を可能にします。
スケールアップとスケールダウン:
シングルCPUで開始します。 コードを2行変更するだけで、マルチノード、マルチCPU、またはマルチGPUクラスターにスケールアップします。
実行例
TorchTrainerは、次の方法で開始できます。
code: python
import ray
from ray.util.sgd import TorchTrainer
from ray.util.sgd.torch import TrainingOperator
from ray.util.sgd.torch.examples.train_example import LinearDataset
import torch
from torch.utils.data import DataLoader
class CustomTrainingOperator(TrainingOperator):
def setup(self, config):
# データを読み込む
train_loader = DataLoader(LinearDataset(2, 5), config"batch_size") val_loader = DataLoader(LinearDataset(2, 5), config"batch_size") # モデルの作成
model = torch.nn.Linear(1, 1)
# 最適化アルゴリズムを作成
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
# 損失(loss)の取得(実際のラベルからどのくらい違っていたのか)
loss = torch.nn.MSELoss()
# モデル、最適化アルゴリズム、損失の登録
self.model, self.optimizer, self.criterion = self.register(
models=model,
optimizers=optimizer,
criterion=loss)
# データローダーを登録
self.register_data(train_loader=train_loader,
validation_loader=val_loader)
ray.init()
trainer1 = TorchTrainer(
training_operator_cls=CustomTrainingOperator,
num_workers=2,
use_gpu=False,
config={"batch_size": 64})
stats = trainer1.train()
print(stats)
trainer1.shutdown()
print("success!")
Rayクラスタで分散Scikit-learn /Joblibを実行
Rayは、ローカルプロセスの代わりにRay Actorsを使用してRayバックエンドとしてjoblibを実装することにより、scikit-learnプログラムを分散処理することができます。つまり、scikit-learnを使用する既存のアプリケーションをシングルノードからクラスターに簡単にスケールすることができます。
開始するには、最初にRayをインストールしてから、ray.util.joblibからimport register_rayを使用し、register_ray()を実行します。 これにより、Rayがscikit-learnで使用するjoblibバックエンドとして登録されます。 次に、joblib.parallel_backend( 'ray')を使用して元のscikit-learnコードを実行します。 これにより、ローカルRayクラスターが開始されます。 代わりにマルチノードRayクラスターで実行する手順については、以下の「クラスターでの実行」セクションを参照してください。
code: python
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC
digits = load_digits()
param_space = {
'C': np.logspace(-6, 6, 30),
'gamma': np.logspace(-8, 8, 30),
'tol': np.logspace(-4, -1, 30),
}
model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=5, n_iter=300, verbose=10)
import joblib
from ray.util.joblib import register_ray
register_ray()
with joblib.parallel_backend('ray'):
search.fit(digits.data, digits.target)
クラスターで実行
scikit-learnを実行中のRayクラスターに接続するには、RAY_ADDRESS環境変数を設定してヘッドノードのアドレスを指定する必要があります。
joblib.parallel_backend('ray')を呼び出す前に、ray.init()を呼び出して、サポートされている構成オプションを与えて呼び出すことで、Rayを手動で開始することもできます。
注意
環境変数RAY_ADDRESSを設定せず、ray.init(address=<address>) として
Rayにアドレスを指定しない場合、scikit-learnはシングルノードで実行されます。
RayクラスタでDaskを実行
Rayは、Daskスケジューラーを提供し、使い慣れたDaskコレクション(データフレーム、配列)を使用してデータ分析を構築し、Rayクラスター上で計算を実行できるようにします。 このDaskスケジューラーを使用すると、Daskエコシステム全体をRay上で実行することができます。
注意
Rayは現在、オブジェクトのスピルをサポートしていないため、
クラスターメモリよりも大きいデータセットを処理できないことに注意してください。
これは計画された機能です。
スケジューラー
Dask-Rayスケジューラーは、任意の有効なDaskグラフを実行でき、任意のDask .compute()呼び出しで使用できます。 次に例を示します。
code: python
import ray
from ray.util.dask import ray_dask_get
import dask.delayed
import time
# Rayクラスタの起動
ray.init()
# 既存のクラスターに接続している場合は、次のように起動する
# ray.init(address="auto")
@dask.delayed
def inc(x):
time.sleep(1)
return x + 1
@dask.delayed
def add(x, y):
time.sleep(3)
return x + y
x = inc(1)
y = inc(2)
z = add(x, y)
# The Dask scheduler submits the underlying task graph to Ray.
z.compute(scheduler=ray_dask_get)
RayクラスタでDaskを使用する利点
次のようなケースの場合、RayクラスタでDaskを利用する利点があります。
Daskが提供するおなじみのNumPyおよびPandasAPIを使用してデータ分析を作成し、Rayなどの本番環境に対応した分散タスク実行システムで実行する場合。
2つの異なるタスク実行バックエンドを使用せずに、同じアプリケーションでDaskライブラリとRayライブラリを使用する場合。
クラスタランチャーや共有メモリストアなどのRay固有の機能を利用したい場合。
Rayクラスターで実行する場合は、Dask.distributedクライアントを使用しないようにします。純粋にDaskとそのコレクションを使用し、ray_dask_get.compute()呼び出しに渡すだけです。 Rayクラスタを使用するのであれば、ray.init()呼び出しを変更するための指示に従います。
(要確認)
Dask-on-Rayは進行中のプロジェクトであり、Rayを直接使用する場合と同じパフォーマンスを達成することは期待されていません。
コールバック
Daskのカスタムコールバック抽象化はRay固有のコールバックで拡張され、ユーザーがRayタスクの送信と実行のライフサイクルにフックできるようにします。 これらのフックを使用すると、Daskレベルのスケジューラーと、進行状況のレポート、診断、キャッシュなどのタスクのイントロスペクションを簡単に実装できます。
次に、ray_pretaskフックとray_posttaskフックを使用して各タスクの実行時間を測定してログに記録する例を示します。
code: python
from ray.util.dask import RayDaskCallback
from timeit import default_timer as timer
class MyTimerCallback(RayDaskCallback):
def _ray_pretask(self, key, object_refs):
# Rayタスクの開始時に実行される
start_time = timer()
return start_time
def _ray_posttask(self, key, result, pre_state):
# Rayタスクの最後に実行される
execution_time = timer() - pre_state
print(f"Execution time for task {key}: {execution_time}s")
with MyTimerCallback():
# このコンテキスト内のcompute()メソッドの呼び出しは、
# Dask-RayコールバックとしてMyTimerCallback()を取得します。
z.compute(scheduler=ray_dask_get)
次のRay固有のコールバックが提供されています。
ray_presubmit(task、key、deps):Rayタスクを送信する前に実行します。このコールバックがNone以外の値を返す場合、Rayタスクは作成されず、この値がタスクの結果値として使用されます。
ray_postsubmit(task、key、deps、object_ref):Rayタスクを送信した後に実行します。
ray_pretask(key、object_refs):Rayタスク内でDaskタスクを実行する前に実行します。これは、タスクが送信された後、Rayワーカー内で実行されます。このタスクの戻り値は、提供されている場合、ray_posttaskコールバックに渡されます。
ray_posttask(key、result、pre_state):Rayタスク内でDaskタスクを実行した後に実行します。これは、Rayワーカー内で実行されます。このコールバックは、提供されている場合、ray_pretaskコールバックの戻り値を受け取ります。
ray_postsubmit_all(object_refs、dsk):すべてのRayタスクが送信された後に実行します。
ray_finish(result):すべてのRayタスクの実行が終了し、最終結果が返された後に実行します。
これらのコールバック、引数、および戻り値の詳細については、RayDaskCallback .__ init __()<ray.util.dask.callbacks.RayDaskCallback> .__ init __()のドキュメント文字列を参照してください。
独自のコールバックを作成する場合は、RayDaskCallbackを直接使用して、コールバック関数をコンストラクター引数として渡すことができます。
code: python
def my_presubmit_cb(task, key, deps):
print(f"About to submit task {key}!")
with RayDaskCallback(ray_presubmit=my_presubmit_cb):
z.compute(scheduler=ray_dask_get)
または、必要なコールバックメソッドを実装して、サブクラス化することもできます。
code: python
class MyPresubmitCallback(RayDaskCallback):
def _ray_presubmit(self, task, key, deps):
print(f"About to submit task {key}!")
with MyPresubmitCallback():
z.compute(scheduler=ray_dask_get)
複数のコールバックを指定することもできます。
code: python
# MyTimerCallbackとMyPresubmitCallbackの両方のフックが呼び出される
with MyTimerCallback(), MyPresubmitCallback():
z.compute(scheduler=ray_dask_get)
Daskコールバックをアクターと組み合わせると、タスク実行統計のキャプチャや結果のキャッシュなど、ステートフルデータ集約の単純なパターンが得られます。 これは両方を行う例であり、実行時間がユーザー定義のしきい値を超えた場合にタスクの結果をキャッシュします。
code: python
@ray.remote
class SimpleCacheActor:
def __init__(self):
self.cache = {}
def get(self, key):
# Raises KeyError if key isn't in cache.
def put(self, key, value):
class SimpleCacheCallback(RayDaskCallback):
def __init__(self, cache_actor_handle, put_threshold=10):
self.cache_actor = cache_actor_handle
self.put_threshold = put_threshold
def _ray_presubmit(self, task, key, deps):
try:
return ray.get(self.cache_actor.get.remote(str(key)))
except KeyError:
return None
def _ray_pretask(self, key, object_refs):
start_time = timer()
return start_time
def _ray_posttask(self, key, result, pre_state):
execution_time = timer() - pre_state
if execution_time > self.put_threshold:
self.cache_actor.put.remote(str(key), result)
cache_actor = SimpleCacheActor.remote()
cache_callback = SimpleCacheCallback(cache_actor, put_threshold=2)
with cache_callback:
z.compute(scheduler=ray_dask_get)
既存のDaskスケジューラコールバック(start、start_state、pretask、posttask、finish)も利用可能であり、DaskタスクからRayタスクへの変換プロセスをイントロスペクトするために使用できますが、pretaskとposttaskはRayタスクの前後に実行されることに注意してください が送信され、実行されません。その終了は、すべてのRayタスクが送信された後、実行されません。
このコールバックAPIは現在不安定であり、変更される可能性があります。