クラウドエンジニアのノート

情報技術系全般,自分用メモを公開してます。

AWS Glue での Spark のパフォーマンス (実行時間) を改善したい

はじめに

最近 O'Reilly のLearning Spark 2nd Edition を読み始めました。

https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/

AWS Glue では Spark を使うことができますが、上記の書籍によると使い方によって実行速度が変わってくるようです。 網羅的に調査できてはないですが、手元で実行速度について比較してみた結果を雑多に記録していきたいと思います。

Glue の記事ではないですが、以下の Athena のパフォーマンスチューニング記事も役立つと思いますので、ご参考にどうぞ。

Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ

準備

検証に必要なデータを揃えます。

データ

手元にあった適当な ELB のアクセスログを使います。 データは全部で 94.5 GB です。

 

計測関数

from contextlib import contextmanager
import time

@contextmanager
def timer(name):
    t0 = time.time()
    yield
    print(f'[{name}] done in {time.time() - t0:.4f} s')

CSV vs Parquet

Parquet などの columnar フォーマットは、csv などのように行ごとではなく列ごとに整理されます。 一般に columnar フォーマットは、データの圧縮率が向上したり、必要な列のみ取得することでデータの読み取り速度が向上するメリットがあります。

Parquet

ざっくりとした説明になりますが、parquet ファイルの読み込みの際には、まずメタデータを参照し読み取り対象のブロックの位置を取得します。 そして読み取り対象のブロックにはそのブロックの最小値/最大値などの統計情報が格納されています。

例えば value > 5.0 の条件でデータが欲しかった場合、そのブロックの統計情報をもとに、読み込み対象をスキップしたりすることで高速化できます。 上記を Predicate Pushdown と言ったりします。

参考

File Format | Apache Parquet カラムナフォーマットParquetの構造とReadの最適化 - sambaiz-net GitHub - apache/parquet-format at parquet-format-2.2.0-rc1

読み取り速度比較

まず、Parquet 形式でデータの読み取り速度に速度に差があるか見てみます。ついでにパーティション分けたデータも作成します。

データ作成

(hours カラムを追加しております。)

# add hour column
from pyspark.sql.functions import hour
df = df.withColumn("hours", hour("request_timestamp"))
df.coalesce(1).write.mode('append').csv('s3://.../csv-chunk-high/')
df.write.mode('append')\
    .partitionBy('hours')\
    .csv('s3://.../csv-partition-high/')
df.coalesce(1).write.mode('append').parquet('s3://..../parquet-chunk-high/')
df.write.mode('append')\
    .partitionBy('hours')\
    .parquet('s3://.../parquet-partition-high/')

読み取り

with timer('csv'):
    df = spark.read.format("csv").load("s3://.../csv-chunk-high/")
    print(df.count())
    
with timer('csv partition'): 
    df = spark.read.format("csv").load("s3://.../csv-partition-high/")
    print(df.count())

with timer('parquet'):
    df = spark.read.format("parquet").load("s3://.../parquet-chunk-high/")
    print(df.count())
    
with timer('parquet partition'): 
    df = spark.read.format("parquet").load("s3://.../parquet-partition-high/")
    print(df.count())      
324917265
[csv] done in 27.1925 s

324917265
[csv partition] done in 36.3690 s

324917265
[parquet] done in 31.8977 s

324917265
[parquet partition] done in 32.5805 s

特に条件を指定しなければ、columnar 形式である parquet と csv では読み取り速度に大きな差はないようです。

読み取って Filter 処理した際の速度比較

csv ではカラム名がない点に注意

with timer('csv'):
    df = spark.read.format("csv").load("s3://.../csv-chunk-high/")
    df = df.filter(df['_c6'] < 0.0008)
    print(df.count())
    
with timer('csv partition'): 
    df = spark.read.format("csv").load("s3://.../csv-partition-high/")
    df = df.filter(df['_c6'] < 0.0008)
    print(df.count())

