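Playing with the iris dataset to get used to PySpark
Introduction
I had a chance to work with PySpark, so I tried out various things on the iris dataset. These are just rough notes, so please bear with me.
Environment setup
Setting up a Spark environment locally looked like a hassle, so I used the following Docker container.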
https://hub.docker.com/r/jupyter/pyspark-notebook
Examples
Loading data
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# Read the CSV with a header row and let Spark infer the column types
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('/home/jovyan/work/iris.csv')
df.show()
+-----------+----------+-----------+----------+-------+
|sepalLength|sepalWidth|petalLength|petalWidth|variety|
+-----------+----------+-----------+----------+-------+
|        5.1|       3.5|        1.4|       0.2| Setosa|
|        4.9|       3.0|        1.4|       0.2| Setosa|
|        4.7|       3.2|        1.3|       0.2| Setosa|
|        4.6|       3.1|        1.5|       0.2| Setosa|
|        5.0|       3.6|        1.4|       0.2| Setosa|
|        5.4|       3.9|        1.7|       0.4| Setosa|
|        4.6|       3.4|        1.4|       0.3| Setosa|
|        5.0|       3.4|        1.5|       0.2| Setosa|
|        4.4|       2.9|        1.4|       0.2| Setosa|
|        4.9|       3.1|        1.5|       0.1| Setosa|
|        5.4|       3.7|        1.5|       0.2| Setosa|
|        4.8|       3.4|        1.6|       0.2| Setosa|
|        4.8|       3.0|        1.4|       0.1| Setosa|
|        4.3|       3.0|        1.1|       0.1| Setosa|
|        5.8|       4.0|        1.2|       0.2| Setosa|
|        5.7|       4.4|        1.5|       0.4| Setosa|
|        5.4|       3.9|        1.3|       0.4| Setosa|
|        5.1|       3.5|        1.4|       0.3| Setosa|
|        5.7|       3.8|        1.7|       0.3| Setosa|
|        5.1|       3.8|        1.5|       0.3| Setosa|
+-----------+----------+-----------+----------+-------+
Checking the columns
df.columns
['sepalLength', 'sepalWidth', 'petalLength', 'petalWidth', 'variety']
Summary statistics
df.describe().show()
+-------+------------------+-------------------+------------------+------------------+---------+
|summary|       sepalLength|         sepalWidth|       petalLength|        petalWidth|  variety|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|     null|
| stddev|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   Setosa|
|    max|               7.9|                4.4|               6.9|               2.5|Virginica|
+-------+------------------+-------------------+------------------+------------------+---------+
Slicing
df[['sepalLength', 'sepalWidth']]
DataFrame[sepalLength: double, sepalWidth: double]
Random sampling
df.sample(False, fraction=0.1).show()
+-----------+----------+-----------+----------+----------+
|sepalLength|sepalWidth|petalLength|petalWidth|   variety|
+-----------+----------+-----------+----------+----------+
|        4.9|       3.0|        1.4|       0.2|    Setosa|
|        5.7|       4.4|        1.5|       0.4|    Setosa|
|        5.2|       3.4|        1.4|       0.2|    Setosa|
|        4.7|       3.2|        1.6|       0.2|    Setosa|
|        6.3|       3.3|        4.7|       1.6|Versicolor|
|        6.0|       2.2|        4.0|       1.0|Versicolor|
|        6.1|       2.9|        4.7|       1.4|Versicolor|
|        5.6|       2.5|        3.9|       1.1|Versicolor|
|        5.5|       2.4|        3.8|       1.1|Versicolor|
|        6.0|       2.7|        5.1|       1.6|Versicolor|
|        6.7|       2.5|        5.8|       1.8| Virginica|
|        6.4|       2.7|        5.3|       1.9| Virginica|
|        7.4|       2.8|        6.1|       1.9| Virginica|
|        7.9|       3.8|        6.4|       2.0| Virginica|
|        6.7|       3.3|        5.7|       2.5| Virginica|
+-----------+----------+-----------+----------+----------+
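One thing worth knowing about the call above: the Spark docs say `fraction` is not guaranteed to return exactly that share of the rows. As far as I understand, sampling without replacement keeps each row independently with probability `fraction`, so getting exactly 15 rows here is partly luck. The sketch below is plain Python, not Spark's implementation; `bernoulli_sample` is a hypothetical helper name used only for illustration.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(150))  # stand-in for the 150 iris rows

# The sample size changes from seed to seed instead of always being 15
sizes = [len(bernoulli_sample(rows, 0.1, seed=s)) for s in range(5)]
print(sizes)

# Fixing the seed makes the sample reproducible
assert bernoulli_sample(rows, 0.1, seed=42) == bernoulli_sample(rows, 0.1, seed=42)
```

`df.sample` also accepts a `seed` argument, which is handy when you want the same sample on every run.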
Adding a column
# New column = petalWidth * petalLength
df = df.withColumn('PetalMult', df['petalWidth'] * df['petalLength'])
df.show(5)
Note: this output also contains an ID column and a totalWidth column, which this cell does not create; they presumably come from other cells run earlier in the same notebook session.
+-----------+----------+-----------+----------+-------+-------------------+----+------------------+
|sepalLength|sepalWidth|petalLength|petalWidth|variety|          PetalMult|  ID|        totalWidth|
+-----------+----------+-----------+----------+-------+-------------------+----+------------------+
|        5.1|       3.5|        1.4|       0.2| Setosa|0.27999999999999997|40.0|0.7000000000000001|
|        4.9|       3.0|        1.4|       0.2| Setosa|0.27999999999999997|45.0|0.6000000000000001|
|        4.7|       3.2|        1.3|       0.2| Setosa|               0.26| 9.0|0.6400000000000001|
|        4.6|       3.1|        1.5|       0.2| Setosa|0.30000000000000004|79.0|0.6200000000000001|
|        5.0|       3.6|        1.4|       0.2| Setosa|0.27999999999999997|27.0|0.7200000000000001|
+-----------+----------+-----------+----------+-------+-------------------+----+------------------+
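Values such as 0.27999999999999997 in the PetalMult column are not a Spark bug; they are ordinary double-precision rounding. The plain-Python sketch below reproduces the first row, assuming Spark's double columns follow the same IEEE 754 arithmetic as Python floats.

```python
petal_width, petal_length = 0.2, 1.4

product = petal_width * petal_length
print(product)            # 0.27999999999999997, same as the PetalMult column
print(round(product, 2))  # 0.28

# Compare with a tolerance instead of == when checking computed columns
assert abs(product - 0.28) < 1e-12
```

On the Spark side, `F.round(column, 2)` can be used in the same way if you want tidier output.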
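User-defined functions
Careful: the function gets called again every time the data is updated!!
# UDF that adds 5 to a double column
my_udf = F.UserDefinedFunction(lambda x: x + 5, DoubleType())
df.withColumn("my_col", my_udf("sepalLength")).show(5)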
+-----------+----------+-----------+----------+-------+-------------------+----+------------------+------+
|sepalLength|sepalWidth|petalLength|petalWidth|variety|          PetalMult|  ID|        totalWidth|my_col|
+-----------+----------+-----------+----------+-------+-------------------+----+------------------+------+
|        5.1|       3.5|        1.4|       0.2| Setosa|0.27999999999999997|40.0|0.7000000000000001|  10.1|
|        4.9|       3.0|        1.4|       0.2| Setosa|0.27999999999999997|45.0|0.6000000000000001|   9.9|
|        4.7|       3.2|        1.3|       0.2| Setosa|               0.26| 9.0|0.6400000000000001|   9.7|
|        4.6|       3.1|        1.5|       0.2| Setosa|0.30000000000000004|79.0|0.6200000000000001|   9.6|
|        5.0|       3.6|        1.4|       0.2| Setosa|0.27999999999999997|27.0|0.7200000000000001|  10.0|
+-----------+----------+-----------+----------+-------+-------------------+----+------------------+------+
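The lambda above works here because iris has no missing values (describe() reports a count of 150 for every column). Spark passes SQL nulls to a Python UDF as `None`, so `x + 5` would raise a `TypeError` on a column that does contain nulls. A minimal sketch of a null-safe version of the same function follows; `add_five` is a hypothetical name, and this is only the plain Python function you would wrap, not the UDF registration itself.

```python
def add_five(x):
    """Return x + 5, passing None (a SQL null) through unchanged."""
    return None if x is None else x + 5


print(add_five(5.1))   # 10.1
print(add_five(None))  # None
```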
Grouping
- To group by a column's values and take sums (sum() with no arguments totals every numeric column):
df.groupBy('variety').sum().show()
+----------+------------------+------------------+------------------+------------------+------------------+
|   variety|  sum(sepalLength)|   sum(sepalWidth)|  sum(petalLength)|   sum(petalWidth)|    sum(PetalMult)|
+----------+------------------+------------------+------------------+------------------+------------------+
| Virginica| 329.3999999999999|             148.7|277.59999999999997|101.29999999999998| 564.8099999999997|
|    Setosa|250.29999999999998|171.40000000000003| 73.10000000000001|12.299999999999995|18.280000000000012|
|Versicolor|             296.8|138.50000000000003|212.99999999999997|              66.3|            286.02|
+----------+------------------+------------------+------------------+------------------+------------------+
- To group by a column's values and count the rows in each group:
df.groupBy('variety').count().show()
+----------+-----+
|   variety|count|
+----------+-----+
| Virginica|   50|
|    Setosa|   50|
|Versicolor|   50|
+----------+-----+
Aggregating with groupBy → agg
- filter extracts rows
- select extracts columns (slicing can be used instead)
df.groupBy('variety').agg({'petalWidth': 'min', 'sepalWidth': 'min'}).filter('min(sepalWidth) > 2.0').show()
+---------+---------------+---------------+
|  variety|min(petalWidth)|min(sepalWidth)|
+---------+---------------+---------------+
|Virginica|            1.4|            2.2|
|   Setosa|            0.1|            2.3|
+---------+---------------+---------------+
df.groupBy('variety').agg(F.min('petalWidth')).show()
+----------+---------------+
|   variety|min(petalWidth)|
+----------+---------------+
| Virginica|            1.4|
|    Setosa|            0.1|
|Versicolor|            1.0|
+----------+---------------+
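In the first agg example the filter runs on the aggregated result, so it drops whole groups (much like HAVING in SQL) rather than individual rows. That is why Versicolor, whose minimum sepalWidth is exactly 2.0, disappears. Here is a plain-Python sketch of the same group → aggregate → filter flow; the rows are a handful of illustrative values, not the full dataset, and this is not how Spark executes it.

```python
from collections import defaultdict

# Illustrative (variety, sepalWidth) values, not the full dataset
rows = [
    ("Setosa", 3.5), ("Setosa", 2.3),
    ("Versicolor", 2.0), ("Versicolor", 2.2),
    ("Virginica", 2.2), ("Virginica", 3.8),
]

# groupBy: collect the values that belong to each group
groups = defaultdict(list)
for variety, sepal_width in rows:
    groups[variety].append(sepal_width)

# agg: reduce each group to a single value
min_sepal_width = {variety: min(values) for variety, values in groups.items()}

# filter after agg: keep only the groups whose aggregate passes the condition
kept = {v: m for v, m in min_sepal_width.items() if m > 2.0}
print(kept)  # {'Setosa': 2.3, 'Virginica': 2.2}
```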
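Reshaping long to wide with groupBy → pivot
This cell assumes df already has the ID and totalWidth columns; the hash-like ID values in the output below suggest it was run with a different ID column than the one defined later in this post.
# groupBy("column that stays as rows").pivot("column whose values become columns").sum("column to aggregate")
df.groupBy('ID').pivot('variety').sum('totalWidth').show()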
+--------------------+------+------------------+------------------+
|                  ID|Setosa|        Versicolor|         Virginica|
+--------------------+------+------------------+------------------+
|4c86ce73cf93883ac...|  null|3.6399999999999997|              null|
|1d0123419d79ed9f4...|  null|              null|               5.6|
|9ee6f2276bb903d07...|  0.68|              null|              null|
|feb96d4b9ba61a234...|  0.66|              null|              null|
|9d1bf310e18c3906a...|  null|              null|              6.16|
|083c4bf137c021384...|  null|              null| 7.359999999999999|
|e8249fb5d09a3670e...|  null|              3.75|              null|
|d2a5cfc97fc88bd4a...|  null|              null|6.6000000000000005|
|cff32af7114873f26...|  null|              3.12|              null|
|78c2373ceaeedd5f7...|  null|              null| 5.319999999999999|
|c0299202f49017bf2...|  1.02|              null|              null|
|cf2b784ce8118783c...|  null|              null| 6.510000000000001|
|1b4ac9d118b902964...|  null|              null|              5.13|
|1aeacde745c81b265...|  null|               4.5|              null|
|b6f96e27904f4f6de...|  null|              null|               6.0|
|abed0bc37e718a5e4...|  null|              3.12|              null|
|b89932d736fa67711...|  null|              5.28|              null|
|7d132d0ead06d6ba5...|  null|              3.77|              null|
|cfb737bf348cd3b2c...|  null|3.9199999999999995|              null|
|42209ddc3b698bb94...|  0.68|              null|              null|
+--------------------+------+------------------+------------------+
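The nulls in this output appear because each ID belongs to exactly one variety, so the other two cells have nothing to sum. A plain-Python sketch of what the pivot computes is below; the ids "a", "b", "c" and the values are made up for illustration, and this only mirrors the semantics, not Spark's implementation.

```python
from collections import defaultdict

# Hypothetical (ID, variety, totalWidth) rows
rows = [
    ("a", "Setosa", 0.68),
    ("b", "Versicolor", 3.75),
    ("b", "Versicolor", 0.25),
    ("c", "Virginica", 5.6),
]

# The distinct values of the pivot column become the output columns
pivot_values = sorted({variety for _, variety, _ in rows})

# Sum the value column for every (ID, variety) pair that occurs
sums = defaultdict(lambda: defaultdict(float))
for row_id, variety, value in rows:
    sums[row_id][variety] += value

# Pairs that never occur stay empty (None here, null in Spark)
table = {
    row_id: {v: cells.get(v) for v in pivot_values}
    for row_id, cells in sums.items()
}
for row_id, cells in table.items():
    print(row_id, cells)
```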
Adding a unique identifier
# Give every row a unique, increasing 64-bit ID
df = df.withColumn("ID", F.monotonically_increasing_id())
df = df.withColumn('totalWidth', df['sepalWidth'] * df['petalWidth'])
df.show(5)
+-----------+----------+-----------+----------+-------+-------------------+---+------------------+
|sepalLength|sepalWidth|petalLength|petalWidth|variety|          PetalMult| ID|        totalWidth|
+-----------+----------+-----------+----------+-------+-------------------+---+------------------+
|        5.1|       3.5|        1.4|       0.2| Setosa|0.27999999999999997|  0|0.7000000000000001|
|        4.9|       3.0|        1.4|       0.2| Setosa|0.27999999999999997|  1|0.6000000000000001|
|        4.7|       3.2|        1.3|       0.2| Setosa|               0.26|  2|0.6400000000000001|
|        4.6|       3.1|        1.5|       0.2| Setosa|0.30000000000000004|  3|0.6200000000000001|
|        5.0|       3.6|        1.4|       0.2| Setosa|0.27999999999999997|  4|0.7200000000000001|
+-----------+----------+-----------+----------+-------+-------------------+---+------------------+
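The IDs come out as 0, 1, 2, ... here, probably because this small file ends up in a single partition. The Spark docs only promise that the IDs are unique and increasing, not consecutive: the current implementation puts the partition index in the upper 31 bits and the row number within the partition in the lower 33 bits. The sketch below just reproduces that documented bit layout in plain Python; `monotonic_id` is a hypothetical helper, not a Spark function.

```python
def monotonic_id(partition_index, row_in_partition):
    """Combine a partition index and a per-partition row number into one ID."""
    return (partition_index << 33) + row_in_partition


# First partition: looks like a plain counter
print([monotonic_id(0, i) for i in range(3)])  # [0, 1, 2]

# Second partition: jumps to 1 << 33
print([monotonic_id(1, i) for i in range(3)])  # [8589934592, 8589934593, 8589934594]
```

So with more partitions the ID is still fine as a join key, but it should not be treated as a row number.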
Joining DataFrames
df1 = df[['ID', 'totalWidth']].sort('ID')
df1.show(3)  # show() prints the table itself and returns None
print(df1.count())
+---+------------------+
| ID|        totalWidth|
+---+------------------+
|  0|0.7000000000000001|
|  1|0.6000000000000001|
|  2|0.6400000000000001|
+---+------------------+
df2 = df[['ID', 'variety', 'PetalMult']].sort('ID')
df2.show(3)  # show() prints the table itself and returns None
print(df2.count())
+---+-------+-------------------+
| ID|variety|          PetalMult|
+---+-------+-------------------+
|  0| Setosa|0.27999999999999997|
|  1| Setosa|0.27999999999999997|
|  2| Setosa|               0.26|
+---+-------+-------------------+
- Call the join method on the original DataFrame (this becomes the left side) and pass it the DataFrame to join with (the right side) along with the join condition; this lets you combine DataFrames much like a SQL join.
df1.join(df2, df1['ID'] == df2['ID'], 'inner').show()
+---+-------------------+---+-------+-------------------+
| ID|         totalWidth| ID|variety|          PetalMult|
+---+-------------------+---+-------+-------------------+
|  0| 0.7000000000000001|  0| Setosa|0.27999999999999997|
|  1| 0.6000000000000001|  1| Setosa|0.27999999999999997|
|  2| 0.6400000000000001|  2| Setosa|               0.26|
|  3| 0.6200000000000001|  3| Setosa|0.30000000000000004|
|  4| 0.7200000000000001|  4| Setosa|0.27999999999999997|
|  5|               1.56|  5| Setosa|               0.68|
|  6|               1.02|  6| Setosa|               0.42|
|  7|               0.68|  7| Setosa|0.30000000000000004|
|  8|               0.58|  8| Setosa|0.27999999999999997|
|  9|0.31000000000000005|  9| Setosa|0.15000000000000002|
| 10| 0.7400000000000001| 10| Setosa|0.30000000000000004|
| 11|               0.68| 11| Setosa|0.32000000000000006|
| 12|0.30000000000000004| 12| Setosa|0.13999999999999999|
| 13|0.30000000000000004| 13| Setosa|0.11000000000000001|
| 14|                0.8| 14| Setosa|               0.24|
| 15| 1.7600000000000002| 15| Setosa| 0.6000000000000001|
| 16|               1.56| 16| Setosa|               0.52|
| 17|               1.05| 17| Setosa|               0.42|
| 18|               1.14| 18| Setosa|               0.51|
| 19|               1.14| 19| Setosa|0.44999999999999996|
+---+-------------------+---+-------+-------------------+
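Because the join condition is written as an expression, the result carries the ID column twice. Passing the column name instead, as in `df1.join(df2, on='ID', how='inner')`, keeps a single ID column. The plain-Python sketch below shows what an inner join on a key computes; the values are rounded for readability and IDs 99 and 100 are made-up unmatched rows, added only to show that an inner join drops them.

```python
# ID -> totalWidth (left side); ID 99 has no partner on the right
left = {0: 0.70, 1: 0.60, 2: 0.64, 99: 1.23}

# ID -> (variety, PetalMult) (right side); ID 100 has no partner on the left
right = {
    0: ("Setosa", 0.28),
    1: ("Setosa", 0.28),
    2: ("Setosa", 0.26),
    100: ("Setosa", 0.30),
}

# Inner join: keep only keys present on both sides, with the key emitted once
joined = [
    (key, total_width, *right[key])
    for key, total_width in sorted(left.items())
    if key in right
]
for row in joined:
    print(row)
```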
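Extracting a column
This does not work unless you go through df.select (presumably because df['ID'] returns a Column rather than a DataFrame).
- What is .rdd?
The DataFrame is converted to an RDD in which each row is a Row object. A Row object is the object Spark SQL uses to hold one row of data.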
sorted(df.select('ID').distinct().rdd.map(lambda x: x[0]).collect())[:10]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
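The `lambda x: x[0]` above works because `pyspark.sql.Row` is a tuple subclass whose fields can also be read by name. The sketch below uses `collections.namedtuple` as a stand-in for `Row` (an assumption made so the example runs without Spark) to show that indexing and attribute access give the same result.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row: a tuple with named fields
Row = namedtuple("Row", ["ID", "variety"])

rows = [Row(2, "Setosa"), Row(0, "Setosa"), Row(1, "Setosa"), Row(0, "Setosa")]

# Positional access, as in the lambda above
ids_by_index = sorted({r[0] for r in rows})

# Access by field name reads more clearly
ids_by_name = sorted({r.ID for r in rows})

print(ids_by_index)  # [0, 1, 2]
print(ids_by_name)   # [0, 1, 2]
```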
df.filter(df['ID'] == 2).show()
+-----------+----------+-----------+----------+-------+---------+---+------------------+
|sepalLength|sepalWidth|petalLength|petalWidth|variety|PetalMult| ID|        totalWidth|
+-----------+----------+-----------+----------+-------+---------+---+------------------+
|        4.7|       3.2|        1.3|       0.2| Setosa|     0.26|  2|0.6400000000000001|
+-----------+----------+-----------+----------+-------+---------+---+------------------+