ワークフローツールDagsterを使ってみよう
https://gyazo.com/b5362a539d1df37f786f49b92a60f646
書きかけ...
Dagsterは、データオーケストレーターです。ソリッドと呼ばれる論理的なコンポーネント間のデータフローをパイプライン(有向非巡回グラフ(DAG: Directed Acyclic Graph)に定義することができます。これらのパイプラインはローカルに開発でき、どこでも実行できます。
インストール
dagster は pip コマンドでインストールすることができます。
code: bash
$ pip install dagster
dagstar の使用方法概要
細かなことは後で説明するとして、どんな具合に定義、実行するのかを見てみましょう。
パイプラインの作成
それでは、最初のパイプラインを記述してみましょう。
code: 01_hello_world.py
from dagster import pipeline, solid
@solid
def get_name():
return "dagster"
@solid
def hello(context, name: str):
context.log.info(f"Hello, {name}!")
@pipeline
def hello_pipeline():
hello(get_name())
タスクとなる関数に@solidでデコレートしていることと、パイプラインとなる関数に @pipelineでデコレートしているだけです。
dagster ではパイプラインの実行については、3つの方法があります。
Dagster CLI: dagster コマンドで実行
Dagster Python API: PythonからAPI呼び出しで実行
Dagit:Webアプリケーション (pip install dgaitが追加インストールが必要)
まず、コマンドラインツール dagster での実行方法を説明することにします。
引数なしで dagster を実行すると簡単な使用方法が表示されます。
code: bash
$ dagster
CLI tools for working with Dagster.
Options:
-v, --version Show the version and exit.
-h, --help Show this message and exit.
Commands:
api INTERNAL These commands are intended to support internal... asset Commands for working with Dagster assets.
debug Commands for debugging Dagster pipeline runs.
instance Commands for working with the current Dagster instance.
new-project Create a new Dagster repository and generate boilerplate...
pipeline Commands for working with Dagster pipelines.
run Commands for working with Dagster pipeline runs.
schedule Commands for working with Dagster schedules.
sensor Commands for working with Dagster sensors.
パイプラインを実行してみたいので、 pipeline の使用方法を調べてみます。
code: bash
$ dagster pipeline -h
Commands for working with Dagster pipelines.
Options:
-h, --help Show this message and exit.
Commands:
backfill Backfill a partitioned pipeline.
execute Execute a pipeline.
launch Launch a pipeline using the run launcher configured on...
list List the pipelines in a repository.
list_versions Display the freshness of memoized results for the given...
print Print a pipeline.
scaffold_config Scaffold the config for a pipeline.
サブコマンドのrunは実行結果の管理を行うためのものです。
code: bash
$ dagster run -h
Commands for working with Dagster pipeline runs.
Options:
-h, --help Show this message and exit.
Commands:
delete Delete a run by id and its associated event logs.
list List the runs in the current Dagster instance.
wipe Eliminate all run history and event logs.
ソリッド
ソリッド(Solid)は、dgastar のタスクの機能的な単位です。ソリッドの役割は、入力を読み込み、アクションを実行し、出力を出すことです。複数のソリッドを接続して、パイプライン(Pipeline)を作ることができます。
https://gyazo.com/ad03baf6615b1c5bd790c905126aaca7
ソリッドは、処理を定義するために使用されます。ソリッドは後でパイプラインに組み立てることができます。ソリッドは通常、1つの特定のアクションを実行するもので、バッチ処理に使用されます。たとえば、以下のような場合にソリッドを使用できます。
他のデータセットからデータセットを抽出する。
データベースのクエリを実行する。
リモートクラスタでSparkジョブを開始する。
APIにクエリを実行し、その結果をデータウェアハウスに保存する。
メールやSlackのメッセージを送信する。
デフォルトでは、パイプライン内のすべてのソリッドが同じプロセスで実行されます。本番環境では、Dagsterは通常、各ソリッドがそれぞれのプロセスで実行されるように設定されています。
ソリッドにはいくつかの重要な特性があります。
入力と出力です。入力と出力:ソリッドには定義された入力と出力があり、これらはオプションで型付けすることができます。これらの型はランタイムに検証されます。
構成可能。Solidは強く型付けされた設定システムを使って設定することができる。
依存性。ソリッドの入力は、他のソリッドからの出力に依存できます。ソリッドは、その入力がすべて正常に解決されるまで実行されません。依存関係の構造は、Pipeline(パイプライン)を使用して定義されます。
イベントストリームの放出。ソリッドは、AssetMaterializationsのような構造化されたイベントのストリームを発することができます。これらのイベントは、DagsterのUIツールであるDagitで見ることができます。
個別にテストが可能。
ソリッドの定義
ソリッドを定義するには、@solid デコレータを使用します。デコレーションされた関数は compute_fn と呼ばれます。
参考