AWS Glue での Spark のパフォーマンス (実行時間) を改善したい
はじめに
最近 O'Reilly のLearning Spark 2nd Edition を読み始めました。
https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/
AWS Glue では Spark を使うことができますが、上記の書籍によると使い方によって実行速度が変わってくるようです。 網羅的に調査できてはないですが、手元で実行速度について比較してみた結果を雑多に記録していきたいと思います。
Glue の記事ではないですが、以下の Athena のパフォーマンスチューニング記事も役立つと思いますので、ご参考にどうぞ。
Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ
準備
検証に必要なデータを揃えます。
データ
手元にあった適当な ELB のアクセスログを使います。 データは全部で 94.5 GB です。
計測関数
from contextlib import contextmanager import time @contextmanager def timer(name): t0 = time.time() yield print(f'[{name}] done in {time.time() - t0:.4f} s')
CSV vs Parquet
Parquet などの columnar フォーマットは、csv などのように行ごとではなく列ごとに整理されます。 一般に columnar フォーマットは、データの圧縮率が向上したり、必要な列のみ取得することでデータの読み取り速度が向上するメリットがあります。
Parquet
ざっくりとした説明になりますが、parquet ファイルの読み込みの際には、まずメタデータを参照し読み取り対象のブロックの位置を取得します。 そして読み取り対象のブロックにはそのブロックの最小値/最大値などの統計情報が格納されています。
例えば value > 5.0 の条件でデータが欲しかった場合、そのブロックの統計情報をもとに、読み込み対象をスキップしたりすることで高速化できます。 上記を Predicate Pushdown と言ったりします。
参考
File Format | Apache Parquet カラムナフォーマットParquetの構造とReadの最適化 - sambaiz-net GitHub - apache/parquet-format at parquet-format-2.2.0-rc1
読み取り速度比較
まず、Parquet 形式でデータの読み取り速度に速度に差があるか見てみます。ついでにパーティション分けたデータも作成します。
データ作成
(hours カラムを追加しております。)
# add hour column from pyspark.sql.functions import hour df = df.withColumn("hours", hour("request_timestamp"))
df.coalesce(1).write.mode('append').csv('s3://.../csv-chunk-high/')
df.write.mode('append')\ .partitionBy('hours')\ .csv('s3://.../csv-partition-high/')
df.coalesce(1).write.mode('append').parquet('s3://..../parquet-chunk-high/')
df.write.mode('append')\ .partitionBy('hours')\ .parquet('s3://.../parquet-partition-high/')
読み取り
with timer('csv'): df = spark.read.format("csv").load("s3://.../csv-chunk-high/") print(df.count()) with timer('csv partition'): df = spark.read.format("csv").load("s3://.../csv-partition-high/") print(df.count()) with timer('parquet'): df = spark.read.format("parquet").load("s3://.../parquet-chunk-high/") print(df.count()) with timer('parquet partition'): df = spark.read.format("parquet").load("s3://.../parquet-partition-high/") print(df.count())
324917265 [csv] done in 27.1925 s 324917265 [csv partition] done in 36.3690 s 324917265 [parquet] done in 31.8977 s 324917265 [parquet partition] done in 32.5805 s
特に条件を指定しなければ、columnar 形式である parquet と csv では読み取り速度に大きな差はないようです。
読み取って Filter 処理した際の速度比較
with timer('csv'): df = spark.read.format("csv").load("s3://.../csv-chunk-high/") df = df.filter(df['_c6'] < 0.0008) print(df.count()) with timer('csv partition'): df = spark.read.format("csv").load("s3://.../csv-partition-high/") df = df.filter(df['_c6'] < 0.0008) print(df.count()) with timer('parquet'): df = spark.read.format("parquet").load("s3://.../parquet-chunk-high/") df = df.filter(df['request_processing_time'] < 0.0008) print(df.count()) with timer('parquet partition'): df = spark.read.format("parquet").load("s3://.../parquet-partition-high/") df = df.filter(df['request_processing_time'] < 0.0008) print(df.count())
119627151 [csv] done in 44.2805 s 119627151 [csv partition] done in 48.3934 s 119627151 [parquet] done in 32.7956 s 119627151 [parquet partition] done in 37.8519 s
parquet の方が早いですね!
データサイズ比較
snappy.parquet は csv と比べてかなりデータサイズが小さくなっています!
aws s3 ls s3://.../csv-chunk-high/ --recursive --human --sum Total Size: 94.5 GB aws s3 ls s3://.../parquet-chunk-high/ --recursive --human --sum Total Size: 11.7 GB
csv gzip はどれくらい?
csv は無圧縮なのでそこと比較するのはフェアじゃないですね。なので csv で gzip 圧縮してみます。
df.coalesce(1).write.mode('append').csv('s3://.../csv-chunk-high-compress/', compression="gzip")
のように compression 引数を指定すればよいです。
aws s3 ls s3://.../csv-chunk-high-compress/ --recursive --human --sum Total Size: 13.3 GiB
11.7 GB vs 13.3 GiB で csv の gzip より Parquet snappy の方が圧縮されていますね。
まとめ
- データ全体の読み取り速度は csv も parquet も変わらない
- Filter 等を実行する場合 (Predicate Pushdown を使う場合) Parquet の方が読み取り早い
- snappy.parquet は圧縮効率が良い
Glue DynamicFrame vs Spark DataFrame
Parquet で比較してみます。
データ読み取り速度比較
with timer('df'): dyf = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://.../parquet-chunk-high/" ] }, "parquet", ) print(dyf.count()) with timer('df partition'): dyf = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3:/.../parquet-partition-high/" ] }, "parquet", ) print(dyf.count())
324917265 [df] done in 125.9965 s 324917265 [df partition] done in 55.9798 s
DynamicFrame 遅い、、、
まとめ
- 前のセクション (読み取り速度比較) より spark.read が 27.1 s と 36.3 s なので、DynamicFrame は随分遅い
- 興味深いことに、パーティションに分けたデータの読み取りが早いのが気になる
パーティション数の違いによる速度比較
わかりやすく極端な例として、partition 1,指定なし (デフォルト)、300 で比較してみます。 また、シャッフルの発生有無でどの程度変化するか確認します。
準備
df = spark.read.format("parquet").load("s3:/.../parquet-chunk-high/") one_part_df = df.coalesce(1) print(one_part_df.rdd.getNumPartitions()) one_part_df.count() part_df = spark.read.format("parquet").load("s3:/.../parquet-chunk-high/") print(part_df.rdd.getNumPartitions()) part_df.count() df = spark.read.format("parquet").load("s3:/.../parquet-chunk-high/") part_300_df = df.repartition(300) print(part_300_df.rdd.getNumPartitions()) part_300_df.count()
1 94 300
デフォルトでは 94 パーティション読まれたみたいです。
シャッフルが発生しない処理
with timer('one part filter'): result = one_part_df.filter(one_part_df['request_processing_time'] < 0.0008).count() print(result) with timer('part filter'): result = part_df.filter(part_df['request_processing_time'] < 0.0008).count() print(result) with timer('part 300 filter'): result = part_300_df.filter(part_300_df['request_processing_time'] < 0.0008).count() print(result)
9 [one part filter] done in 45.5252 s 9 [part filter] done in 1.4579 s 9 [part 300 filter] done in 3.5410 s
94 partition が一番はやい
シャッフルが発生する処理
with timer('one part shuffle'): result = one_part_df.groupBy('elb_name').agg(F.mean('request_processing_time').alias("mean_time")).orderBy('mean_time').count() print(result) with timer('part shuffle'): result = part_df.groupBy('elb_name').agg(F.mean('request_processing_time').alias("mean_time")).orderBy('mean_time').count() print(result) with timer('part 300 shuffle'): result = part_300_df.groupBy('elb_name').agg(F.mean('request_processing_time').alias("mean_time")).orderBy('mean_time').count() print(result)
9 [one part shuffle] done in 78.1068 s 9 [part shuffle] done in 2.6624 s 9 [part 300 shuffle] done in 12.2829 s
94 partition が一番はやい
まとめ
Spark Join
Spark には Shuffle 処理が発生しない BroadCast Join という Join の種類があります。 Join 対象の片方のテーブルが小さく、片方が大きいテーブルがある場合に有効です。
ざっくりとですが、仕組みとしては、片方の小さいテーブルをすべての worker node へ分配し、各 node で join するといった方法です。
今回は小さい df (大きくても全体が 各 worker のメモリに乗るサイズ) と大きい df を使い、 BroadCast Join で高速化されるかどうか検証します。
BroadCast Join
join_df = part_df.select(part_df['request_port']).distinct().withColumn("random", F.round(F.rand()*(10-5)+5,0)) with timer('broadcast join dataframe'): broadcast_df = part_df.join(join_df.hint('BROADCAST'), part_df.request_port == join_df.request_port, how='left') broadcast_df.count() with timer('sortmerge join dataframe'): merge_df = part_df.join(join_df.hint('MERGE'), part_df.request_port == join_df.request_port, how='left') merge_df.count() with timer('shuffle hash join dataframe'): shuffle_df = part_df.join(join_df.hint('SHUFFLE_HASH'), part_df.request_port == join_df.request_port, how='left') shuffle_df.count()
JOIN の Hint はこのあたり。
Hints - Spark 3.4.1 Documentation
join_df は distinct() により数が減っています。
ここには乗せていませんが、ちゃんと explain() の Physical Plan を見て、Hint が有効なことを確認しています。
[shuffle hash join dataframe] done in 23.2022 s [broadcast join dataframe] done in 11.7729 s [sortmerge join dataframe] done in 38.4018 s
broadcast join が一番はやい
まとめ
- 小さい df と大きい df の JOIN は broadcast join が一番はやい
キャッシュを使う
Spark RDD は、それに対する action が実行されるたびに計算し直されます。
上記を回避するために cache()
や persist()
を使うことにより、RDD をメモリに残しておけます。
キャッシュありなし比較
注意点として、 cache()
や persist()
は action ではなく transformation なので遅延評価されます。
Best practice for cache(), count(), and take() - Databricks
時間がかかりそうな distinct をやってみます。
with timer('before cache'): part_df.select("backend_port").distinct().count() part_df.cache() part_df.count() # execute cache (cache is a transformation) with timer('after cache'): part_df.select("backend_port").distinct().count()
[before cache] done in 4.5241 s [after cache] done in 1.6293 s
cache() で早くなっている
遅延評価?
知っている方は読み飛ばしてもらって結構です。
Sparkには遅延評価(Lazy Evaluation)という特性があります。 これは、action(例えば、count() などの結果を戻す操作)が呼び出されるまで、transformation(filter() のような RDD を別の RDD に変換する操作)は実行されないという特性です。
cache() や persist() は transformation であり、これらが呼び出された時点で実行されないため、count() のような action を上記ソースでは実行していました。
まとめ
- RDD は action に対して再計算が実施されるため、キャッシュすると早くなる
- cache(), persist() は transformation なので action の前に実行する必要あり