JX通信社シニア・エンジニア兼データ基盤担当大臣の@shinyorke(しんよーく)です.
最近やった「ちょっとした贅沢」は「休日, 自宅で🍺片手に野球を見ながらUberEatsで注文したランチを楽しむ」です. ⚾と飲食を提供してくださる皆さまに心から感謝しております🙏
JX通信社では,
- 機械学習を用いたプロダクト開発・施策
- プロダクト・サービスの改善に関する分析
- 日々のイベントをメトリクス化して可視化(いわゆるBI的なもの)
を円滑かつ効率よく行うため, 昨年からデータ基盤を整備・運用しており, 現在では社員のみならず(スーパー優秀な)インターンの皆さまと一緒に活用し, 成果を出し始めています.
なぜデータ基盤が必要か?どういった事をしているのか?...は上記のインタビューに譲るとして, このエントリーでは「データ基盤を支える技術 - ETL編」と称しまして,
- Python製ETLをどうやって選ぶか?
- JX通信社におけるETLフレームワークの使い分け
- 小さく作って運用するETLならprefect最高やで!
というお話をさせてもらえればと思います.*1
TL;DR
- ETLフレームワークはどれも癖があって得意不得意も違うので, 一つに絞らず適材適所にやってこ.
- JX通信社では, 大きめのWorkflowを運用するのにAirflow(Cloud Composer), 1〜複数個のコンテナで収まるバッチをprefectでやってます.
- 大きめWorkflow/小さいETL両方行ける(と思われる)prefectに期待する未来はあって良さそう
おしながき
対象読者
そこそこハイコンテクストな話題なので一応書きます.
- 規模の大小を問わず, 企業および何かしらの団体でデータ基盤構築・運用に携わっている方.
- AirflowやLuigi, kubeflowといったETLフレームワークを使った構築・運用をしたことがある方. SRE的なレイヤーが得意ならなお望ましい.
- ETLとかWorkflowとかDAGとかそのへんの単語を解説しなくても読める方.
- 「PythonicなETL話」なので, DigDag等, 別言語で実装・定義されたETLとの比較はしておりません.
Python製ツールの話メインですが言うほどPython出てこないです&統計・機械学習のアルゴリズム等は出てきませんのでそっち方面の知識は不要です.
PythonicなETLフレームワークの選び方
私の話で恐縮ですが, 過去のお仕事および個人プロジェクトでAirflowおよびLuigiを活用してきました.
上記以外にもJX通信社の仕事をはじめ結構色々とETLを構築・運用しましたが, ETLフレームワークの選び方として以下の3つが大事なのでは?と気が付きました.
- Workflow(複数のタスク処理)実行に必要なプロセス(コンテナ)の数)
- CPU・メモリの利用量が変動するか
- 開発のしやすさ
構築するETLの要件・規模(と開発者の好みなどなど)を上記の3つの視点で組み合わせるといい感じになります!というのがこの先のお話になります.
Workflow(複数のタスク処理)実行に必要なプロセス(コンテナ)の数
結論から言うと,
Dockerのコンテナ1個で処理が終わるか, それとも複数のコンテナ・プロセスで処理するかで採用するフレームワーク変わるやで!
ということです.
データ基盤のバックエンドのタスクは,
- 欲しいデータを取ってくる「Extract」タスク
- 前処理・クレンジング・集計etc...何かしらの変換を行う「Transform」タスク
- DWH, ストレージなど必要としている所にデータを読み込む「Load」タスク
この3つの組み合わせで構成され*4, 大抵の場合「Extract -> Transform -> Load」の順で「Workflow」バッチを組むことになります.
これらの設計を進めると,
- 一つのDBテーブル, 一つのアプリケーションログを相手にする比較的小さいETL Workflow
- 複数のデータセット, 複雑な集計処理を伴うかつ, 別のWorkflowと依存する大きめのETL Workflow
に別れます. そして大抵の場合,
- 「小さいETL Workflow」は一つのアプリケーションで完結することが多い. 例えば「RDBにselectした結果をS3に保存」みたいな小さなタスクはEmbulkのプロセス一つ立てればそれで終わる.
- 「大きめのETL Workflow」は「AのWorkflowが終わったらBのWorkflowを」的な複雑な順序および障害発生時の冪等性を担保するような大掛かりな仕組みが必要となり, 仕組み的にもWorkflowが担うSLA的にもゴツい仕組みが必要.
となります.
小さいものはDockerコンテナ1個で収まるとかが多いですが, 大きいものはk8sクラスタ一個まるごと使う的な事が多く, ETLフレームワークもそれらによって得意不得意が異なるのでここのポイントは結構大きいんじゃないかなと思っています.
CPU・メモリの利用量が変動するか否か
Workflowが使うComputing Resources, 特にCPUとメモリの利用量が変動するか否かも大切なポイントとなります(機械学習系だとここにGPUも加わることになります).
これも一言でいうと,
処理に必要なCPU・メモリが一定量ならDocker Containerもしくは関数系のサービスでやる, 変動するならフルマネージドなクラウドサービス使うとかしよう!
です.
「1日一回, マスターデータをS3にダンプする」「アプリケーションログを1時間に一度BigQueryに流す」みたいなタスクはCPU・メモリの利用量が安定することが多いので, コンテナで動かすアプリケーションで十分対応可能です.
が, 「複数のデータセットをかき集めてクラスタリングした後機械学習の予測とかやります」だと,
- データセットをかき集める, クラスタリングするときに多くのメモリが必要
- 機械学習の予測タスクってCPU頼り(もしくはここだけGPU使う)なのでは?
- その他の小さいタスクは分散するなり遅延処理するなりしたらさほどComputing Resourcesいらなさそう
とかあったりします. Resourcesに変動がある場合は,
- Airflowの「ECSOperator*5」「KubernetesPodOperator*6」など, 実行時にリソース指定ができる仕組みを使う
- kubeflowなど, k8s前提の流動的・弾力的にリソースが使える仕組みを使う
- Sparkとかの仕組みに乗れるものはAWS Glue/EMRやGCP Dataprocなどフルマネージドな分散処理サービスに乗っかる
などの工夫で切り抜ける必要があります(かつ, LuigiやEmbulkなどはこの手の仕組みがありません).
開発のしやすさ(含む環境構築)
開発のしやすさもかなり大事です.
まず, ETLフレームワークはどれも癖があります&人それぞれ好みも別れます.*7
迷ったらやりやすいやつを選ぶのも大事な視点です.
比較表にしてみると...
というわけで, 上記の視点で比較表を作ってみました.
なお, あくまでも個人の感想です.
Framework | 実行プロセス数 | CPU・メモリ変動 | 開発しやすさ(shinyorke個人の感想) | 備考 |
---|---|---|---|---|
Airflow | 複数コンテナ必要・Cloud Composerなどを使ってk8sで運用が推奨 | k8sクラスタ内で割り当てたり他のサービス使ったりと優秀 | DAGはPython書ければ誰でも書ける, 他は怪しい. | オンプレ・セルフホスティングは地獄ですのでオススメしません, Cloud ComposerもしくはどうしてもせるふでやるならhelmのAirflowテンプレを使うのがベスト.*8 |
Luigi | Docker Container一つで収まる | 基本的に起動したコンテナ・マシンのリソースのみ | Python完全に理解した...レベルならかろうじて行けそう | 軽量で良い仕組みですが少し癖がある&後述のprefectが良い後継なので今からやるならprefectがオススメ? |
prefect | Docker Container一つでもk8sなどでもどっちでも | 環境によって柔軟にできる(らしい) | メチャクチャやりやすい, Luigiの代替として優秀 | Airflowの代替として使えるっぽいが試した事無いです |
kubeflow | k8s | k8s内でいい感じにやれる | Pythonで書きやすそう(まだ書いたこと無い) | 機械学習Workflowとしてすごく使えそう. データを移送する程度のETLとしては少々割高かも. |
個人の感想が思いっきり入ってるので参考程度にしてもらえるとありがたいですが,
- 比較的SLAが要求されるWorkflowはAirflow
- 小さめのWorkflowはprefectやLuigi
- 機械学習目的でkubeflow
ぐらいに雑に考えると良さそうです.
JX通信社におけるETLの使い分け
JX通信社ではデータ基盤チーム・CTO室で試行錯誤や本番運用しながらのトライアルの結果,
- 全体的な大きめWorkflowはCloud Composer(Airflow)で管理
- 1コンテナで収まるタスクはprefectとLuigiを使う, Luigiは順次prefectに書き換え中
- DBのimport等小さいタスクはEmbulk
- ML Opsが必要なWorkflowはkubeflowの導入を前提に色々とお試し中
という感じで進めています.
全体像
現状の全体像はこちらです.
アプリケーションログを扱うか, RDBMS/KVSを扱うかで仕組みがちょっと変わっていますが原則として,
- すべてAirflowのDAGから実行
- 個別のタスク(小さいWorkflow)はAWS ECSもしくはCloud ComposerのPodとしてクライアントアプリ(Python製のコードもしくはEmbulk)を実行
- 最後はBigQueryに投入, 中間データはS3/GCSに残す
という運用を行っています.
Airflow(Cloud Composer)
前述のとおり, Cloud Composerにすべてを任せています.
アプリケーション(AWS)側のタスクもAirflowのトリガーで実行(すべてECS Fargateのタスクとして実行)できるのでものすごく楽にできます.
GCPのタスクもCloud Composer内のnode pool上でPodを立てて実行しています.
また, DAGについてはビジネスロジックを原則書かない運用としているため, コードベースも比較的スッキリしています.
ちなみに, すべてのAirflowタスクがCloud Composerという訳ではなく, 一部セルフホスティングしているものもありますがこれは近々Cloud Composerに引っ越しする予定で進めています.
Luigi
アプリケーションログなど, ある程度のフィルター・クレンジングが必要(≒方言がある)モノはLuigi/prefect製のアプリケーションをECS Fargateのタスクとして実行し, ログを出力する仕組みとしています.
これは「Dockerのコンテナ1個で処理が終わる」レベルなので, Luigiみたいな小さいFrameworkが良さそうということでLuigiで開発しました.
Luigiは, Dockerコンテナ一つで動かすようなWorkflowや, 小規模な機械学習pipelineに用いることが多く, (前述の事例通り)私もよく使っていたので昨年までは愛用していました(が後述の理由によりprefectに移行中です).
prefect(Luigiの後釜として)
Luigiは実績があって枯れているETL Frameworkではありますが,
- Workflowの定義・記法がやや独特. Luigiのクラス・インターフェースの定義に従って書かないといけない.
- 「前のタスクが途中か終わってるか」的な管理を時前で実装する必要がある. 具体的にはタスクの開始・終了をログファイルやDBの状態で判断するような書き方になるため, 運用(特に再実行とか)の際にハマりどころとなりやすい.
という欠点があります.
どうしたものか...と思案していたところ, 最近注目を浴びているprefectを使うとこの辺がシュッとなると気がついたので最近はprefectでの開発に切り替えています.
Airflowの開発者が, Airflowの辛い所をいい感じにするぜ!というノリで開発したETL Frameworkで, クラウドサービスも存在します.
感覚的にはAirflow的な使い方・Luigiの替わり両方行けそうな感じで, JX通信社では今の所後者(Luigiでやれそうな軽いWorkflow)で使っています.
prefectは, Luigiと異なり,
- ETL/Workflowに必要な機能がいい感じにまとまっている. 並列実行・GUIなど面倒を見てくれる
- Workflowに必要なタスク・実装をdecoratorで定義できる
- Pythonの一般的なclass/methodの実装ができたら比較的シュッとできる
利点があり活用しています.
このエントリーだけでは説明がしきれないところもあるので, prefectの詳細については近日中に別記事で紹介いたします!*9
その他(Embulk, kubeflowほか)
RDBMSやDynamoDBなどを相手にするものは処理が決まってるためEmbulkを使っています.
また, 最近はML Opsまわりをもっと進めるため試験的にkubeflowも使ったりしています.
結び - 結局どれも癖がありますので長いおつきあいを
というわけでこのエントリーでは「ETLフレームワークの選び方・組み合わせ方」について色々と書きました.
が, これが最適解かと言われるとまだまだな気がしますし, この手の事例って意外と無いんですよね...
ご指摘や改善点などありましたらコメントや意見をいただけると嬉しいです :bow:
というのと, これだけは自信を持って言えるのですが,
ETLフレームワーク, 結局どれも癖がありますので長いおつきあいを前提にやってこうぜ!
どれも癖はありますが長所を活かす使い方をするとしっかりバリュー出ます.
色々と試し, 自学自習して改善しながらいい感じにやってくのがベストかなと思います.
最後までお読みいただきありがとうございました&次はprefect編で会いましょう.
*1:なお, 余談ですがこのネタ本当はPyCon JP 2020のCfPとして提出したモノの一部となります. 悔しいことに不採択になったので供養エントリーという意味合いも含んでおります(真顔)
*2:PyCon JP 2017の発表で, 日々発生する野球の試合データを収集・可視化する基盤のバックエンドとしてAirflowを使いました.
*3:私が当時在籍していたRettyでの事例で, アライアンス関係のバックエンドをLuigiでゼロから作った話です.
*4:Transformあたりは事実上なかったりする場合も無きにしもあらず(例えばExtractがSQLでデータを取ってくるとか)ですが, 最低でもExtractとLoadの2つはある認識でいます.
*5:特定のDocker ImageをAWS ECSのタスクとして実行する仕組みでFargateも使えるためかなり便利です. 後ほど触れますがJX通信社のデータ基盤でも活用しています.
*6:指定したk8sのクラスタ・node pool上のResourcesを使って処理を行う仕組みでこれもDocker imageを元に処理することになります.
*7:PythonのWeb Frameworkを思い出してみてください. Django, Flask, sanic, FastAPIどれも良いところ・癖がそれぞれあって好みが分かれることを. 他の言語も同じでしょう.
*8:いずれもk8s前提の運用となります.
*9:ホントはこの記事のメインコンテンツのつもりでしたが思ったより文量が増えたため別けることに