Pythonチュートリアル:パイプライン処理
https://gyazo.com/153a339305d78fc4fa4850753e4b1594
はじめに
この資料は関数型プログラミングを用いたパイプライン処理について説明するものです。次のことを取り上げています。
関数型プログラミングとパイプラインの用語の理解
関数型プログラミングの利点
パイプライン処理の利点
Pythonでは関数がファーストクラスオブジェクトであることの意味
拡張ライブラリを用いた関数型コードを実装する方法
関数型プログラミング
関数型プログラミング(Functional Programming)は、純粋関数を主に使うプログラミングのスタイルです。純粋関数とは、出力値が入力値のみに依存し、副作用がない関数のことをいいます。関数型プログラミングでは、プログラムはすべて純粋関数の評価で構成されます。
Python では、文法として式 (expression) と 文 (statement) を持つ手続型プログラミング言語です。Pythonの式は、計算を実行して結果を得るような処理を記述するための文法要素で、加減乗除や関数呼び出しなどから構成されます。Python の文は、何らかの動作を行うようにコンピュータへ指示するための文法要素で、条件分岐やループを行う制御文などから構成されいます。
Haskell や Elixir、 Elmなどの関数型言語ではプログラムはたくさんの式で構成され、プログラムそのものもひとつの式と扱われます。手続き型言語ではコンピュータへの指示を文として上から順に並べて書くのに対して、関数型言語では数多く定義した小さな式を組み合わせてプログラムを作ります。
両者のもっとも大きな違いは、関数型プログラミング言語では代入は存在しません。関数の呼び出しは結果を得ること以外なにもしないため、変数はいちど与えられると変更されることはありません。プログラムを実行したときに代入による副作用が発生しないことから、バグの混入を防ぐ効果があります。式はいつでも参照することができ、ロジックを考える負担を減らしてくれ、数学的な問題に対応しやすいといった特徴があります。
パイプライン
パイプライン(Pipeline) とは、複数の処理プログラムを直列に連結し、ある処理プログラムの出力が次の処理プログラムの入力となるようにしたものです。複数のプログラムを並行処理させる場合もあります。
バイオインフォマティックスや機械学習の領域では、データの前処理、学習、分析など工程がいくつか必要になるため、
パイプライン処理でデータを扱う場合が多くなります。
関数型プログラミングとパイプラインは相性がよく、同じ目的にで利用されることがあります。
関数型プログラミングの利点
高レベル
目的の結果を得るために必要なステップを明示的に指定するのではなく、欲しい結果を記述します。ひとつの文は簡潔でありながら多くの処理を包含します。
透明性
関数の動作は入力と出力にのみ依存し、中間的な値を持ちません。そのため、副作用がなく、デバッグが容易になります。
並列化が容易
副作用のないルーチンは、簡単に並列実行することができます。
パイプライン処理の利点
デフォルトではPythonにはパイプ演算子がありません。しかし、関数にドットシタックスを使うことで、メソッドチェインをさせることができ類似の動作をさせることができます。これを使って、中間結果を変数に保存する代わりに、関数型コードでパイプライン実装するとどのようなメリットがあるのでしょうか?
例えば、Pandas の DataFrame を扱うような場合、典型的なデータ処理の「作業」」としては、 次のようなものがあります。
欠損地を取り除く
ある種のフィルタを行う
groupbyで重複を集計する
この処理を、従来の手続言語型のスタイルで記述すると次のようなコードになります。
code: python
df.attribute.ix[df.attribute.str.contains('^\d')] = np.nan df.attribute = df.attribute.astype(int)
df_final = df.groupby('id').max()
これをパイプラインのスタイルで記述したものが、次のコードです。
code: python
df_final = ( df.convert_objects(convert_numeric=True)
.query('attribute != 1')
.groupby('id')
.max() )
圧倒的にコードの可読性が向上していて、理解しやすくなっています。
また、他のカラムを固定したまま、データフレームの1つのカラムに関数を適用する方法も、Pandas 0.16.0 で導入された assign() を使うことで実現できます。
例えば、カラムAに一連の関数を連続的に適用したい場合は次のようになります。
code: python
df.assign(A = lambda x: f1(x.A))
.assign(A = lambda x: f2(x.A))
.assign(A = lambda x: f3(x.A))
こうしたスタイルはPandasのDataFrameだけでなく、SQLAlchemy などのORMでもうまく機能します。
Pythonの関数がファーストクラスオブジェクトで’あることの意味
関数型プログラミングをサポートするためには、そのプログラミング言語の関数が次の2つの能力を持っていると実装が簡単になります。
他の関数を引数として受け取る
別の関数を呼び出し元に返す
幸いにPython の関数はこの2つの能力を持っています。Pythonではすべてがオブジェクトで、関数も例外ではありません。
Pythonでは、関数はファーストクラスオブジェクトです。つまり、関数は文字列や数値のような値と同じ性質を持っています。文字列や数値でできると思ったことは、関数でもできるわけです。
関数がファーストクラスオブジェクトであるということは、次のようなことができることになります。
関数を変数に代入でき、関数が代入された変数を関数として呼び出せる
関数を「値」として扱うことができるので、リストや辞書のような構造体に格納できる
関数の引数に関数を与える
関数の戻り値として関数を返す
関数の内部で関数を定義できる
パイプライン処理のライブラリ
関数型プログラミングを使った処理では、内側の処理の結果が外側に渡されていくため、記述の順序が人間の思考とは逆になるため、処理内容の理解が難しくなる欠点があります。また、多くなる括弧が合致しているか意識してプログラミングする必要に迫られます。パイプライン機能を提供するライブライを利用するとこの問題が解決されます。
パイプライン機能を提供するライブラリは非常に便利で使い勝手がよいものが多く見つかります。
pipe ー Linuxのシェルのようなパイプ構文でパイプラインを記述する。 sspipe ー 複雑な式を一連の単純な呼び出しに分解し、人間の読みやすさを向上させ、括弧が合っているか確認する時間を減らしてくれます。pipe ライブラリとの互換性があります。 pydash ー NodeJSのLodashをPythonで実装したもの。inuxのシェルのようなパイプ構文でパイプラインを記述する。 fnc ー ジェネレータとして動作しジェネレータを返す関数スタイルのパイプラインを記述できる。ジェネレータであるためメモリ効率がよく大規模データに対しても処理することができます。 pypeln 並列データパイプラインを作成するためのシンプルで強力な Python ライブラリ ここでは、取り上げていませんが次のようなライブラリもあります。
Streamz ー 連続したデータの流れを管理するパイプラインの構築を支援します。単純なケースでは簡単に使用できますが、分岐、結合、フローコントロール、フィードバック、バックプレッシャーなどを含む複雑なパイプラインもサポートしています。オプションとして、StreamzはPandasとcuDFの両方のデータフレームと連携して、連続した表形式のデータに対する適切なストリーミング操作を提供することもできます。 Pipe を使ったパイプライン処理
次のコードは pipe を使って、map()とfilter()の場合と比較したものです。
code: python
In 2: # %load c20_using_pipe.py ...: from pipe import where, select
...:
...:
...: # map() と filter() を同時に使用するとわかりにくい
...: v1 = list(map(lambda x: x * 2, filter(lambda x: x % 2 == 0, data)))
...:
...: # pipe を使うとコードがわかりやすくなる
...: v2 = list( data
...: | where(lambda x: x % 2 == 0)
...: | select(lambda x: x * 2) )
...:
Pipeではパイプと呼ぶmap()、filter()に相当する関数が提供されていて、パイプ記号(|)で連結して処理させることができます。
chain() イテラルオブジェクトを展開
chain_with() イテラルオブジェクトを連結
dedup() 重複した値を削除
groupby() リスト内の要素をグループ化
islice()1 リスト内の要素を抜き出す
izip() 指定した長さのタプルを生成
lstrip 文字列の先頭の空白文字を除去
map() イテラブルオブジェクトに関数を適用
permutations() 可能なすべての並べ換えを返す
reverse リストを逆順にする
rstrip 文字列の末尾の空白を除去したコピーを返す
select() イテラブルオブジェクトに関数を適用
skip() 与えた数の要素だけスキップ
skip_while 与えた条件が真となる要素をスキップ
sort() 与えたイテラルオブジェクトをインプレース(リスト自体が変更される)でソー>トしたものを返す
strip() 文字列から空白を除去したコピーを返す
t() - Haskellの:演算子のように右結合を行った結果を返す
tail() 与えられたイテラルオブジェクトの最後の要素から、指示された要素数を返す
take() 与えた数の要素だけ返す
take_while() 与えた条件が真となる要素を返す
tee 標準出力への出力 (デバッグ用途)
transpose() 転置行列を返す
traverse 再帰的にイテラブルシーケンスを展開する
uniq() 連続した重複データを削除
where() イテレート可能な要素をフィルタリング
Pipeクラス 独自のパイプを作成する
SSPipe を使ったパイプライン処理
SSpipe は、 Pipeに触発されて開発されたライブラリで、内部では pipe を使用しています。SSPipeは、一般的なライブラリとの統合や、オブジェクトプレースホルダーpx の概念の導入、pythonの演算子のオーバーライドによって、パイプをファーストクラスオブジェクトにすることで、pipe の利用を容易にすることに焦点を合わせています。 Pipe ライブラリによって実装された既存のパイプはすべてp.<original_name> を通してアクセスでき、SSPipe と互換性を持っています。SSPipeは特定のパイプ関数を実装せず、パイプ関数の実装と命名をPipeに委譲しています。
例えば、Pipe での「フィボナッチの偶数項のうち400万を超えないものの総和を求めよ」を解く例は、sspipeを使って書き直すこと次のようなコードになります。
code: python
In 2: # %load c21_fibonacchi.py ...: from sspipe import p, px
...:
...: def fib():
...: a, b = 0, 1
...: while True:
...: yield a
...: a, b = b, a + b
...:
...: v1 = (fib() | p.where(lambda x: x % 2 == 0)
...: | p.take_while(lambda x: x < 4000000)
...: | p.add())
...:
...: v2 = (fib() | p.where(px % 2 == 0)
...: | p.take_while(px < 4000000)
...: | p.add())
...:
v2 を求めているコードでは、プレースホルダー px を利用していますが、lambda式を消す働きがあることがわかります。
pipe と SSPipe のどちらが良いかということですが、SSpipe は pipe を内部で利用しているため、基本的にはSSPipeを使うことで必要十分だと言えます。pipe モジュールのコアの機能を自身のプロジェクトに取り込みたいといった場合では、pipe モジュールのpipe.py を使用することになるでしょう。
pydash を使ったパイプライン処理
pydash は NodeJSのLodash を Python に移植することから生まれたプロジェクトです。オブジェクト操作で、よりメモリ効率が良く、大規模なデータセットに適したライブラリが必要なら fnc も検討する価値はあるはず。
code: python
In 2: # %load c21_pydash_quickstart.py ...: import pydash as py_
...:
...: # Arrays
...:
...: data = [1, 2, [3, [4, 5, 6, 7]]] ...: v1 = py_.flatten(data)
...: assert v1 == [1, 2, 3, [4, 5, 6, 7]] ...:
...: v2 = py_.flatten_deep(data)
...:
...: # Collections
...: data = [
...: {'name': 'moe', 'age': 40},
...: {'name': 'larry', 'age': 50},
...: ]
...:
...: v3 = py_.map_(data, 'name')
...:
...: # Functions
...: curried = py_.curry(lambda a, b, c: a + b + c)
...: v4 = curried(1, 2)(3)
...: assert v4 == 6
...:
...: # Objects
...: data = {'name': 'moe', 'age': 40}
...: v5 = py_.omit(data, 'age')
...: assert v5 == {'name': 'moe'}
...:
...: # Utilities
...: v6 = py_.times(3, lambda index: index)
...:
...: # Chaining
...: .without(2, 3)
...: .reject(lambda x: x > 1)
...: .value() )
...:
flatten() ネストされているリストを一段階展開
flatten_deep() ネストされているリストを全ての階層で展開
chunk() 与えた要素数のまとめたリストを返す
omit() 辞書の属性を削除する
get() ネストした辞書の属性を取得
find_index() 辞書のリスト内の要素のインデックスを取得
filter_() パターンに合致するオブジェクトを検索
map_() ネストしたオブジェクトの値を取得
times() 与えた関数をN回実行する
chain() 与えたオブジェクトにメソッドチェインを適用したチェーンを作成
plant() チェーンの初期値の置き換え
fnc を使ったパイプライン処理
Fncは、辞書の探索、変換、集計のための多くの便利な関数を提供しています。 code: python
In 1: %load c22_fnc_quickstart.py ...: v1 = fnc.sequences.flatten(data)
...: assert list(v1) == [1, 2, 3, [4, 5, 6, 7]] ...:
...: v2 = fnc.sequences.flattendeep(data)
...:
...: # Collections
...: data = [
...: {'name': 'moe', 'age': 40},
...: {'name': 'larry', 'age': 50},
...: ]
...:
...: v3 = fnc.map('name', data)
...:
...: ## Functions
...: curried = fnc.iteratee(lambda a, b, c: a + b + c)
...: v4 = curried(1, 2, 3)
...: assert v4 == 6
...:
...: # Objects
...: data = {'name': 'moe', 'age': 40}
...: v5 = fnc.mappings.pick('age', data) ...: assert v5 == {'age': 40}
...: v6 = fnc.mappings.omit('age', data) ...: assert v6 == {'name': 'moe'}
...:
...: # Utilities
...: v7 = list()
...: @fnc.retry(attempts=3, delay=0)
...: def do_something(seq):
...: seq.append(len(seq))
...: raise Exceptions('retry count exceeded')
...:
...: try: do_something(v7)
...: except Exception: pass
...:
...: # Compose like Chaining
...: from functools import partial
...: do_without = partial(fnc.sequences.without, 2, 3) ...: do_reject = partial(fnc.sequences.reject, lambda x: x > 1)
...: do_without_reject = fnc.compose(
...: do_without,
...: do_reject
...: )
...: v8 = do_without_reject(data)
...: assert list(v8) == 1 ...:
pypeln を使ったパイプライン処理
Pypeln (発音は pypeline - パイプライン) は、並列データパイプラインを作成するためのシンプルで強力なPythonライブラリです。 主な機能には次のものがあります。
シンプル ー Pypelnは、SparkやDaskのようなフレームワークでは大げさで不自然に感じられる、並列性と並行性を必要とする中規模のデータタスクを解決するために設計されています。
使いやすい ー Pypelnは、通常のPythonコードと互換性のある、使い慣れた関数型APIを提供しています。
柔軟 ー Pypelnは、全く同じAPIでProcesses、Threads、asyncio.Tasksを使ったパイプラインを構築できます。
きめ細かな制御 ー Pypelnでは、パイプラインの各段階で使用されるメモリとCPUのリソースを制御できます。
code: python
In 2: # %load c06_pipe.py ...: import pypeln as pl
...: import time
...: from random import random
...:
...: def slow_add1(x):
...: time.sleep(random()) # <= some slow computation
...: return x + 1
...:
...: def slow_gt3(x):
...: time.sleep(random()) # <= some slow computation
...: return x > 3
...:
...:
...: def main():
...: data = (
...: range(10)
...: | pl.thread.map(slow_add1, workers=3, maxsize=4)
...: | pl.thread.filter(slow_gt3, workers=2)
...: | list
...: )
...: print(data)
...:
...: if __name__ == '__main__':
...: main()
...:
Pypelnではステージと呼ぶは、計算に関するメタ情報のみを含むイテラブルオブジェクトでパイプラインを構成します。ステージには、Process(Multiprocessing)、Thread(threading)、Task(asyncio)、sync(同期処理)のタイプがあり、それぞに同じ関数が用意されていて。問題の用途に応じたステージを選べるようになっています。
concatー各ステージの要素を順次追加し、複数のステージを1つのステージに連結/統合する(順序は保持しない)
eachーデータ中の各要素に対して関数fを実行しますが、ステージ自体は要素を生成しないステージを作成する
filter ー 組み込みfilter関数のように振る舞いますが、並行処理が追加されている
flat_mapー 引数f(あるいは第1引数)に与えた関数をデータ上にマッピングするステージを作成する
from_iterable-ーイテラブルからステージを作成する
mapーデータ上に関数fをマッピングするステージを作成する
orderedーパイプラインのソース iterable (複数可) で作成された順序に基づいて要素をソートするステージを作成する
runー 1つまたは複数のステージを、そのイテレータが要素を使い果たすまで繰り返し処理する
to_iterableーステージからイテラブルを作成する
マルチプロセスやスレッドでの並列処理、非同期処理、同期処理といったデータ処理を簡単に実装できるのは素晴らしい。ただ、プログラムが並列処理の内側で動作する部分コードであるときは、外側のフレームワークとの競合に注意する必要があります。
機械学習の領域でよく使用されるライブラリ
Pipeline クラスが提供されている。このクラスは複数の処理を結合し、あたかも1つの学習器のように学習・推論を行える仕組みを提供します。
半自動データサイエンスのためのPythonライブラリです。Laleは、scikit-learnと互換性のあるパイプラインのアルゴリズムの自動選択とハイパーパラメータのチューニングを、タイプセーフな方法で簡単に行えるようにするもの
分散処理をサポートするライブラリ
Jugは、タスクに分割されたコードを書き、異なるプロセッサで異なるタスクを実行することができます。
現在、2つのバックエンドを持っています。
1つ目は、プロセス間の通信にファイルシステムを使用し、NFS上で正しく動作するので、異なるマシン上のプロセスを調整することができます。
2つ目はredisをベースにしているので、プロセスは共通のredisサーバーに接続する機能だけを必要とします。
Jugはまた、すべての中間結果を、後で取得できるようにバックエンドに保存することを引き受けます。
ParslはPythonのための並列プログラミングライブラリです。ParslはPythonに並列処理をエンコードするためのシンプルでスケーラブル、かつ柔軟な構造を追加します。開発者はPythonの関数にアノテーションを付け、並列実行の機会を指定します。これらのアノテーションされた関数はアプリと呼ばれ、純粋なPython関数や、シーケンシャル、マルチコア(CPU、GPU、アクセラレータなど)、マルチノードMPIなどの外部アプリケーションへのコールを表すことができます。さらにParslは、これらのアプリへの呼び出しをタスクと呼び、共有された入出力データ(Pythonオブジェクトやファイルなど)で接続することができ、Parslはタスクのダイナミックな依存関係グラフを構築することができます。
次の資料も参照してください。
DataFrameをパイプライン処理
データフレームを操作する関数呼び出しを連鎖させることができます。また、pandasのDataFrame.pipeも見てみましょう。
code: python
df = (df.pipe(foo,1)
.pipe(foo,2)
.pipe(foo,3)
参考