Dataform
GCP 版 Datafrom
新規に使ってみる
Dataform Repository 作る
region が全然だけどまあ us-central で
Secret Manager に github の PAT を置く
Dataform Service Account の role
secretAccessor & BigQuery jobUser & User & dataEditor
tag ね
用途とかは tag でつけようかな
table:dataform-bigquery
Dataform BigQuery
location region
database project
schema dataset
["1", "2"]を includes で定義して参照すると [1, 2] になって型が合わない
日本語打つと壊れる?
それ移行が保存されてない
spreadsheet external table
dataform 上からは作れなさそう
あとでためす: type: operation で実行 & type: declaration で参照?
手で作って参照する
これでできそうではある
リポジトリの特定ディレクトリ以下を参照できないか?
気になる
CLI の dataform init や dataform compile のオプションでは渡せる
environment.json でなんかできない?
Incremental Table
基本動作
初回 & refresh なら CREATE OR REPLACE TABLE ...
そうでないなら(incremental なら) INSERT INTO ... になる
uniqueKey 指定があればそのキーで MERGE する
code:merge.sql
MERGE INTO target
USING (
SQLX のクエリ
) AS source
ON target.${uniqueKey} = source.${uniqueKey}
AND ${updatePartitionFilter}
WHEN MATCHED THEN
UPDATE ...
WHEN NOT MATCHED THEN
INSERT ...
VALUES ...
? config.uniqueKey と config.assertion.uniqueKey の違いは? 後者は assertion される?
ちなみに MERGE ... USING (ここ) に WITH ... クエリ書けるのでクエリ分割のために DECLARE やらに置き換えたりは要らない
${when(incremental(), TRUE のとき, FALSE のとき)} のように真偽分けて書ける
例だと TRUE 節のみなことが多い
code:checkpoint.sql
pre_operations {
DECLARE event_timestamp_checkpoint DEFAULT (
${when(incremental(),
SELECT max(event_timestamp) FROM ${self()},
SELECT timestamp("2000-01-01"))}
)
}
? updatePartitionFilter はどう働く?
MERGE 時に target をフィルタする時かな?
code:expand_partition_filter.sql
MERGE ... USING (...) AS S
ON T.${uniqueKey} = S.${uniqueKey}
AND t.${uniqueKey}.${updatePartitionFilter}` -- に展開される
なら左辺になんか式書けないのか?
書けないいいい
updatePartitionFilter: "CAST(minute AS STRING)>= CAST(DATETIME_SUB(CURRENT_DATETIME(), INTERVAL 20 MINUTE) AS STRING)" とか意味なく文字列比較すると
... and T.CAST(minute AS STRING)>= CAST(
いやいや...
一方で単に文字列結合なら悪いことできておもしろいかも
? partition は後から付く? 作成時だけ?
作成時だけ
--full-refresh してもパーティション定義変わってたらだめそう、作り直す必要ある
bigquery error: Cannot replace a table with a different partitioning spec. Instead, DROP the table, and then recreate it. New partitioning spec is ...
なんか DDL で partition 設定しなおせないっけ? できたらテーブル消さず手動オペ挟めばいける
protected: true
full-refresh から保護する
pre_operations / post_operations
pre_operations で declare してスキャン範囲などを決めたりする
? pre_operations で ASSERT や ERROR して失敗させることはできる?
? 事前に self のテーブルを作っておきたいとき、pre_operation でやる必要はある?
dataform 側が最初に BEGIN CREATE SCHEMA IF NOT EXISTS ... しているけど、pre_operation はそれより先か後か? (dataset も作る必要があるか?)
最後に
CREATE OR REPLACE TABLE ... PARTITION BY date_published している
データのテスト
assertion
nonNull / rowConditions / uniqueKey / uniqueKeys
定期実行 & assertion 失敗通知
compile → workflowinvocation → 結果待つ
失敗した理由って分からないのか?
state で失敗したことは分かる
何の assertion にコケたか知りたいのだが
action をクエリすると結果が帰ってきそう
CLI ないので素朴に
workflowInvocationActions から state が FAILED なものを集めて target を組み立ててあげるとよいが...
Workflows でやるのダルすぎるので成功失敗と実行ページの URL を通知するか
これ URL 中の workspace はなんでもいいっぽい
WORKSPACE は - とかで
Cloud Scheduler から実行
Cloud Scheduler の body = Cloud Workflows の引数、と思いきやこれは罠
Cloud Workflows のパラメータと、HTTP で kick する際の HTTP リクエストパラメータは異なる
Cloud Workflows の UI から引数を指定して Cloud Scheduler を作ると、Scheduler上では
code:body.json
{"argument":"{\"foo\":\"bar\"}", "callLogLevel": "CALL_LOG_LEVEL_UNSPECIFIED"}
が作られる
Terraform などで Workflow を kick する Scheduler を作るときはこの構造にする
code:params.tf
body = base64encode(jsonencode({
argument = jsonencode({ gitCommitish = "main" })
callLogLevel = "CALL_LOG_LEVEL_UNSPECIFIED"
}))