Dataflow
https://gyazo.com/c081e6183b937a2d146bdcb1c2886b12
Links
結局使ってない
入門する
なんかめんどそう
| この縦棒がピーってなるのがビームってこと??
pip install "apache-beam[gcp]"
実行しておーできた
なんかしれっと Cloud Storage のファイル読んで処理してローカルに書かれる
Guide
Pipeline が1つ処理の流れ全体
PCollection が1つのステップの入力と出力
PTransform が処理するステップ、PCollecction が入力と出力
PCollection を作る
beam.io.ReadFromText でファイルから読む
beam.Create([...]) で作る
Transform
| pipe operator で適用する
[Output PCollection] = [Input PCollection] | [Transform]
PCollection は immutable
code:split
で分岐する
コアの Transofrm
code:cores
ParDo
GroupByKey
CoGroupByKey
Combine
Flatten
Partition
ParDo は PCollection の要素(element) に beam.DoFn を実行する
beam.DoFn を継承したクラスの process メソッド
inline に lambda を渡すこともできる
code:dofn.py
class ComputeWordLengthFn(beam.DoFn):
def process(self, element):
# or
word_lengths = words | beam.FlatMap(lambda word: len(word)) ParDo のライフサイクル図
状態持たなければあまり気にしなくて良さそうかな
https://beam.apache.org/images/dofn-sequence-diagram.svg
GroupByKey
key/value のコレクションの Reduction
non-global windowing or aggregation trigger
unbounded な PCollection を使う場合ウィンドウや集約を定義しないといけない
...
wordcount 読みながらなんか書いてみる
parse_known_args はこれ、input & output 以外の引数を PipelineOptions にわたす save_main_session はよくわからない
ReadFromText に gs:// 渡したら自然と読んでくれる
公開じゃないバケットは認証情報とかを PipelineOptions にわたすのかな?
glob とかも使えるの? gs://bucket/hoge/logs/* みたいな
"hoge" >> これがステップ名?
ビーム | でピーってつなげるのが pipleline をつなげる?
しゃらくせえDSL
beam.ParDo(...)
処理登録するやつ
単語ごとに split
$ワード: 1 に マップ
キーごとに sum
のステップでカウントしてる
どういう書き方したらパフォーマンスでるのかわからん
save_main_session とかもわからないな
グローバルな空間の定義を参照するときに
複数のファイル入力して1つにまとめるのかファイル単位で出るのかとかも気になる
なんかよしなになる
pip install
$ pip install wheel 'apache-beamgcp' ローカルで実行
てんぷれ
input & output は毎回自分の Option クラス作らないといけないものなのか
code:pipeline.py
class MyOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
dest='input',
required=True)
parser.add_argument(
'--output',
dest='output',
required=True)
def run(argv=None, save_main_session=True):
options = MyOption(argv)
with beam.Pipeline(options=options) as p:
(
p
| 'Input' >> beam.io.ReadFromText(options.input)
| 'Output' >> beam.io.WriteToText(options.output)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
code:local
python -m my_pipeline.py --input=logs.json --output=outputs
Dataflow で実行
code:run
python -m apache_beam.examples.wordcount \
--region $REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://$BUCKET/wordcount/outputs \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp/
https://gyazo.com/f9107e4ea15c7d71ef70ae197d6aeb75
おお〜
インスタンスの起動に3分ぐらいかかる
思考停止で --save_main_session つけてもいいかも
定義によっちゃパフォーマンスに悪そうだけど
20GB ぐらい 23分
なんか write こういう感じなのか気になる、できたやつから適当に書いてくれていいけど一通り終わってから書いていそう
NameError
NameError を処理するにはどうすればよいですか?
global にある値を参照している
PipelineOptions の save_main_session を True にする
実行時に --save_main_session して PipelineOptions に食わせる
import が原因なら DoFn 内で import する
ParDo は Iterable な返り値を期待する
python では文字列は iterable なので、単に文字列を返すとバラバラになる
例えば NDJSON を加工、@timestamp を timestamp にキーを rename したい
code:rename
def rename_timestamp(element):
data = json.loads(element)
return json.dumps(data)
with beam.Pipeline(options=pipeline_options) as p:
(
p
| 'Input' >> beam.Create(samples)
| 'Rename @timestamp' >> beam.ParDo(RenameTimestampField())
| 'Output' >> beam.io.WriteToText('./output')
)
実行すると1文字ごとに改行されて output が出てくる
return [json.dumps(data)] のようにするとよい
ファイル名を手に入れる
glob で読んだ際でもファイル名ごとに出力したいような場合
Stackoverflow には色々実装入れてる例が書いてるけど
コードみると beam.io.textio.ReadFromTextWithFilename がある、タプルで (fielname, line) がやってくる
同様に beam.io.fileio.WriteToFiles で書く先をデータに基づいて変えられる
from apache_beam.io import fileio
でもデータに入ってない値を使えないし、なんか入力のファイル名をそのまま引っ張り回したいけどどうしたらいいんだろ? データに含めてしまう & 出力時に消したい
encode/decode が非対称な Coder 作ると分散処理できなさそう
分散処理するものでファイル名制御したいというのがまずいまいちなのは分かるが...
複数の Output
pvalue.TaggedOutput を使う?
単一の Output にまとめる
beam.io.WriteToText(..., num_shards=1) にする?
外部パッケージを使う
行を捨てるにはどうする?
ParDo が実行する関数から空の return する、None 返しても良さそう
ParDo & Map
Dataflow Prime
--dataflow_service_options=enable_prime