PySparkはじめました - 分散処理デビューする前にやったこと

JX通信社Advent Calendar 2019」10日目の記事です.

昨日は, @rychhrさんの「Pure WebSocketsをサポートしたAWS AppSyncでWebとiOS間のリアルタイムチャットを作ってみた(1)」でした.

改めまして, こんにちは. JX通信社でシニア・エンジニア&データ基盤エンジニアをしています, @shinyorke(しんよーく)と申します.

JX通信社では, データ駆動での意思決定および施策実施をより円滑に進めるため, データ基盤の構築・運用を進めながらトライアル的に様々なFrameworkやツールの検証を行っています.*1

このエントリーでは,

私がシュッとPySparkで分散処理をする...前に, 手元で試したときの感想とその知見

のお話を残していきたいと思います.

なお, 分散処理そのものの知見・ノウハウではなく, する前にPySparkに慣れておこう!っていう話です.

※分散処理に関する記述はSparkの説明で少しだけやります :bow:

TL;DR

  • PandasとSQLを使えればPySparkは使えそう&書いてて良い感じがする.
  • 環境構築と動くまでが鬼門なので, 自前ホスティングはやめた方が良い, ベスプラは「Cloud系サービス使う」こと(AWS Glue, GCP Cloud Dataprocなど).
  • 利用シーンを明確にした上で使ったほうが幸せ. Pandas他で済むパターンも有る.

この記事を読み終える頃にはきっとPySparkでシュッと何かを試したくなるハズです.

スタメン

やりたかったこと・やったこと

絵に書くとこういう感じです.

f:id:shinyorke:20191203204740p:plain
実践に向けてのキャッチアップでした

そもそも最初の構想としては,

  • プロダクトが出力するログを, 収集 -> クレンジング&クラスタリング -> BigQueryとかのDWHで使えるように出力するETLを作りたい
  • 収集からクラスタリングまでAWS Glue使えるのでは?
  • AWS Glue使うんだったらSpark理解しよう

という所からはじまりました(左側の絵).

AWS Glueのチュートリアルをやりつつも,

ワイ「そもそもGlue・Sparkわからん!」

となったので,

  • ひとまず手元の環境(Macbook Pro)にSparkクラスタをシュッと立てて
  • 手慣れたデータセットを使ってSparkになれてみよう
  • この構成ならSparkとPythonあればイケるのでは!?

と思いつき、エイヤッとやってみました(ってのが右側の絵です).

PySpark #とは

そもそもSparkって何?という方もいると思うので雑に説明すると,

  • 一言でいうと, 大量のデータを分散処理するためのFramework
  • 内部的にはクラスタリングされた「RDD(Resilient Distributed Dataset)」というデータセットで分散処理を実現
  • プログラマブルにデータを触ったりイジイジするためのDataFrameやSpark SQLといったインターフェースを使う*2
  • ETLな処理を実現するためのWorkflow, 機械学習を実装するためのライブラリ(MLlib)が実装されている*3

もので, 今どきの分散処理では割と有力な選択肢となっています.

Spark本体はScala(Java)で開発されており, これらの言語から使うこともできますが,

Pythonベースのラッパーである, PySparkを使って扱うのがベスト・プラクティスとなっています*4*5.

手元で試しに使ってみる

というわけで早速お試ししました.

実際は業務上のデータでお試ししたのですが, 流石にブログでお披露目できるようなものではない(察し)ので,

  • オープンデータで皆が手に入れることができて
  • ある程度きれいなフォーマットのデータで
  • すでにPythonや他の言語で分析・解析した実績がある

ようなデータセットとお題でサンプルをご用意いたしました.

そう, 個人的に一番得意な「野球データ」です.*6

shinyorke.hatenablog.com

上記のエントリー(私の個人ワークです)で作成した,

  • とある歴史上の選手の成長モデルを線形回帰でいい感じにやってみる*7
  • 線形回帰モデルは, 目的変数をOPS(On Base Plus Slugging), 説明変数を年齢になにかしたもの*8
  • 上記をPandas + scikit-learn + bokehで再現*9

を, 今度はPySparkで作り変えました(書き換えまでの所要時間:3時間34分*10).

PySpark版のコードと, Python版コードをぜひ比較しながらこの先お楽しみください.

環境を作る・動かす

検証環境自体は(Sparkの環境作りがどれほど大変かを試す意味も含めて)自分のMacbook Pro上でやりました.

brewでSparkをインストールした後,

 $ brew install apache-spark

pythonのvenvで仮想環境を設定(プロジェクト直下の.venvに作成), direnvで