with timer('parquet'):
    df = spark.read.format("parquet").load("s3://.../parquet-chunk-high/")
    df = df.filter(df['request_processing_time'] < 0.0008)
    print(df.count())
    
with timer('parquet partition'): 
    df = spark.read.format("parquet").load("s3://.../parquet-partition-high/")
    df = df.filter(df['request_processing_time'] < 0.0008)
    print(df.count())
119627151
[csv] done in 44.2805 s
119627151
[csv partition] done in 48.3934 s

119627151
[parquet] done in 32.7956 s
119627151
[parquet partition] done in 37.8519 s

parquet の方が早いですね!

データサイズ比較

snappy.parquet は csv と比べてかなりデータサイズが小さくなっています!

aws s3 ls s3://.../csv-chunk-high/  --recursive --human --sum
   Total Size:  94.5 GB

aws s3 ls s3://.../parquet-chunk-high/  --recursive --human --sum
   Total Size:  11.7 GB

csv gzip はどれくらい?

csv は無圧縮なのでそこと比較するのはフェアじゃないですね。なので csvgzip 圧縮してみます。

df.coalesce(1).write.mode('append').csv('s3://.../csv-chunk-high-compress/', compression="gzip")

のように compression 引数を指定すればよいです。

aws s3 ls s3://.../csv-chunk-high-compress/  --recursive --human --sum
   Total Size: 13.3 GiB

11.7 GB vs 13.3 GiB で csvgzip より Parquet snappy の方が圧縮されていますね。

まとめ

  • データ全体の読み取り速度は csv も parquet も変わらない
  • Filter 等を実行する場合 (Predicate Pushdown を使う場合) Parquet の方が読み取り早い
  • snappy.parquet は圧縮効率が良い

Glue DynamicFrame vs Spark DataFrame

Parquet で比較してみます。

データ読み取り速度比較

with timer('df'):
    dyf = glueContext.create_dynamic_frame.from_options(
        "s3",
        {
            "paths": [
                "s3://.../parquet-chunk-high/"
            ]
        },
        "parquet",
    )
    print(dyf.count())
    
with timer('df partition'): 
    dyf = glueContext.create_dynamic_frame.from_options(
        "s3",
        {
            "paths": [
                "s3:/.../parquet-partition-high/"
            ]
        },
        "parquet",
    )
    print(dyf.count())
324917265
[df] done in 125.9965 s
324917265
[df partition] done in 55.9798 s

DynamicFrame 遅い、、、

まとめ

  • 前のセクション (読み取り速度比較) より spark.read が 27.1 s と 36.3 s なので、DynamicFrame は随分遅い
  • 興味深いことに、パーティションに分けたデータの読み取りが早いのが気になる

パーティション数の違いによる速度比較

わかりやすく極端な例として、partition 1,指定なし (デフォルト)、300 で比較してみます。 また、シャッフルの発生有無でどの程度変化するか確認します。

準備

df = spark.read.format("parquet").load("s3:/.../parquet-chunk-high/")
one_part_df = df.coalesce(1)
print(one_part_df.rdd.getNumPartitions())
one_part_df.count()

part_df = spark.read.format("parquet").load("s3:/.../parquet-chunk-high/")
print(part_df.rdd.getNumPartitions())
part_df.count()

df = spark.read.format("parquet").load("s3:/.../parquet-chunk-high/")
part_300_df = df.repartition(300)
print(part_300_df.rdd.getNumPartitions())
part_300_df.count()
1
94
300

デフォルトでは 94 パーティション読まれたみたいです。

シャッフルが発生しない処理

with timer('one part filter'):
    result = one_part_df.filter(one_part_df['request_processing_time'] < 0.0008).count()
    print(result)
    
with timer('part filter'):
    result = part_df.filter(part_df['request_processing_time'] < 0.0008).count()
    print(result)

with timer('part 300 filter'):
    result = part_300_df.filter(part_300_df['request_processing_time'] < 0.0008).count()
    print(result)
