AWS Glueチートシート
色々ネット上の情報が古かったり、この界隈の情報に疎かったのでメモ
用語集
PySpark: PythonでSpark APIを操作するクライアント Pandas: Sparkに似たAPIを持つデータ分析Pythonのライブラリ AWS Glue: SparkをベースにS3のデータや外部のDBのテーブルを解析してデータカタログ(スキーマなどのメタ情報のみ)を登録し、ジョブからカタログのテーブルを名前を付けて参照出来るようにしたり、PythonもしくはScalaで書かれたジョブを実行してSparkを操作しS3やRDSなどのデータソースと連携して集計をすることが出来るサービス. 特定ジョブをトリガーとしたジョブの起動やジョブのスケジューリング、ETLをサポートするクラスなどが提供される. AWS Batchとの違い: AWS BatchはEC2, ECSをベースにコンピューティングリソースをオンデマンドで提供するサービス. ジョブ実行用のDockerイメージを作成し、パラメータを渡して実行することが出来る. 起動までに数分かかる. バッチ処理が終わったあとは自動的にインスタンスがshutdownされるので余計な追加料金はかからない. Sparkのようなライブラリはないので何かライブラリを利用する場合はそれ用のイメージを利用するなどして自前で用意する必要がある. イメージは何でも利用出来るので自由度は高い.
AWS Glueはある程度の規模のデータ群に対するETL処理に最適化している. 通常のジョブは10分単位での課金. Pythonシェルは1分単位での課金だがSparkのAPIを直接叩ける訳ではない.
DataFrame: Apache Sparkにおける行列の集合を扱うクラス。DataFrameの中にRowクラスがリストで含まれている
DynamicFrame: AWS GlueにおけるDataFrameを変換したり、Mapしたり出来る拡張クラス
初期化
code:python
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
入力(Extract)
ローカルのCSVからDataFrameを生成
code:python
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('~/201906142304.csv')
df.createOrReplaceTempView("users")
new_df = sqlContext.sql("select * from users")
CSV形式のファイルをS3から読み取る
code:python
AWS Glueでカタログから読み込み
code:python
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
dyf = glueContext.create_dynamic_frame.from_catalog(database="db_name", table_name="table_name")
push_down_predicateを使ってS3から件数を絞り込んで読み込み
code:python
dyf = glueContext.create_dynamic_frame.from_catalog(database="db_name", table_name="table_name", push_down_predicate = "(year == '2019' and month == '06' and day == '18' and hour == '10')")
push_down_predicateで範囲検索
code:python
datasource0 = glueContext.create_dynamic_frame_from_options("s3", {'paths': "s3://targetparquet/"}, format="parquet", push_down_predicate = "(year == '2019' and month >= '06' and day == '18' and hour between '11' and '18')", transformation_ctx = "datasource0") push_down_predicateではSpark SQLのクエリが使えるのでBetweenや範囲検索が出来る
S3からparquet形式のファイルを読み取り(+ Push Downで絞り込み)
code:python
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
result = sqlContext.read.parquet("s3://target-bucket/path/to/directory").where("target_date between '2017-06-29 00:00:00' and '2017-08-27'")
Delta Lakeを使う
オープンソースのSpark拡張ストレージレイヤー
DataFrameのMergeや履歴の保持などデータレイクを扱う上で便利な機能が揃っている
例えばすでに存在するデータとのMERGEなどはデフォルトのSparkではUnionなどで時間がかかるがDelta LakeだとMERGE INTO文で簡単にBatch Upsertが出来る
Apache Spark 2.4.2以上で動作。2019-07-20現在 AWS Glue はSpark 2.2.1なのでまだ使えない
変換(Transform)
DataFrameに変換
DataFrameとは: PandasやSparkで使えるクラス. テーブルのようなデータ構造を保持し、selectやquery, filter, mapなど色々便利なデータ操作APIが備わっている。PandasとSparkとでは微妙にAPIが違うので注意
code:python
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
dyf = glueContext.create_dynamic_frame.from_catalog(database="db_name", table_name="table_name")
df = dyf.toDF()
DynamicFrame
AWS Glue特有のクラス
データカタログから読み込んだり、書き出したりする時の形式. DynamicFrameにもデータ操作系のAPIがあるのでこちらで操作してもいい.
DataFrameをSQLクエリで操作する (AWS Glue + PySpark)
code:python
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
dyf = glueContext.create_dynamic_frame.from_catalog(database="db_name", table_name="table_name")
df = dyf.toDF()
# users viewを作成. このviewに対してクエリを投げることが出来る
df.createOrReplaceTempView("users")
res_df = spark.sql("select * from users where name = 'foo'")
result = DynamicFrame.fromDF(res_df)
glueContext.write_dynamic_frame.from_options(frame = result,
connection_type = "s3",
connection_options = {"path": "s3://glue-sample-target/output-dir/users"},
format = "parquet")
Spark SQLでウィンドウ関数を使う
code:python
sqlContext.sql("select distinct(username), target_date, sum(amount) over (partition by username, target_date)
as sum_amount from user_amounts order by target_date desc")
タイムゾーン変換]
code:python
# PySpark APIの場合
from pyspark.sql.functions import to_utc_timestamp
df.withColumn("time", to_utc_timestamp("time", "JST"))
# Spark SQLの場合
sqlContext.sql("SELECT to_utc_timestamp('time', 'JST') from ~~~")
DataFrameのカラムの型を変換する
例: created_at カラムと updated_at カラムの型をtimestamp型からstring型に変換する
code:python
UDF(User Defined Function)を登録する
UDFを登録すると自前の関数をSpark SQLで使えるようになる
code:python
def double_x(x):
return x * x
sqlContext.udf.register("double_x", double_x)
sqlContext.sql("select double_x(3)")
出力(Load)
S3にParquet形式で出力する
formatオプションで parquet を指定する
code:python
dyf.write("s3", {"path": "s3://outputbuket/destination"}, format="parquet")
Partitionを付け加えて出力する
code:python
dyf.write("s3", {"path": "s3://outputbucket/desitination", "partitionKeys": "year", "month", "day"}, format="parquet") partitionKeys でDataFrameのRowに含まれるカラムを指定する
指定すると year=2019/month=06/day=11 のようなHive形式でディレクトリが作成される
Partitioningしておくと、データロード時にpush_down_predicate オプションで絞り込みをすることでデータ処理時間を短縮出来る push_down_predicate = (year == '2018')
AuroraからS3にテーブルを出力する
code:SQL
SELECT * FROM users INTO OUTFILE S3 's3-us-west-2://aurora-select-into-s3-pdx/sample_employee_data'
デフォルトだと出力先のディレクトリに無圧縮でTAB区切りのCSV形式でフラットに出力される. サイズが大きい場合は6GBごとぐらいに区切られる
JDBCで任意のDBに書き込み
code:python
df.write.mode('overwrite').jdbc(url="jdbc:mysql://localhost:3306/dbname", table="users_test", properties={ 'user' : 'root', 'password' : 'root'})
overwriteは対象のテーブルをDROPしてDataFrameで全て置き換える。ちょっと言葉の動作イメージと異なるのが気持ち悪い。
writeのクエリがSyntaxエラーだったりランタイムエラーを吐くと、テーブルがDROPされた状態で存在しない状態になるので注意。
AWS GlueでDFからTableを作成しSpark SQLクエリを投げる
code:python
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
DyF = glueContext.create_dynamic_frame.from_catalog(database="{{database}}", table_name="{{table_name}}")
df = DyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql('{{your select query with table name that you used for temp table above}}
FirehoseのJSONからParquet形式に変換
code:python
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
glueContext = GlueContext(SparkContext.getOrCreate())
dyf = glueContext.create_dynamic_frame.from_catalog(database="db_name", table_name="table_name", push_down_predicate = "(year == '2019' and month == '06' and day == '18' and hour == '10')")
glueContext.write_dynamic_frame.from_options(frame = dyf, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://target_bucket/output", "partitionKeys": "year", "month", "day", "hour"}) 注意: write_dynamic_frame.from_options は二度実行すると上書きせずに同じ内容の異なるファイルを出力するのでレコードが重複する
なので再度実行する場合は保存先対象のディレクトリにオブジェクトを消すなどして無いことを確認する
あとDataFrameのwrite.mode("overwrite")は対象ディレクトリを一度全て削除して与えられたDataFrameを書き込む動作になるので既存ファイルを削除したくない場合は使ってはいけない
MySQLからPyMySQLでレコードを取ってきてDataFrameに変換
code:python
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
db = MySQLdb.connect("URL", "USERNAME", "PASSWORD", "DATABASE")
cursor = db.cursor()
cursor.execute("SELECT * FROM myStagingTable")
res = cursor.fetchall()
rdd = sc.parallelize(res)
cursor.execute("SHOW COLUMNS FROM myStagingTable")
columns = cursor.fetchall()
columns = [c0 for c in columns] df = rdd.toDF(columns, sampleRatio=0.2)
db.close()
JDBCでDBからレコードを取ってきてDataFrameにする前にWHERE等で条件を絞り込む
DBの全件はいらない場合などはあらかじめWHEREで条件を絞りこめる
code:python
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# whereでFetchしてくるDataFrameを絞り込む
df = sqlContext.read.format("jdbc")
.option("numPartitions", 8)
.option("url", "jdbc:mysql://localhost:3306/dbname")
.option("user", "root").option("password", "root")
.option("dbtable", "(select * from users where username = 'foobar') as tmp")
.option("pushDownPredicate", True).load()
# 件数
df.count()
# 20件表示
df.show()
解説: Sparkはdbtableオプションにクエリを書けるがクエリの末尾にWHERE文で1 = 0というクエリが勝手に足されて通常のクエリではSyntaxエラーになるので、サブクエリとしてtmpテーブルとしてaliasをつけて最終的に(select * from users where username = 'foobar') as tmp where 1 = 0; というクエリが発行されるようにする
Value '0000-00-00' can not be represented as java.sql.Date というエラーが出る
JDBCの接続文字列に ?zeroDateTimeBehavior=convertToNull をパラメータとして足す
code:python
df = sqlContext.read.format("jdbc")
.option("numPartitions", 8)
.option("url", "jdbc:mysql://localhost:3306/dbname?zeroDateTimeBehavior=convertToNull")
.option("user", "root").option("password", "root")
.option("dbtable", "(select * from users where username = 'foobar') as tmp")
.option("pushDownPredicate", True).load()
DataFrameのFilterに複数条件を指定する
|, & でorやand条件を表現出来る
code:python
異常に遅い場合はパーティションの状態を調べる
DataFrameのpartitionが1個しかないと直列にしか処理出来ないためパフォーマンスに重大な影響を及ぼす
code:python
dataframe.rdd.getNumPartitions()
でパーティション数を確認し少なすぎる場合は、OrderByをするなどしてデータを整列するとパーティショニングされる場合がある
複雑な条件の場合はSpark SQLよりPySpark APIの方が楽な場合がある
SQLで書くと条件がネストする場合複雑になる場合がある
code:python
SELECT
CASE WHEN col1 == 1 THEN
CASE WHEN col2 == 2 THEN
3
END
END as price
その場合はPySpark側でAPIを操作すると楽になる場合がある
code:python
col2 = when(
df.col2 == 2,
3
)
col1_col2 = when(
df.col1 == 1,
col2
)
df.withColumn("price", col1_col2)
このように条件を変数に入れて再利用やネストが出来るので可読性が上がる
DataFrameの差分を取得する
code:python
intersect_df = dev_df.intersect(prod_df)
dev_df.subtract(intersect_df).show(vertical=True)
タイムゾーンを設定
Parquet形式の場合Timestamp型のカラムにparquet形式で出力したマシンのタイムゾーンがデフォルトで付加されるため、例えばリモートのAWS Glue(デフォルトUTC)で処理した後にローカルに出力したファイルを持ってきてSparkで確認する時にタイムゾーンがJSTだと9時間ずれて表示される場合がある
これを避けるため処理時のタイムゾーンを設定するか、もしくは to_utc_timestamp でタイムスタンプをキャストする必要がある
全体設定
code:python
conf = SparkConf()
conf.set("spark.sql.session.timeZone", "JST")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)