source .venv/bin/activate
source .env

を設定後, プロジェクト配下の.envに

export PYSPARK_PYTHON=/Users/shinyorke/sample/.venv/bin/python
export PYSPARK_DRIVER_PYTHON=/Users/shinyorke/sample/.venv/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="'notebook' pyspark"

これで準備を行い,

 $ pysprk

コマンドで起動するようになりました.

なお, コードをcloneして自前でビルドしてやる方法もありますが,

  • ある程度JavaとMavenの経験と知識が必要*11
  • 何よりも時間がメッチャかかる*12

事もあり, 積極的にはオススメしません.*13

もしお試ししたい時は,

といった形で課金して人類の叡智に乗っかりましょう.

PySparkとPythonで比較してみた

というわけで実際に比較してみました.

同じようなことをやってるコード同士を抜粋して比較します.

データの読み込み

普通のPython版ではPandasのDataFrameを, PySpark版ではSparkのDataFrameを使いました.

なお, どちらも行列データを扱う・扱うためのメソッドがたくさんついてくる感じですが, PySparkは内部的に分散してデータ*14を保持していること, SQLライクに操れる(後ほど紹介)という違いがあります.

というわけで, CSVからDataFrameを取ってくる処理で比較してみましょう.

Pandas版(抜粋)

お馴染みすぎる処理なのであんまり説明がいらないかも笑.

import csv
import pandas as pd
pd.options.display.max_columns = 100

# CSVからDataFrame
df_batting = pd.read_csv("/Users/shinyorke/github/baseballdatabank/core/Batting.csv")

# nanはすべて0埋め
df_batting.fillna(0.0, inplace=True)

PySpark版(抜粋)

PySparkで同じことをやると

# データ
from pyspark.sql.types import *

# 後ほどDataFrameのデータ型を指定するため使う
STATS_COLUMNS = ['G', 'AB', 'R', 'H', '2B', '3B', 'HR', 'RBI', 'SB', 'CS', 'BB', 'SO', 'IBB', 'HBP', 'SH', 'SF', 'GIDP']

# 打撃成績Schemaのデータ型
schema_batting = StructType(
    [
        StructField('playerID', StringType(), False),
        StructField('yearID', IntegerType(), False),
        StructField('stint', IntegerType(), False),
        StructField('teamID', StringType(), False),
        StructField('lgID', StringType(), False),
        StructField('G', IntegerType(), True),
        StructField('AB', IntegerType(), True),
        StructField('R', IntegerType(), True),
        StructField('H', IntegerType(), True),
        StructField('2B', IntegerType(), True),
        StructField('3B', IntegerType(), True),
        StructField('HR', IntegerType(), True),
        StructField('RBI', IntegerType(), True),
        StructField('SB', IntegerType(), True),
        StructField('CS', IntegerType(), True),
        StructField('BB', IntegerType(), True),
        StructField('SO', IntegerType(), True),
        StructField('IBB', IntegerType(), True),
        StructField('HBP', IntegerType(), True),
        StructField('SH', IntegerType(), True),
        StructField('SF', IntegerType(), True),
        StructField('GIDP', IntegerType(), True),
    ]
)

# RDDをCSVから作る(引数は並列数っぽい)
rdd_batting = sc.textFile('../../../baseballdatabank/core/Batting.csv', 4)


# RDDをDataFrameに変換
batting = spark.read.option("header","true").format("csv").schema(schema_batting).csv(rdd_batting)
batting = batting.fillna(0, subset=STATS_COLUMNS)

Schema作るあたりからJavaな風味が漂ってきますね(小並感).

SparkはRDDがすべての基本なので, RDDを作った後にDataFrameを作ることになります.

Pandasの場合と異なり, 暗黙的に型を指定する動きはしないため,

  • RDD -> DataFrame変換時にフォーマット(CSV)を指定
  • RDDはスキーマを持たないデータなので, DataFrameのSchemaを別に指定

という順序を踏むことになります.

加工(前処理としてOPSを計算)

次に, 取得したデータからOPSを算出します.

OPSは「出塁率(OBP)に長打率(SLG)を足したもの」なので,

OBP = (H + BB + HBP) / (AB + BB + HBP + SF)*15

SLG = (HR * 4 + 3B * 3 + 2B * 2 + Single) / AB*16

OPS = OBP + SLG

という順でいい感じに計算します.

Pandas版(抜粋)

AB, Hなど複数のSeriesを使うので,

  • 計算用の関数を作る. 計算そのものは野球統計ライブラリ「sabr」を使う.*17
  • DataFrameに直接関数をapply

