Marsを使ってみよう
https://gyazo.com/23ea5640709815ebba457dad9e4e95ba
Mars について
Pythonでデータ分析や機械学習の問題を処理する場合は、テンソル(Tensor) と呼ばれるデータ構造を扱うことがよくあります。Tensorは多次元(N-dimension)の配列として表現することができるものです。Marsは、Numpy、Pandas、Scikit-learnをスケーリングする大規模データ計算のためのテンソルベースの統合フレームワークです。 インストール
Mars は次のようにインストールします。
code: zsh
% pip install pymars
Mars の利用方法
Marsには主に次のコンポーネントがあります。
Mars Tensor
Mars DataFrame
Mars Learn
Mars remote
Mars Tensor
Mars Tensorは、Numpyでおなじみのあるインターフェースを提供します。
code: tensor_sample_numpy.py
import numpy as np
N = 200_000_000
a = np.random.uniform(-1, 1, size=(N, 2))
print((np.linalg.norm(a, axis=1) < 1).sum() * 4 / N)
code: tensor_sample_mars.py
import mars.tensor as mt
N = 200_000_000
a = mt.random.uniform(-1, 1, size=(N, 2))
print(((mt.linalg.norm(a, axis=1) < 1).sum() * 4 / N).execute())
code: ipython
In 1: %time %run tensor_sample_numpy.py 3.14167028
CPU times: user 10.4 s, sys: 4.23 s, total: 14.6 s
Wall time: 19.6 s
In 2: %time %run tensor_sample_mars.py 3.14156578
CPU times: user 16.2 s, sys: 1.99 s, total: 18.2 s
Wall time: 7.43 s
ほとんどコードが変わらないのに、実行時間(Wall time)が2.6倍短くなっています。
Mars DataFrame
Mars Tensorは、Pandasでおなじみのあるインターフェースを提供します。
code: dataframe_sample_pandas.py
mport numpy as np
import pandas as pd
df = pd.DataFrame(
np.random.rand(100000000, 4),
columns=list('abcd'))
print(df.sum())
code: dataframe_sample_mars.py
import mars.tensor as mt
import mars.dataframe as md
df = md.DataFrame(
mt.random.rand(100000000, 4),
columns=list('abcd'))
print(df.sum().execute())
code: ipython
In 1: %time %run dataframe_sample_pandas.py a 5.000317e+07
b 4.999806e+07
c 5.000187e+07
d 4.999695e+07
dtype: float64
CPU times: user 9.81 s, sys: 2.52 s, total: 12.3 s
Wall time: 12.6 s
In 2: %time %run dataframe_sample_mars.py a 5.000124e+07
b 5.000252e+07
c 4.999571e+07
d 5.000160e+07
dtype: float64
CPU times: user 15.8 s, sys: 1.53 s, total: 17.4 s
Wall time: 6.58 s
Mars Learn
Mars Learn は、scikit-learnでおなじみのあるインターフェースを提供します。
code: learn_sample_scikit_learn.py
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
X, y = make_blobs(
n_samples=100000000, n_features=3,
random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
code: learn_sample_mars.py
rom mars.learn.datasets import make_blobs
from mars.learn.decomposition import PCA
X, y = make_blobs(
n_samples=100000000, n_features=3,
random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
code: ipython
In 1: %time %run learn_sample_scikit_lear.py CPU times: user 1min 3s, sys: 14.2 s, total: 1min 17s
Wall time: 1min 19s
In 2: %time %run learn_sample_mars.py CPU times: user 2min 15s, sys: 19 s, total: 2min 34s
Wall time: 56.5 s
Mars remote
Mars remoteを使用すると、ユーザーは関数を並行して実行できます。
code: remote_sample_local.py
import numpy as np
def calc_chunk(n, i):
rs = np.random.RandomState(i)
a = rs.uniform(-1, 1, size=(n, 2))
d = np.linalg.norm(a, axis=1)
return (d < 1).sum()
def calc_pi(fs, N):
return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
pi = calc_pi(fs, N)
print(pi)
code: remote_sample_mars.py
import numpy as np
import mars.remote as mr
def calc_chunk(n, i):
rs = np.random.RandomState(i)
a = rs.uniform(-1, 1, size=(n, 2))
d = np.linalg.norm(a, axis=1)
return (d < 1).sum()
def calc_pi(fs, N):
return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
pi = mr.spawn(calc_pi, args=(fs, N))
print(pi.execute().fetch())
code: ipython
In 1: %time %run remote_sample_local.py 3.1416312
CPU times: user 9.64 s, sys: 1.95 s, total: 11.6 s
Wall time: 11.5 s
In 2: %time %run remote_sample_mars.py 3.1416312
CPU times: user 16.2 s, sys: 3.86 s, total: 20.1 s
Wall time: 7.69 s
Mars でスケーリング
ローカルマシンでのスケーリング
スレッドベース
code: python
mport mars.tensor as mt
from mars.session import new_session
a = mt.ones((5, 5), chunk_size=3)
b = a * 4
# ローカルセッションがない場合、
# execute() は最初にデフォルトのセッションを作成します
b.execute()
# または明示的にセッションを作成します
sess = new_session()
b.execute(session=sess) # run b
プロセスベース
Msersは、マルチコアを持つローカルマシンでMarsの分散処理を実行することができます。
それには、まず次のように分散処理をサポートするようにしておきます。
code: zsh
現在のところ、ローカルクラスターモードはLinuxとMacOSでのみ実行できます。
code: python
import mars.tensor as mt
from mars.deploy.local import new_cluster
from mars.session import new_session
cluster = new_cluster()
# 新しいクラスターがセッションを開始し、それをデフォルトとして設定すると、
# 1回の実行がローカルクラスターで実行されます
a = mt.random.rand(10, 10)
a.dot(a.T).execute()
# cluster.sessionは、作成されたセッションです
(a + 1).execute(session=cluster.session)
# ユーザーは明示的にセッションを作成することもできます
# cluster.endpointをnew_session()に渡す必要があります
session2 = new_session(cluster.endpoint)
(a * 2).execute(session=session2)
クラスタでのスケーリング
クラスターシステムのすべてのノードで次のコマンドを実行する必要が’あります。
code: zsh
これにより、クラスターでの分散実行に必要な依存関係がインストールされます。
Mars を使ってクラスタで分散処理をする場合次の3つの構成が必要になります。
最小で1つのスケジューラー
1つのWebサービス
他のノードをワーカー
スケジューラーは、次のコマンドで開始することできます。
code: zsh
mars-scheduler -a <scheduler_ip> -p <scheduler_port>
Webサービスは次のコマンドで開始することができます。
code: zsh
mars-web -a <web_ip> -p <web_port> -s <scheduler_ip>:<scheduler_port>
ワーカーは次のコマンドで開始することができます。
code:zsh
mars-worker -a <worker_ip> -p <worker_port> -s <scheduler_ip>:<scheduler_port>
すべてのMarsサービスが起動してクラスタの準備が整うと、次のようなスクリプトでジョブを分散処理することができます。
code: python
import mars.tensor as mt
import mars.dataframe as md
from mars.session import new_session
a = mt.ones((2000, 2000), chunk_size=200)
b = mt.inner(a, a)
b.execute() # テンソルをクラスタにサブミットする
df = md.DataFrame(a).sum()
df.execute() # データフレームをクラスタにサブミットする
ブラウザーで、URLとしてhttp://<web_ip>:<web_port> を開くと、Mars UIがオープンします。
Mars UI ではワーカーのリソース使用量と、サブミットされたタスクの進行状況を参照調べることができます。
コマンドライン
table: コマンドライン・オプション(共通)
オプション 説明
-a クラスター内の他のプロセスに公開されているアドレスを指定します。
サーバーに複数のIPアドレスがある場合、
またはサービスがVMまたはコンテナー内にデプロイされている場合に役立ちます
-H サービスIPバインディング、デフォルトでは0.0.0.0
-p サービスのポート番号。 デフォルトはランダムなポート番号が使用されます
-s スケジューラーエンドポイントのリスト(コンマで区切り)
ワーカーやWebがスケジューラーを見つける場合、
または複数のスケジューラーを実行する場合に便利です
--log-level ログレベル、デバッグ、情報、警告、エラーの可能性があります
--log-format ログ形式、Python の logging 形式を指定することができます
--log-conf Pythonロギング構成ファイル、デフォルトでは logging.conf
table: コマンドライン・オプション(スケジューラ)
オプション 説明
--nproc プロセスの数。 デフォルトは利用可能なコア数になります
table: コマンドライン・オプション(ワーカー)
オプション 説明
--cpu-procs CPUのプロセス数。 デフォルトは利用可能なコアの数になります
--net-procs ネットワーク転送のプロセス数。 デフォルト値は4
--cuda-device 使用するCUDAデバイスのインデックス。 デフォルトではCPUのみが使用されます
--phy-mem 物理メモリのサイズ
合計メモリのパーセンテージまたはバイト数で指定(例:4gまたは80%)
デフォルトでは物理メモリのサイズが使用されます
--cache-mem 共有メモリのサイズ
合計メモリのパーセンテージまたはバイト数で指定(例:4gまたは80%)
デフォルトでは空きメモリの50%が使用されます
--min-mem ワーカーを開始するための最小空きメモリ
合計メモリのパーセンテージまたはバイト数で指定(例:4gまたは80%)
デフォルトは128mです
--spill-dir MacOSまたはLinuxの場合、コロン(:)で区切られた出力先のディレクトリ
--plasma-dir プラズマストアのディレクトリ。
指定した場合はプラズマストアのサイズはメモリ管理では考慮されません。
メモリチューニング
Mars Worker は、メモリの2つの異なる部分を管理します。
プライベートプロセスメモリ
ワーカープロセス間の共有メモリ
Apache Arrowの plasma_store によって処理される
Mars Workerが起動すると、デフォルトで空きメモリ領域の50%が共有メモリとして使用され、残りはプライベートプロセスメモリとして使用されます。
さらに、Marsは、ワーカーへのメモリ割り当てを制限することができます。
ソフトリミット:メモリ全体の75%
ハードリミット:メモリ全体の90%
これらの構成は、MarsWorkerの起動時にオプションで指定することができます。
--cache-mem:共有メモリのサイズ
--phy-mem:物理メモリサイズ
この2つのオプションの組み合わせでソフトリミットとハードリミットを指定できます。
実行例:共有メモリのサイズを512MB、ワーカーは物理メモリ全体の最大90%を使用する場合
code: zsh
% mars-worker -a localhost -p 9012 -s localhost:9010 \
--cache-mem 512m --phy-mem 90%
MarsをKubernetesクラスタで実行
Marsは、Kubernetesクラスターで実行することもできます。 mars.deploy.kubernetes を使用して、Marsクラスターをセットアップします。
基本的な手順
Marsは、デフォルトでイメージリポジトリ marsproject/mars を使用します。 Marsのリリースバージョンのv0.3.0以降のイメージがあります。 たとえば、バージョン0.3.0のイメージはmarsproject/mars:v0.3.0 を参照します。
MarsはPythonを使用してKubernetesを操作するため、Python用のKubernetesクライアントもローカルにインストールする必要があります。 pipまたはcondaを使用してインストールできます。
code: zsh
% pip install kubernetes
この後、1つのスケジューラー、1つのワーカー、およびkubernetesを使用した1つのWebサービスを使用してMarsクラスターを構築し、その上でジョブを実行することができます。
code: python
from kubernetes import config
from mars.deploy.kubernetes import new_cluster
import mars.tensor as mt
cluster = new_cluster(config.new_client_from_config())
# 新しいクラスターでセッションを開始し、それをデフォルトとして設定すると、
# ローカルクラスターで実行されます
a = mt.random.rand(10, 10)
a.dot(a.T).execute()
# すべてのジョブが実行されたら、クラスターを終了することができます
cluster.stop()
このクラスターを他の場所で使用する場合は、clusterオブジェクトから名前空間とエンドポイントを取得し、別のKubernetesClusterClientを作成することができます。
code: python
# 現在のクラスターから情報を取得する
namespace, endpoint = cluster.namespace, cluster.endpoint
# 新しいクラスタークライアントを作成する
from kubernetes import config
from mars.deploy.kubernetes import KubernetesClusterClient
cluster = KubernetesClusterClient(
config.new_client_from_config(), namespace, endpoint)
クラスターのカスタマイズ
new_cluster()関数は、ユーザーがクラスターを定義するためのいくつかのキーワード引数を提供します。 引数imageを使用してすべてのMarsポッドのイメージを指定するか、引数timeoutを使用してクラスター作成のタイムアウトを指定できます。 クラスターのスケールアップとスケールアウトの引数も利用できます。
table:スケジューラーの引数
引数 説明
scheduler_num クラスター内のスケジューラーの数、デフォルトでは1
scheduler_cpu すべてのスケジューラーのCPUの数
scheduler_mem クラスタ内のスケジューラのメモリサイズ(バイト単位、1gなどのサイズ単位)
scheduler_extra_env スケジューラーで設定を指示する環境変数
table: ワーカーの引数
引数 説明
worker_num Number of workers in the cluster, 1 by default
worker_cpu クラスター内のワーカーの数、デフォルトでは1
worker_mem クラスタ内のワーカーのメモリサイズ(バイト単位、1gなどのサイズ単位)
worker_spill_paths ホスト上のワーカーポッドのスピルパス(spill path)のリスト
worker_cache_mem すべてのワーカーの共有メモリのサイズまたは比率
min_worker_num new_cluster()が返す準備ができているワーカーの最小数。
デフォルトでは worker_num
worker_extra_env ワーカーに設定を指示する環境変数
table: Webサービスの引数
引数 説明
web_num クラスター内のWebサービスの数(デフォルトでは1)
web_cpu すべてのWebサービスのCPUの数
web_mem クラスタ内のWebサービスのメモリサイズ(バイト単位、1gなどのサイズ単位)
web_extra_env Webサービスの設定を指示する環境変数
実行例:1つのスケジューラ、1つのWebサービス、100のワーカーでMarsクラスターを構成し、各ワーカーには4つのコアと16 GBのメモリがあり、95のワーカーの準備ができたら待機を停止する場合
code: python
from kubernetes import config
from mars.deploy.kubernetes import new_cluster
api_client = config.new_client_from_config()
cluster = new_cluster(api_client, scheduler_num=1, web_num=1, worker_num=100,
worker_cpu=4, worker_mem='16g', min_worker_num=95)
Worker の再構成
Marsは、作成されたKubernetesクラスター内のワーカー数のスケールアップまたはスケールダウンをサポートしています。 Kubernetesでクラスターを作成した後、を呼び出すことで、クラスター内のワーカーの数を再スケーリングできます。
code: python
num_of_workers = 20
cluster.rescale_workers(20)
現在、Kubernetesで作成されたMarsクラスターのワーカーを再スケーリングするときに
データが保持されることは保証されていません。
上記の操作を行う場合は、その前にすべてのデータが保存されていることを確認してください。
実装の詳細
new_cluster()が呼び出されると、ロール、ロールバインディング、ポッド、サービスを含むすべてのオブジェクトに対して独立した名前空間が作成されます。ユーザーがサービスを破棄すると、名前空間全体が破棄されます。
スケジューラー、ワーカー、およびWebサービスは、レプリケーションコントローラーを使用して作成されます。サービスは、デフォルトのサービスアカウントを介してKubernetes APIに直接アクセスすることにより、スケジューラを検出します。ポッドアドレスとその準備状況は、開始するかどうかを決定するためにワーカーとWebサービスによって読み取られます。その間、クライアントはすべてのポッドのステータスを読み取り、すべてのスケジューラー、Webサービス、および少なくともmin_worker_numの数のワーカーの準備ができているかどうかを確認します。
Marsサービスの準備は、ポッドステータスを介して結果を取得できる準備プローブによって決定されます。スケジューラーとワーカーの場合、サービスが開始されると、ReadinessActorがサービスに作成され、プローブがそれを検出できます。 Webサービスの場合、Webポートが検出されます。
デフォルトのサービスアカウントにはKubernetesAPIでポッドを読み取る権限がないため、RBAC APIを使用してポッドを読み取り、監視する機能を備えたロールを作成し、レプリケーションコントローラーを作成する前に名前空間内のデフォルトのサービスアカウントにバインドします。これにより、Marsコンテナは他のコンテナのステータスを検出できます。
MarsはKubernetesサービスを使用してサービスを公開します。現在、NodePortモードのみがサポートされており、MarsはエンドポイントとしてWebサービスをホストしているホストを探します。 LoadBalancerモードはまだサポートされていません。
Mars を Rayクラスタで実行
Marsは、Rayクラスターを使用してプログラムを簡単にスケーリングすることができます。
RayクラスターでMarsジョブを実行するのは簡単です。 mars.session からnew_sessionをインポートし、new_session(backend='ray').as_default()を実行します。 これで、デフォルトセッションとしてRayのMarsセッションが作成され、すべてのMarsタスクがRayにサブミットされるようになります。new_session(backend='ray', address=<address>, num_cpus=<num_cpus>) のようにMarsセッションを作成すると、与えた引数が ray.init() に渡されます。
code: python
from mars.session import new_session
ray_session = new_session(backend='ray').as_default()
import mars.dataframe as md
import mars.tensor as mt
t = mt.random.rand(100, 4, chunk_size=30)
df = md.DataFrame(t, columns=list('abcd'))
print(df.describe().execute())
Rayクラスタのモニタリング
Ray クラスタはタスクの監視を支援するため、クラスタやアプリケーションをモニタリングする機能を提供しています。
事前に設定されたRayのシステムレベルのメトリックを収集
Prometheus形式でメトリックを公開
Prometheusメトリックタイプに似たカスタムメトリックAPIのサポート
メトリックにアクセスするためのエンドポイントをPrometheusエンドポイントと呼びます
ここでは、Prometheusを使用してこれらのメトリックにアクセスする方法について説明します。
シングルノード
Rayは、メトリックをPrometheus形式で公開します。 これにより、Prometheusを使用して簡単にそれらを取得することができます。
ray startを実行してヘッドノードにメトリックエクスポートポートを割り当て、メトリックを公開しましょう。
code: zsh
% ray start --head --metrics-export-port=8080
これで、Prometheusを使用してRayのメトリックをスクレイプできます。
code: zsh
% wget ${BASEURL}/prometheus-*.*-amd64.tar.gz
% tar xvfz prometheus-*.tar.gz
% cd prometheus-*
Prometheusの構成ファイルを変更して、Prometheusエンドポイントからメトリックを取得してみましょう。
code: prometheus.yml
global:
scrape_interval: 5s
evaluation_interval: 5s
scrape_configs:
- job_name: prometheus
static_configs:
この構成ファイルを与えてprometheusを起動します。
code: zsh
% ./prometheus --config.file=./prometheus.yml
マルチノード
次に、Rayクラスターからメトリックをインポートする方法について説明します。
Rayは、ノードごとにメトリックエージェントを実行します。 各メトリックエージェントは、ローカルノードからメトリックを収集し、Prometheus形式で公開します。 次に、各エンドポイントをスクレイプしてRayのメトリックにアクセスできます。
ヘッドノードで次のコマンドを実行して、ヘッドノードにメトリックエクスポートポートを割り当てて、ray を起動します。
code: zsh
% ray start --head --metrics-export-port=8080
ワーカーノードで次のコマンドを実行します。
code: zsh
ray.nodes() を使用してメトリックエージェントのURLを取得できるようになります。
code: python
# このスクリプトはヘッドノードで実行します。
import ray
ray.init(address='auto')
from pprint import pprint
pprint(ray.nodes())
"""
[{'Alive': True,
'MetricsExportPort': 8080,
'NodeID': '2f480984702a22556b90566bdac818a4a771e69a',
'NodeManagerAddress': '192.168.1.82',
'NodeManagerHostname': 'host2.attlocal.net',
'NodeManagerPort': 61760,
'ObjectManagerPort': 61454,
'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/plasma_store',
'RayletSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/raylet',
'Resources': {'CPU': 1.0,
'memory': 123.0,
'node:192.168.1.82': 1.0,
'object_store_memory': 2.0},
'alive': True},
{'Alive': True,
'MetricsExportPort': 8080,
'NodeID': 'ce6f30a7e2ef58c8a6893b3df171bcd464b33c77',
'NodeManagerAddress': '192.168.1.82',
'NodeManagerHostname': 'host1.attlocal.net',
'NodeManagerPort': 62052,
'ObjectManagerPort': 61468,
'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/plasma_store.1',
'RayletSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/raylet.1',
'Resources': {'CPU': 1.0,
'memory': 134.0,
'node:192.168.1.82': 1.0,
'object_store_memory': 2.0},
'alive': True}]
"""
次に、クラスター内のすべてのノードから[NodeManagerAddress]:[MetricsExportPort] からメトリックを読み取るようにプロメテウスをセットアップします。 このプロセスを自動化したい場合は、オプションで構成ファイルあたえて promethus を起動することで、サービスを自動検出します(ファイルベースのサービス検出)。 ray.nodes()を使用してすべてのエンドポイントを簡単に取得できます
クラスタランチャー
Rayクラスターランチャーを使用する場合、クラスターがスケールアップおよびスケールダウンするため、一般的なノードのIPアドレスが変更されます。 この場合、Prometheusのファイルベースのサービス検出を使用できます。
サービス検出
Rayは、ヘッドノードにPrometheusサービス検出ファイルを自動生成して、メトリックエージェントのサービス検出を支援します。 これにより、自動スケーリングクラスターの各ノードですべてのメトリックを簡単に取得できます。 これを実現する方法をウォークスルーしましょう。
サービス検出ファイルは、ヘッドノードで生成されます。 ヘッドノードは、ray start –headまたはray.init()を実行して開始したノードであることに注意してください。
ヘッドノード内で、Rayのtemp_dirを確認してください。 デフォルトでは、/tmp.ray。 ファイル prom_metrics_service_discovery.json を見つけることができるはずです。 Rayは、クラスター内のすべてのメトリックエージェントのアドレスをこのファイルに定期的に更新します。
ここで、Prometheus構成を変更して、サービス検出のためにファイルをスクレイプします。
code: prometheus.yaml
# Prometheus構成ファイル
# グローバル設定
global:
scrape_interval: 2s
evaluation_interval: 2s
# スクレープするエンドポイントを1つだけ含むスクレイプ構成:
scrape_configs:
- job_name: 'ray'
file_sd_configs:
- files:
- '/tmp/ray/prom_metrics_service_discovery.json'
Prometheusは、ファイルの内容が変更されていることを自動的に検出し、Rayによって生成されたサービス検出ファイルに基づいてスクレイプしたアドレスを更新します。
カスタムメトリックス
Rayは、開発者がアプリケーションを可視化できるようにするカスタムメトリックAPIをサポートしています。
まとめ
Mars は次の環境で実行することができます。
シングルノード
マルチノード
kubernatesクラスタ
Rayクラスタ