Dataform
First steps with your Dataform project | Dataform
Dataformを使ってみる(BigQuery) - Qiita
Best practices for managing dataform projects | Dataform
GCP 版 Datafrom
従来のデータフォームから移行 | Google クラウド
新規に使ってみる
Dataform Repository 作る
region が全然だけどまあ us-central で
Secret Manager に github の PAT を置く
Dataform Service Account の role
Quickstart: Create and execute a SQL workflow  |  Dataform  |  Google Cloud
secretAccessor & BigQuery jobUser & User & dataEditor
tag ね
用途とかは tag でつけようかな
table:dataform-bigquery
Dataform BigQuery
location region
database project
schema dataset
["1", "2"]を includes で定義して参照すると [1, 2] になって型が合わない
日本語打つと壊れる?
それ移行が保存されてない
spreadsheet external table
dataform 上からは作れなさそう
あとでためす: type: operation で実行 & type: declaration で参照?
手で作って参照する
TODO hasOutput データフォーム コア リファレンス | Google クラウド
これでできそうではある
リポジトリの特定ディレクトリ以下を参照できないか?
dataform-co/dataform@main - protos/core.proto#L40-L42
dataform-co/dataform@main - protos/core.proto#L49
気になる
CLI の dataform init や dataform compile のオプションでは渡せる
environment.json でなんかできない?
Incremental Table
Incremental datasets | Dataform
基本動作
初回 & refresh なら CREATE OR REPLACE TABLE ...
そうでないなら(incremental なら) INSERT INTO ... になる
uniqueKey 指定があればそのキーで MERGE する
code:merge.sql
MERGE INTO target
USING (
SQLX のクエリ
) AS source
ON target.${uniqueKey} = source.${uniqueKey}
AND ${updatePartitionFilter}
WHEN MATCHED THEN
UPDATE ...
WHEN NOT MATCHED THEN
INSERT ...
VALUES ...
? config.uniqueKey と config.assertion.uniqueKey の違いは? 後者は assertion される?
Dataform core reference  |  Google Cloud config.uniqueKey は incremental table 用かな?
ちなみに MERGE ... USING (ここ) に WITH ... クエリ書けるのでクエリ分割のために DECLARE やらに置き換えたりは要らない
MERGE の中の WITH と外の WITH ってなんか違う? → BigQuery スキャン量減らす
${when(incremental(), TRUE のとき, FALSE のとき)} のように真偽分けて書ける
例だと TRUE 節のみなことが多い
code:checkpoint.sql
pre_operations {
DECLARE event_timestamp_checkpoint DEFAULT (
${when(incremental(),
SELECT max(event_timestamp) FROM ${self()},
SELECT timestamp("2000-01-01"))}
)
}
Avoid full table scans when ingesting from a partitioned table - Configure an incremental table  |  Dataform  |  Google Cloud
? updatePartitionFilter はどう働く?
MERGE 時に target をフィルタする時かな?
code:expand_partition_filter.sql
MERGE ... USING (...) AS S
ON T.${uniqueKey} = S.${uniqueKey}
AND t.${uniqueKey}.${updatePartitionFilter}` -- に展開される
なら左辺になんか式書けないのか?
書けないいいい
updatePartitionFilter: "CAST(minute AS STRING)>= CAST(DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 20 MINUTE) AS STRING)" とか意味なく文字列比較すると
... and T.CAST(minute AS STRING)>= CAST(
いやいや...
一方で単に文字列結合なら悪いことできておもしろいかも
? partition は後から付く? 作成時だけ?
作成時だけ
--full-refresh してもパーティション定義変わってたらだめそう、作り直す必要ある
bigquery error: Cannot replace a table with a different partitioning spec. Instead, DROP the table, and then recreate it. New partitioning spec is ...
なんか DDL で partition 設定しなおせないっけ? できたらテーブル消さず手動オペ挟めばいける
protected: true
Protect an incremental table from full refresh - Configure an incremental table  |  Dataform  |  Google Cloud
full-refresh から保護する
pre_operations / post_operations
Google BigQuery | Dataform
Dataform を導入してみた話 | 株式会社CAM
pre_operations で declare してスキャン範囲などを決めたりする
? pre_operations で ASSERT や ERROR して失敗させることはできる?
? 事前に self のテーブルを作っておきたいとき、pre_operation でやる必要はある?
dataform 側が最初に BEGIN CREATE SCHEMA IF NOT EXISTS ... しているけど、pre_operation はそれより先か後か? (dataset も作る必要があるか?)
最後に
CREATE OR REPLACE TABLE ... PARTITION BY date_published している
データのテスト
Dataform core reference  |  Google Cloud
assertion
nonNull / rowConditions / uniqueKey / uniqueKeys
定期実行 & assertion 失敗通知
Schedule executions with Workflows and Cloud Scheduler  |  Dataform  |  Google Cloud
Cloud Workflows でやる
compile → workflowinvocation → 結果待つ
失敗した理由って分からないのか?
REST Resource: projects.locations.repositories.workflowInvocations  |  Dataform  |  Google Cloud
state で失敗したことは分かる
何の assertion にコケたか知りたいのだが
Method: projects.locations.repositories.workflowInvocations.query  |  Dataform  |  Google Cloud
action をクエリすると結果が帰ってきそう
CLI ないので素朴に
$ curl -XGET -H 'Content-Type: application/json' -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" 'https://dataform.googleapis.com/v1beta1/projects/hatena-sales/locations/us-central1/repositories/******/workflowInvocations/******:query'
workflowInvocationActions から state が FAILED なものを集めて target を組み立ててあげるとよいが...
Workflows でやるのダルすぎるので成功失敗と実行ページの URL を通知するか
これ URL 中の workspace はなんでもいいっぽい
https://console.cloud.google.com/bigquery/dataform/locations/{LOCATION}/repositories/{REPOSITORY}/workspaces/{WORKSPACE}/workflows/{INVOCATION_ID}
WORKSPACE は - とかで
Cloud Scheduler から実行
Cloud Scheduler の body = Cloud Workflows の引数、と思いきやこれは罠
Cloud Workflows のパラメータと、HTTP で kick する際の HTTP リクエストパラメータは異なる
Cloud Workflows の UI から引数を指定して Cloud Scheduler を作ると、Scheduler上では
code:body.json
{"argument":"{\"foo\":\"bar\"}", "callLogLevel": "CALL_LOG_LEVEL_UNSPECIFIED"}
が作られる
Terraform などで Workflow を kick する Scheduler を作るときはこの構造にする
code:params.tf
body = base64encode(jsonencode({
argument = jsonencode({ gitCommitish = "main" })
callLogLevel = "CALL_LOG_LEVEL_UNSPECIFIED"
}))