9
[one part filter] done in 45.5252 s
9
[part filter] done in 1.4579 s
9
[part 300 filter] done in 3.5410 s

94 partition が一番はやい

シャッフルが発生する処理

with timer('one part shuffle'):
    result = one_part_df.groupBy('elb_name').agg(F.mean('request_processing_time').alias("mean_time")).orderBy('mean_time').count()
    print(result)
    
with timer('part shuffle'):
    result = part_df.groupBy('elb_name').agg(F.mean('request_processing_time').alias("mean_time")).orderBy('mean_time').count()
    print(result)

with timer('part 300 shuffle'):
    result = part_300_df.groupBy('elb_name').agg(F.mean('request_processing_time').alias("mean_time")).orderBy('mean_time').count()
    print(result)
    
9
[one part shuffle] done in 78.1068 s
9
[part shuffle] done in 2.6624 s
9
[part 300 shuffle] done in 12.2829 s

94 partition が一番はやい

まとめ

Spark Join

Spark には Shuffle 処理が発生しない BroadCast Join という Join の種類があります。 Join 対象の片方のテーブルが小さく、片方が大きいテーブルがある場合に有効です。

ざっくりとですが、仕組みとしては、片方の小さいテーブルをすべての worker node へ分配し、各 node で join するといった方法です。

今回は小さい df (大きくても全体が 各 worker のメモリに乗るサイズ) と大きい df を使い、 BroadCast Join で高速化されるかどうか検証します。

BroadCast Join

join_df = part_df.select(part_df['request_port']).distinct().withColumn("random", F.round(F.rand()*(10-5)+5,0))

with timer('broadcast join dataframe'):
    broadcast_df = part_df.join(join_df.hint('BROADCAST'), part_df.request_port == join_df.request_port, how='left')
    broadcast_df.count()

with timer('sortmerge join dataframe'):
    merge_df = part_df.join(join_df.hint('MERGE'), part_df.request_port == join_df.request_port, how='left')
    merge_df.count()

with timer('shuffle hash join dataframe'):
    shuffle_df = part_df.join(join_df.hint('SHUFFLE_HASH'), part_df.request_port == join_df.request_port, how='left')
    shuffle_df.count()

JOIN の Hint はこのあたり。

Hints - Spark 3.4.1 Documentation

join_df は distinct() により数が減っています。

ここには乗せていませんが、ちゃんと explain() の Physical Plan を見て、Hint が有効なことを確認しています。

[shuffle hash join dataframe] done in 23.2022 s
[broadcast join dataframe] done in 11.7729 s
[sortmerge join dataframe] done in 38.4018 s

broadcast join が一番はやい

まとめ

  • 小さい df と大きい df の JOIN は broadcast join が一番はやい

キャッシュを使う

Spark RDD は、それに対する action が実行されるたびに計算し直されます。 上記を回避するために cache()persist() を使うことにより、RDD をメモリに残しておけます。

キャッシュありなし比較

注意点として、 cache()persist() は action ではなく transformation なので遅延評価されます。

Best practice for cache(), count(), and take() - Databricks

時間がかかりそうな distinct をやってみます。

with timer('before cache'):
    part_df.select("backend_port").distinct().count()

part_df.cache()
part_df.count() # execute cache (cache is a transformation)
with timer('after cache'):
    part_df.select("backend_port").distinct().count()
[before cache] done in 4.5241 s
[after cache] done in 1.6293 s

cache() で早くなっている

遅延評価?

知っている方は読み飛ばしてもらって結構です。

Sparkには遅延評価(Lazy Evaluation)という特性があります。 これは、action(例えば、count() などの結果を戻す操作)が呼び出されるまで、transformation(filter() のような RDD を別の RDD に変換する操作)は実行されないという特性です。

cache() や persist() は transformation であり、これらが呼び出された時点で実行されないため、count() のような action を上記ソースでは実行していました。

まとめ

  • RDD は action に対して再計算が実施されるため、キャッシュすると早くなる
  • cache(), persist() は transformation なので action の前に実行する必要あり