というアプローチでやりました.

# 打率, 出塁率, 長打率. 指標値計算にSABRを使う

import sabr

from sabr.stats import Stats

def obp(row: pd.Series) -> float:
    try:
        return Stats.obp(ab=row.AB, h=row.H, bb=row.BB, sf=row.SF, hbp=row.HBP)
    except ZeroDivisionError as e:
        return 0.0

def slg(row: pd.Series) -> float:
    try:
        return Stats.slg(ab=row.AB, tb=row.TB)
    except ZeroDivisionError as e:
        return 0.0

df_batting['OBP'] = df_batting.apply(obp, axis=1)
df_batting['SLG'] = df_batting.apply(slg, axis=1)
df_batting['OPS'] = df_batting['OBP'] + df_batting['SLG']

この辺はPandas使いの方はお馴染みの方法かな...*18

PySpark版

PySparkの場合はいくつかやり方があります.

  • Python(Pandas)式と同じく, DataFrame上から直接計算
  • Spark SQLで直接計算. 具体的にはselect文の中で直接四則演算する

ひとまずDataFrame式でやるとこうなります.

# udfでユーザー定義関数を定義し, カラムとしてはめていく
from pyspark.sql.functions import udf, col

# sabrは野球指標を計算するためのライブラリ
from sabr.stats import Stats

@udf(DoubleType())
def obp(ab, h, bb, sf, hbp):
    # 出塁率
    try:
        return Stats.obp(ab=ab, h=h, bb=bb, sf=sf, hbp=hbp)
    except ZeroDivisionError as e:
        return 0.0

@udf(DoubleType())
def slg(ab, h, _2b, _3b, hr):
    # 長打率
    try:
        _1b = h - (_2b + _3b + hr)
        tb = _1b * 1 + _2b * 2 + _3b * 3 + hr * 4
        return Stats.slg(ab=ab, tb=tb)
    except ZeroDivisionError as e:
        return 0.0

# DataFrameに打率, 出塁率, 長打率, OPS(出塁率 + 長打率)を追加
batting = batting.withColumn('OBP', obp('AB', 'H', 'BB', 'SF', 'HBP'))
batting = batting.withColumn('SLG', slg('AB', 'H', '2B', '3B', 'HR'))
batting = batting.withColumn('OPS', col('OBP') + col('SLG'))

やってる順序はPandas版と変わりませんが,

  • @udfデコレータで戻り値の型を指定(DoubleType()ってやつですね)
  • DataFrameのwithColumnメソッドでユーザー定義関数と使うカラムを指定

と, これも雰囲気にJava感出てきました.

なお, 参考までにSpark SQLでやる場合は,

# 出塁率と長打率, OPSを自前で計算
query = '''
    select 
    (b.H + b.BB + b.HBP) / (b.AB + b.BB + b.HBP + b.sf) as obp,
    (b.HR * 4 + b.3B * 3 + b.2B * 2 + (b.H - b.HR - b.3B - b.2B) * 1) / b.AB as slg,
    obp + slg as ops
    from batting as b join player as p on b.playerID = p.playerID 
    where p.nameFirst="{firstname}" and p.nameLast="{lastname}"
    order by b.yearID asc
'''
spark.sql(query.format(firstname='Mickey', lastname='Mantle'))

という感じに, SQLっぽくやることはできますが, プログラマブルっぽくないのであんまりオススメじゃないです.*19

学習・可視化

本当はSparkのMLlibを使ってやるつもりでした...が!

途中で挫折したため, 今回はどちらもscikit-learnでやりました :bow:

なお, SparkのDataFrameからPandasのDataFrameは一行で取得できます.

# 年齢, OPSおよび予測に必要なデータを返す関数
def batting_stats(firstname, lastname):
    query = '''
    select 
    b.yearID - p.birthYear - 1 as AGE,
    b.OPS as OPS,
    (b.yearID - p.birthYear - 1) -30 as age_30,
    pow((b.yearID - p.birthYear - 1) -30, 2) as age_30_2
    from batting as b join player as p on b.playerID = p.playerID 
    where p.nameFirst="{firstname}" and p.nameLast="{lastname}"
    order by b.yearID asc
    '''
    return spark.sql(query.format(firstname=firstname, lastname=lastname))

mantle = batting_stats("Mickey", "Mantle")

# scikit-learnで線形回帰するため、pandas Dataframeに変換
df_mantle = mantle.toPandas()

というわけで, こちらはPython版と変わらずですが, 次の機会にMLlibでやろうと思います.

なお, 可視化は結局Pandas DataFrameもしくはndarrayでやるっぽいので, Bokehなど好みのライブラリを使うと良さそうです.

【参考】Python版の可視化

というわけで, 完成したグラフはこちらです.

f:id:shinyorke:20191205225913p:plain
データ元はSparkです(他は変わらない)

結び

今回はデータを前処理するところまで, PySparkとPandasで比較を行いました.

それぞれに良いところ・しっくり行かないところがありますが, あえて言うなら,

  • 数GB以上のデータをそれなりの性能要件(例えばリアルタイムとストリームっぽくなにかやる)で処理する必要がある場合はSpark合ってるっぽい.
  • 通常の分析・解析業務, 特にアドホックに済むものはやっぱPandas強い.
  • とはいえ, PySparkはSparkの仕組みと分散処理のノウハウを把握してたらこれはこれで強い道具.

といったところかなと思います&野球はやっぱお試しのデータセットとして面白い*20.

次の機会がありましたら, タイムオーバーで試すことができなかったMLlibを続きでやりたいなと思います.

ここまでお読みいただきありがとうございました.

明日の「JX通信社Advent Calendar 2019」は, @maplerさんです.

【Appendix】参考資料

当エントリーの執筆および, Sparkのお試しでは以下の文献を参考にしました.

Pythonで大量データ処理!PySparkを用いたデータ処理と分析のきほん*21

入門PySpark

www.oreilly.co.jp

【Appendix】PySparkのサンプルコード(全体)

PySpark検証サンプル

*1:この件のみならず, スプリントの中で時間を区切って検証をしやすい環境ではありますし, これで技術選定・開発も進むので実にやりやすいです.

*2:ここでPandas風にDataFrameだったり, SQLで操れたりします. また, 元々がHadoopの後継的な立ち位置なのでRDDを使ってMap/Reduceという方法でも同じことができます.

*3:後で触れますが今回は間に合わずだったので触れていません. いつかやりたい.

*4:ただし, Sparkの機能をすべてPySparkから使えるわけではないです. 必要十分すぎる感はありますが.

*5:他にもRの実装もあるみたいです, 公式で.

*6:メジャーリーグだと複数のオープンデータセットがあり, Kaggleにも題材がある程度に揃っているかつ, データサイズが数百MB(GBいかない)ぐらいなのでこの手の検証には最適です, 本筋と関係ないですが笑.

*7:要するに時系列で分析しています, 内容知りたい方は引用したブログの方をご覧ください.

*8:OPSは簡単に言うと選手の得点能力を「量」として捉える数字で野球の世界では結構メジャーです, 成長モデルは雑に言うと「若いうちは伸びるけどある一定の年を取ったら衰えるはず」なので, 年齢が説明変数となります.

*9:ちなみに元のコードはRでした, RからPython乗り換えはここでは触れないので引用先をご覧ください.

*10:なんでや!は(ry ...はさておき, 実際のところ1日ちょいで置き換えできました.

*11:Javaで開発したことある人なら多分楽勝です, 自分もそれがあったのでイケましたが「Pythonしかやったことない」レベルだと苦痛だと思います.

*12:スペックによりますが, ビルドで2,3時間近くかかるはず.

*13:過去に個人の興味・趣味でやった際は, こちらのオライリー本の付録を元にやった&これで十分動きましたがまあ...ねえ.

*14:Sparkが分散処理Frameworkなので当然っちゃ当然かなと. 余談ですがPandasの分散処理はDaskほかいくつか存在します.

*15:H:安打数, BB:四球, HBP:死球, AB:打数, SF:犠牲フライ

*16:HR:本塁打, 3B:三塁打, 2B:二塁打, Single:単打, AB:打数. 本塁打から単打までを「塁打」として計算して打数で割ることにより算出します.

*17:ちなみに拙作です.

*18:今回ぐらいのデータ量ならapplyでやるのがベストですが, もっと大きいデータを扱う時はSeries間の変換コストがかなりかかるので, いい感じにmapで処理するなどした方が良いときもあります.

*19:とはいえ, AWS GlueやDatabricksみたいなサービスでアドホックにやるときはこれで良いかもです.

*20:真面目な話, 初めて触るもの・体験するものに備えて「馴れた」データセットを用意しておくのは色々小回り効くし何より対象物のキャッチアップに集中できるのでかなりオススメです.

*21:余談も余談ですが, 林田さんに勧められてSparkとPySparkを認知し, 使いはじめました(この発表された当時同僚でした)