Pythonでいい感じにバッチを作ってみる - prefectをはじめよう

JX通信社シニア・エンジニアで, プロダクトチームのデータ活用とデータサイエンスのあれこれ頑張ってるマン, @shinyorke(しんよーく)です.

最近ハマってるかつ毎朝の日課は「リングフィットアドベンチャー*1で汗を流してからの朝食」です. 35日連続続いています.

話は遡ること今年の7月末になりますが, JX通信社のデータ基盤の紹介&「ETLとかバッチってどのFW/ライブラリ使えばいいのさ🤔」というクエスチョンに応えるため, このようなエントリーを公開しました.

tech.jxpress.net

このエントリー, 多くの方から反響をいただき執筆してよかったです, 読んでくださった方ありがとうございます!

まだお読みでない方はこのエントリーを読み進める前に流して読んでもらえると良いかも知れません.

上記のエントリーの最後で,

次はprefect編で会いましょう.

という挨拶で締めさせてもらったのですが, このエントリーはまさにprefect編ということでお送りしたいと思います.

github.com

今回はprefectで簡単なバッチシステムを作って動かす, というテーマで実装や勘所を中心に紹介します.

prefectをはじめよう

prefect #とは

f:id:shinyorke:20201215220448j:plain

簡単に言っちゃうと, Pythonで開発されたバッチアプリのFrameworkで,

The easiest way to automate your data.

(意訳:あなたのデータを自動化していい感じにするのに最も簡単な方法やで)

がウリとなっている模様です.

公式リポジトリのREADME.mdの解説によると,

Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.

(意訳:prefectは今風のインフラストラクチャーに合わせて設計されたFrameworkで, 開発者はTaskとFlowを書いてくれたらあとはPrefect Coreがいい感じにワークフローとして処理するやで)

というモノになります.

ちなみにHello worldはこんな感じです.

from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

@taskデコレーターがついた関数(上記の場合say_helloがそう)が実際に処理を行う関数.

処理に必要な引数を取ったり関数を呼んだりするwith Flow("My First Flow") as flowの部分を開発者が実装, あとはよしなにやってくれます.

こんにちはprefect

という訳で早速prefectをはじめてみましょう.

一番ラクな覚え方・始め方は公式のリポジトリをcloneしてチュートリアルを手元で動かすことかなと思っています.

※私はそんなノリでやりました.

$ git@github.com:PrefectHQ/prefect.git
$ cd prefect
$ pip install prefect SQLAlchemy

SQLAlchemyが入っているのはひっそりとチュートリアルで依存しているからです(小声)*2.

ちなみにPython3.9でも動きました👍

ここまで行けば後はチュートリアルのコードを動かしてみましょう.

$ cd examples/tutorial
$ python 01_etl.py   

このブログを執筆した2020/12/18現在では, 06_parallel_execution.py以外, 滞りなく動きました.

ひとまずこんな感じで動かしながら, 適当に書き換えながら動かしてやるといい感じになると思います.

軽めのバッチ処理を作ってみる

exampleをやりきった時点で小さめのアプリは作れるんじゃないかなと思います.

...と言っても, 何にもサンプルが無いのもアレと思い用意しました.

github.com

baseballdatabankというメジャーリーグ⚾️のオープンデータセットを使って超簡単なETLバッチのサンプルです.*3

f:id:shinyorke:20201215205310p:plain

  • 選手のプロフィール(People.csv)を読み込み
  • 打撃成績(Batting.csv)を読み込み&打率等足りない指標を計算
  • 選手プロフィールと打撃成績をJOINしてのちcsvと出力

というETL Workflowなのですが, こちらの処理はたったこれだけのコードでいい感じにできます.

import logging
from datetime import datetime

import pandas as pd
import click
from pythonjsonlogger import jsonlogger
from prefect import task, Flow, Parameter

logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)


@task
def read_csv(path: str, filename: str) -> pd.DataFrame:
    """
    Read CSV file
    :param path: dir path
    :param filename: csv filename
    :return: dataset
    :rtype: pd.DataFrame
    """
    # ETLで言うとExtractです
    logger.debug(f'read_csv: {path}/{filename}')
    df = pd.read_csv(f"{path}/{filename}")
    return df


@task
def calc_batting_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    打率・出塁率・長打率を計算して追加
    :param df: Batting Stats
    :return: dataset
    :rtype: pd.DataFrame
    """
    # ETLで言うとTransformです
    logger.debug('calc_batting_stats')
    _df = df
    _df['BA'] = round(df['H'] / df['AB'], 3)
    _df['OBP'] = round((df['H'] + df['BB'] + df['HBP']) / (df['AB'] + df['BB'] + df['HBP'] + df['SF']), 3)
    _df['TB'] = (df['H'] - df['2B'] - df['3B'] - df['HR']) + (df['2B'] * 2) + (df['3B'] * 3) + (df['HR'] * 4)
    _df['SLG'] = round(_df['TB'] / _df['AB'], 3)
    _df['OPS'] = round(_df['OBP'] + _df['SLG'], 3)
    return df


@task
def join_stats(df_player: pd.DataFrame, df_bats: pd.DataFrame) -> pd.DataFrame:
    """
    join dataframe
    :param df_player: player datea
    :param df_bats: batting stats
    :return: merged data
    :rtype: pd.DataFrame
    """
    # ETLで言うとTransformです
    logger.debug('join_stats')
    _df = pd.merge(df_bats, df_player, on='playerID')
    return _df


@task
def to_csv(df: pd.DataFrame, run_datetime: datetime, index=False):
    """
    export csv
    :param df: dataframe
    :param run_datetime: datetime
    :param index: include dataframe index(default: False)
    """
    # ETLで言うとLoadです
    logger.debug('to_csv')
    df.to_csv(f"{run_datetime.strftime('%Y%m%d')}_stats.csv", index=index)


@click.command()
@click.option("--directory", type=str, required=True, help="Baseball Dataset Path")
@click.option("--run-date", type=click.DateTime(), required=True, help="run datetime(iso format)")
def etl(directory, run_date):
    with Flow("etl") as flow:
        run_datetime = Parameter('run_datetime')
        path = Parameter('path')
        # Extract Player Data
        df_player = read_csv(path=path, filename='People.csv')
        # Extract Batting Stats
        df_batting = read_csv(path=path, filename='Batting.csv')
        # Transform Calc Batting Stats
        df_batting = calc_batting_stats(df=df_batting)
        # Transform JOIN
        df = join_stats(df_player=df_player, df_bats=df_batting)
        # Load to Data
        to_csv(df=df, run_datetime=run_datetime)

    flow.run(run_datetime=run_date, path=directory)


if __name__ == "__main__":
    etl()

pandasの恩恵に授かって*4prefectのお作法に従うと比較的見通しの良いworkflowが書けますね, というのがわかります.

今回はcsvファイルを最終的なinput/outputにしていますが,

  • ストレージにあるjsonをいい感じに処理してBigQueryにimport
  • AthenaとBigQueryのデータをそれぞれ読み込んで変換してサービスのRDBMSに保存

みたいな事ももちろんできます(taskに当たる部分でいい感じにやれば).

この辺はデータ基盤やETL作りに慣れていない人でもPythonの読み書きができれば直感的に組めるのでかなりいいんじゃないかと思っています.

その他にできること&欠点とか

今回は「ひとまずprefectでETLっぽいバッチを作って動かす」という初歩にフォーカスしていますが, 実はこのprefect高機能でして,

  • タスクの進行状況をGUIで表示可能(AirflowとかLuigiっぽい画面)
  • 標準でDocker, k8sの他GCP, AWS, Azure等のメジャーなクラウドサービスでいい感じに動かせる

など, かなりリッチな事ができます.

一方, 使ったときのネガティブな感想としては,

  • 色々できるんだけど, 色々やるために覚えることはまあまあたくさんある.
  • 色々できるんだけど, それが故に依存しているライブラリとかが多く, 自前でホスティングするときのメンテ効率とかはちょっと考えてしまう.
  • ちゃんとデバッグしてないのでアレですが, 並列処理の機構がホントに並列で動いてるか自信がないときがある🤔

と, 心配なポイントもいくつかありました.

前のエントリーにも記載しましたが,

ETLフレームワーク, 結局どれも癖がありますので長いおつきあいを前提にやってこうぜ!

結局のところこれに尽きるかなあと思います*5.

結び - 今後のこの界隈って🤔

というわけでprefectを使ったいい感じなバッチ開発の話でした.

データ基盤や機械学習のWorkflowで使うバッチのFWやライブラリはホント群雄割拠だなあと思っていまして,

cloud.google.com

note.com

AirflowのDAGがシンプルに書けるようになったり(ほぼprefectと同じ書き方ですよね*6), BigQueryのデータをいい感じにする程度のETLならほぼSQLで終わる未来がくる(かも)だったりと, この界隈ホント動きが活発です.

このエントリーの内容もきっと半年後には古いものになってるかもですが, トレンドに乗り遅れないように今後もチャレンジと自学自習を続けたいと思います!

なおJX通信社ではそんなノリで共に自学自習しながらサーバーサイドのPythonやGoでいい感じにやっていく学生さんのインターンを募集しています.

www.wantedly.com

おそらく私が年内テックブログを書くのは最後かな...

皆様良いお年を&来年また新たなネタでお会いしましょう!

*1:執筆時点のLVは58, 運動負荷はMAXの30です. 筋肉が喜んでます💪

*2:このエントリーのため久々に試していましたがあっ(察し)となります.

*3:なぜ⚾のオープンデータ化というと, 私の趣味かつ手に入りやすい使いやすいオープンデータだったからです.

*4:この程度の処理だとprefectよりpandasの優秀さが目立つ気はしますが, デコレーターでいい感じにflowとtaskに分けられているあたりprefectの設計思想は中々筋が良いと言えそうです.

*5:ETLに限った話ではないのですが, 選んだ以上メンテをちゃんとやる, 使い切る覚悟でやるってことかなあと思っています.

*6:余談ですがprefectの作者はAirflowのコントリビューター?作者??らしいです.

AWS・GCPとKubernetesの権限まわりの用語を具体例から理解する

はじめに

TL; DR;

  • 社内の普段はインフラ以外のところを主戦場にしている人向けに、AWS・GCPの権限に関する用語と概念を説明するために書いたものを加筆訂正して公開します
  • AWS・GCPの権限管理は、基本的な概念は似ているが同じ英単語が別の意味でつかわれているのでややこしい
  • 書いてあること
    • 概念の説明と、関係を表す図
    • EKS・GKEからクラウドリソース *1 を使う時の考え方
  • 書いてないこと
    • 設定のためのコンソール画面のスクショや手順
  • Kubernetesからクラウドリソースを操作する方法は、以前のブログ「GitHub Actionsで実現する、APIキー不要でGitOps-likeなインフラCI/CD」でTerraformによるコードの例も紹介しているので、あわせて参考にしてみてください

想定読者

  • AWSはそこそこ使って慣れているけど、GCPにおける権限管理を理解したい人(またはその逆)
  • マネージドなKubernetes上でクラウドのリソースを使うときの考え方を知りたい人

なぜパッと見てややこしいのかについて、私は

  • 同じ単語が異なる意味で使われている
  • 似ている概念に異なる単語が当てられている
  • ある目的を達成する手段が別の方法で分解されている

の3つの理由があるからだと思います。なので、本記事ではできるだけ枕詞をつけたりセクションを区切って、「何について話しているのか」を明確にしながら解説していきたいと思います。

AWSとGCP

この二者の間では、まず「Policy」および「Role」という単語が異なる意味で使われています。

Policy

AWSにおけるPolicy

  • 「許可・禁止する操作(Actions)とその対象リソース(Resouces)」を表現するもの
    • (例)「xxxって名前のbucketをReadしていいよ」
    • (例)「dev- で始まる名前のDynamoDBについて何でもやって良いよ、ただしテーブル消しちゃダメよ」
  • つまり「操作対象と内容」についての話で、「誰が」その権限を持つかについては関知しない
    • 認証(Authentication)ではなく認可(Authorization)に関係している
    • 「誰」を定義するのAWSではIAM User/Group/Roleの役目

GCPにおけるPolicy

  • 「操作の主体」と「操作の内容」を紐付けるもの
    • 操作の主体
      • (例) Googleアカウントでコンソールにログインする生身のユーザ
      • (例) CloudFunctionやGCEインスタンスが使うService Account
    • 操作の内容: GCPにおけるRole
  • 許可する操作に「Condition(条件)」をつけることができる
    • (例)「君にGCSの読み書き権限あげるけど、 hoge-stg ってバケットだけね」
    • (例)「アナタはBigQueryでQuery実行していいけど、日本時間の月〜金だけね」
  • Webのコンソール上では IAM & Admin > IAM で操作できる
    • (筆者の観測範囲では)AWSから来た人はこの辺で迷うことがよくあるみたいです

Role

AWSにおけるRole

  • IAM Policyのセットをまとめて、「誰か」に使ってもらうもの
  • 生身のユーザーや、Lambda/EC2/ECSなどのワークロードに、Roleを紐付けて使う(=RoleをAssumeする)

GCPにおけるRole

  • 許可する操作をまとめた権限のセット
  • Storage Admin (project) Owner といった名前がついている
  • Roleを紐付ける対象はService AccountまたはUserであり、GCE・CloudFunctionといったワークロードではない

「権限をセットにする」という意味でAWS・GCPともに似ているようですが、実際にワークロードにクラウドリソースの権限を渡す方法が微妙に異なります。では具体例で比較してみましょう。

具体例で対比して理解する

「Serverlessな関数(Lambda/CloudFunction)からオブジェクトストレージ(S3/GCS)の特定のBucketをRead onlyに使いたい」 というケースを考えてみましょう。*2

  • AWSの場合は、「S3バケットを読むためのIAM PolicyをIAM Roleにつけて、そのIAM RoleをLambdaに使わせる」
  • GCPの場合は、「GCSバケットを読むためのRoleをServiceAccountにつけて(=Policyの設定)、CloudFunctionsにそのServiceAccountを使わせる」

図で表すとこのようになります。

f:id:TatchNicolas:20201214090717p:plain
Lambda/CloudFunctionからクラウドリソースを使う例

こうしてみると、 AWSにおけるPolicyは、GCPのPolicyよりもRoleの方に性格が近い かもしれません。整理すれば考え方として似通っているところもあり、決して複雑ではないのですが、こういった概念と用語の違いが「パッと見てややこしく見えてしまう」原因ではないでしょうか。

また、「オブジェクトストレージ(S3/GCS)系サービス触っていいけど、このバケットだけね」のように個別のリソース単位の制限をかけるのが

  • AWSの場合はIAM Policyで可能な操作(S3の読み取り)と対象(許可する個別のバケットの絞り込み)をまとめて指定する
  • GCPの場合はRoleでGCSというサービス自体の読み取り権限を定義され、それを主体と紐付ける(=Policyを定義する)ところでConditionをつけて対象のバケットを絞る

という違いがあり、(AWSからみれば)GCPでは同じことを達成するための手段が分解されているように見えます。用語の違いだけでなく、同じことを実現するための手段が違うステップに分かれていることも、最初は少し混乱してしまう理由の一つではないでしょうか。

Kubernetes上のPodからクラウドのリソースを使う場合

では、AWS・GCPのマネージドなKubernetes(=EKS/GKE)の上で動くワークロード(=Pod)から、S3/GCSなどのクラウドリソースを操作する場合を考えていきましょう。

AWS・GCPの層の上にKubernetesが乗っかり、「(Kubernetesの)ServiceAccount」という概念が出てくるので、ここまでの説明を踏まえて整理してみます。

AWSとKubernetes(EKS)

EKSからAWS上のリソースを使いたい場合は「IAM RoleとEKSクラスタ上のServiceAccountを紐付ける」が基本になります。 *3

もちろん「IAM Userを作成し、APIキーを発行して、KubernetesのSecretリソースを通してPodに持たせる」こともできますが、静的なAPIキーの発行は避けた方が良いでしょう。*4

GCPとKubernetes(GKE)

前述のEKSの場合と同様の考え方で、「GCPのServiceAccount(GSA)とGKEクラスタ上のServiceAccount(KSA)を紐付ける」が基本になります。その方法がWorkload Identityです。

GCPの世界とKubernetesの世界の両方に「Service Account」という単語が出てくることが少しややこしいので、「今どちらについて話題にしているのか」を意識しておくとよいでしょう。

Workload Identityを使うことで、静的なAPIキーを発行することなくKubernetes上のワークロード(=Pod)にGCPリソースを利用させることができます。

具体例で対比して理解する

具体例として前述のLambda/CloudFunctionsの例にならって、「EKS/GKE上のPodからS3/GCSの特定のBucketをRead onlyに使う」ケースを考えてみましょう。

f:id:TatchNicolas:20201214090957p:plain
k8s上のPodからクラウドリソースを使う例

図にしてみると、非常によく似ていることがわかりますね。

まとめ

AWS・GCPおよびKubernetesの権限まわりの用語と概念を、具体例で対比しながら整理してみました。どなたかの参考になれば幸いです。

最後に

JX通信社では、PythonやGoを使って「NewsDigest」の開発に参加してくれるインターン生を募集しています! 特に、AWS・GCPの利用経験のある方は歓迎します!

www.wantedly.com

*1:本記事ではS3・GCSといったオブジェクトストレージやRoute53・CloudDNSなどのDNSサービスなど、「利用するためにクラウドの権限が必要なもの」を指します

*2:厳密には、S3のBucket PolicyやGCSのACLなどBucket側の設定も存在しますが、今回はIAM側について話しているので言及しません

*3:https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html

*4:今は静的なAPIキーなしでKubernetesのServiceAccount単位でIAM Roleを設定できますが、かつてはそれが不可能でAPIキーを発行せずに権限を持たせるにはNode単位でEC2としてInstance Profile設定するしかありませんでした。この辺りの経緯や詳細は、こちらの解説がわかりやすいです https://dev.classmethod.jp/articles/eks-supports-iam-roles-for-service-accounts/

PynamoDBで良い感じにTestableなモデルを定義して、DynamoDB Localを使ってテストする方法

f:id:TatchNicolas:20201211101137p:plain

TL; DR;

  • PynamoDBを使ったテストでローカルで動かすDynamoDBを叩きたい場合に
  • テーブルのキー定義やキャパシティ設定の管理はterraform/CDKなどに任せつつ、アプリケーションコードを汚さずにテストを実行したい
  • そんなときは getattr でmetaclassを取り出して setattr でテスト用の設定値を注入してあげましょう

サンプルコードはこちら

github.com

もうちょっと詳しく

背景

PythonでDynamoDBを使った開発していればPynamoDBはとても便利なライブラリです。非常に書きやすいAPIでDynamoDBを読み書きできますし、手軽にテーブル自体もPynamoDBで作成することも可能です。 *1

しかしPynamoDBでテーブルを作成・管理してしまうと if table.exists() みたいな条件を書いて毎回判断させたり、キャパシティの調整のたびにアプリケーションを動かす必要が出てきてしまいます。 *2

そもそもテーブルの作成やキャパシティの管理はアプリケーションの責務ではないため、本番環境ではPynamoDBの責務をあくまで「アプリケーションからDynamoDBを使うクライアント」ととしての用途に限定し、テーブル自体は

  • Terraformで定義する
  • CDKで定義する
  • Serverless Frameworkを使う場合は serverless.yml の中で定義する

あたりがよく採用される方法かと思います。Partition Key/Sort Keyの設定やインデックスの定義などのほか、キャパシティはProvisionedかOn Demandか、Provisionedであればどれくらい積んでおくのか、といった設定を上記のいずれかのInfrastructure as Codeな方法で管理することでしょう。

すると当然、PynamoDBを使ったアプリケーションコードの中にはキャパシティに関する記述は残したくありません。アプリケーションを動かす際はそれでOKですが、テストを書くときに少し面倒なことになります。

たとえばテストにpytestを使う場合、fixture定義の中で 「テスト用のDynamoDBテーブルをlocalstackやDynamoDB Localのようなツールを使って作成し、テストが終われば削除する」 ような処理を書きたくなると思います。

PynamoDBでは、class Meta の中で host を定義することで、boto3で endpoint_urlを指定したのと同じ効果が得られます。

from pynamodb.models import Model

class Thread(Model):
    class Meta:
        table_name = 'Thread'
        # Specifies the region
        region = 'us-west-1'
        # Optional: Specify the hostname only if it needs to be changed from the default AWS setting
        host = 'http://localhost'
        # Specifies the write capacity
        write_capacity_units = 10
        # Specifies the read capacity
        read_capacity_units = 10
    forum_name = UnicodeAttribute(hash_key=True)

(公式Doc より抜粋 )

host = 'http://localhost' if ENV == "test" else None など書けば、テスト時と本番とで設定を打ち分けることは出来そうです。しかし、 アプリケーションのコードに「これはテスト実行かどうか」を判定する条件文が入るのは好ましくありません

素直にboto3を使っていれば、上記スライド *3 の続きで解説されているのと同様に ddb = boto3.client('dynamodb', endpoint_url='http://localhost') のように作成したオブジェクトに切り替えることで、比較的簡単にアプリケーションからテストのためのロジックを追い出すことが可能でしょう。

しかしPynamoDBを使っている場合は、APIを実際に叩く部分はライブラリの中に隠蔽されてしまい、オブジェクトの切り替えによるテストが難しくなってしまいます。

また、テスト用のテーブルをどう作成するかという問題もあります。せっかくPynamoDBモデルを書いたのだから、できればうまく再利用したいものです。しかし、PynamoDBの Model.create_table() は、キャパシティに関する指定がないとエラーになります。

AttributeError: type object 'Meta' has no attribute 'read_capacity_units'

host の指定の時と同様、 billing_mode = "PAY_PER_REQUEST" if ENV =="test" else None などと書けば回避できそうですが *4 、そうするとまたアプリケーションに「テストのためのロジック」が混入してしまいます。

そこで、今回は

  • アプリケーションのコードにテストのためのロジックを入れない
  • PynamoDBのモデル定義をテスト用のテーブル作成に活用する

を同時に達成できるテストの書き方を考えてみました。

やってみる

サンプルのテーブルの定義はこんな感じです。(GitHubのほうのコードでは、リアリティを出すためにGSIとかも足しています)

from pynamodb.models import Model

class _UserModel(Model):
    class Meta:
        table_name = USER_TABLE_NAME

    uid: UnicodeAttribute = UnicodeAttribute(hash_key=True)
    name: UnicodeAttribute = UnicodeAttribute()
    group: UnicodeAttribute = UnicodeAttribute()

それっぽいテストコードを書きたいので、Repositoryパターンっぽく包んでみます。

class UserRepo:
    def __init__(self) -> None:
        self.model = _UserModel

    def add_user(self, uid: str, name: str, group: str) -> dict[str, str]:
        # 省略 実装はGitHubのサンプルを参照してください

    def get_user(self, uid: str) -> dict[str, str]:
        # 省略 実装はGitHubのサンプルを参照してください

    def get_users_in_group(self, group: str) -> list[dict[str, str]]:
        # 省略 実装はGitHubのサンプルを参照してください

この UserRepo の中の self.model を使って create_table しようとすると、billing_moderead_capacity_units / write_capacity_unitsの設定がないのでAttributeErrorになります。

そこで、pytestのfixtureをこんな感じで書いてみます。

@pytest.fixture(scope="function")
def user_repo() -> Iterable[UserRepo]:

    repo: UserRepo = UserRepo()

    model_meta_class = getattr(repo.model, "Meta") # 1
    setattr(model_meta_class, "host", DDB_LOCAL_HOST) #2
    setattr(model_meta_class, "billing_mode", "PAY_PER_REQUEST") # 3
    setattr(model_meta_class, "table_name", "user_table_for_test") # 4

    repo.model.create_table(wait=True)

    yield repo

    # Delete table after running a test function
    repo.model.delete_table()

何をしているかというと、

  • # 1 でモデル定義の中のメタクラスを取り出し
  • # 2 でそのメタクラスにテスト用のhost(boto3でいうendpoint_url)を設定
  • # 3 でcreate_tableを通すためにbilling_modeを設定
  • # 4 でテスト用のテーブル名に名前を上書き

といった操作をしています。

テストコードはこんな感じで、

def test_user_repo(user_repo: UserRepo) -> None:
    alice = user_repo.add_user(uid="001", name="Alice", group="Red")
    assert alice == {"uid": "001", "name": "Alice", "group": "Red"}

    bob = user_repo.add_user(uid="002", name="Bob", group="Blue")
    chris = user_repo.add_user(uid="003", name="Chris", group="Blue")

    users_in_blue_group = user_repo.get_users_in_group(group="Blue")
    assert users_in_blue_group == [bob, chris]

実際にテストを実行してみると、

docker-compose up -d
docker-compose exec app pytest

結果:

f:id:TatchNicolas:20201210214630p:plain
テスト結果

pytestのfixtureでのテーブル作成が成功し、テストが通りました!

おまけ

また、以前のブログで「Serverless Framework +FastAPI」の開発環境を作った際に「せっかくserverless.ymlのなかでスキーマ定義してるのに、結局PynamoDBでテーブル作るためにモデル定義に余計なコードが混ざってしまうなあ...」という問題が残っていました。

tech.jxpress.net

この問題も、本記事と同じ方法でローカル開発用(≠テスト用)のテーブル作成スクリプトをアプリケーションコードとは別に切り出すことで解決できそうですね。コンテナイメージやLambdaには app/ 以下のコードだけ載せておけば良いので、「アプリケーションに余計なコードが混ざらない」を達成できます。

.
├── app
│   ├── __init__.py
│   ├── config.py
│   ├── main.py
│   └── repository.py
├── create_local_table.py
└── test
    └── test_repository.py

create_local_table.py スクリプトの中身はほとんどpytestの中身と同じです。

from os import environ

from app.repository import UserRepo


DDB_LOCAL_HOST = environ["DDB_LOCAL_HOST"]


if __name__ == "__main__":
    repo: UserRepo = UserRepo()

    model_meta_class = getattr(repo.model, "Meta")
    setattr(model_meta_class, "host", DDB_LOCAL_HOST)
    setattr(model_meta_class, "billing_mode", "PAY_PER_REQUEST")
    setattr(model_meta_class, "table_name", "user_table_for_dev")
    # or
    # setattr(model_meta_class, "read_capacity_units", 1)
    # setattr(model_meta_class, "write_capacity_units", 1)

    by_group_meta_class = getattr(repo.model.by_group, "Meta")
    setattr(by_group_meta_class, "host", DDB_LOCAL_HOST)

    repo.model.create_table(wait=True)

まとめ

Pythonのbuilt-inな関数である getattr / setattr を使うだけですが、当初のねらいであった「アプリケーションのコードにテストのためのロジックを入れない 」「PynamoDBのモデル定義をテスト用のテーブル作成に活用する」を達成することができました。

少しでも参考になれば幸いです。

最後に

JX通信社では、PythonやGoを使って「NewsDigest」の開発に参加してくれるインターン生を募集しています! サーバレス、コンテナなど色々な技術スタックに触れられる環境なので、興味のある方は是非お声かけください!

www.wantedly.com

*1:https://pynamodb.readthedocs.io/en/latest/quickstart.html#creating-a-model

*2:RDSの場合、Read Replicaの数やインスタンスサイズをInfrastracture as codeなツールに(=インフラの責務)、テーブルのスキーマはDjangoやAlembicなどにマイグレーション管理させる(=アプリケーションの責務)パターンになると思いますが、DynamoDBの場合は「テーブル自体の作成・キャパシティ設定(=インフラの責務)」と「キーやインデックスといった設定(=アプリケーションの責務)」という切り分けになり、どのツールに何を任せるかという問題かと思います

*3:Node.jsかつS3の例ですがポイントは同じで、「テストのためのロジックが紛れ込んでいる」ことを問題にしているので例として引用しました

*4:read_capacity_unitsとwrite_capacity_unitsの両方を適当な数値に指定しても回避できます

データ分析者たちのコードレビュー #とは - 散らかったJupyter notebookを片付けるかどうするか問題を考える

JX通信社シニアエンジニアの@shinyorkeです.

最近はチームの朝会でよく着ているTシャツにツッコミを受けてます.*1

JX通信社では, いい感じにデータを整備・運用しているデータ基盤を駆使して,

  • BI(Business Intelligence)文脈でのデータ分析・可視化. ダッシュボード作ったり.
  • 機械学習的なアプローチを使ったR&Dと機能開発(分類タスクなど)

といった業務・タスクを社員・インターン問わず行っています.

データ分析でSQLを書いたり, 「新しいアルゴリズム試すやで!」的なノリでPythonのコードをゴリゴリ書く・動かして結果を見て振り返ってまた臨む...って楽しいですよね.

チームの皆さんも, もちろん私もモチベーション高くやってるわけですが!?

あれ, notebookどこ行ったんや...🤔

よくありますよねー(震え)

自分もチームメイトも, 前のめりになって分析なり機械学習なりをやればやるほど, notebookはどこかに溜まっていき, 「そういえば前にやったnotebookどこだったっけ?🤔」ってなります.

これはきっと私(弊社)に限らずあるあるな問題だと思います.

また, 分析業務や機械学習的なR&Dなどをたくさんやればやるほど,

ワイのコード, 大丈夫だろうか(震え)

と心配になります.

このエントリーでは, 散らかったJupyter notebookを片付けるの諦めたもっとカッコよくいい感じにナレッジをシェアしながら分析したい私が,

  • JX通信社のデータ分析環境の今とこれから, を紹介しつつ
  • 結局, 分析屋さんのコードレビューとnotebookの管理って何のためにするんだったっけ?

というお話をいい感じにまとめて書きたいと思います.

TL;DR

  • モブプログラミング的なノリでレビューするとノウハウの共有・お互いを知る意味でも最高なのでオススメ(オンラインでイケる).
  • notebookの管理で神経と労力を使うのもいいけど「諦める」のも一つの手.

おしながき

JX通信社のデータ分析環境と作業スタイル

早速コードレビューの話をする...前に, JX通信社における

  • データ分析ってどういう環境でしてるの?
  • 現状の問題点

の話をサクッと整理しました.

なお, このエントリーの続きみたいな話です.

tech.jxpress.net

現在の環境

社員・インターンがデータ分析に使っている環境は, 以前のエントリーに触れたとおり, BigQueryを中心に回っています.

f:id:shinyorke:20200425131222p:plain
現状のデータ分析環境あれこれ(要約)

(一部のデータ*2を除き)大抵のデータはBigQueryのテーブルとして存在するので,

  • 単にデータを見たいだけ(≒SQLで事足りる)ならBigQueryのコンソール・Redashを使う
  • 機械学習や統計でちょっと凝ったことをする(≒コードを書かないと難しいタスク)ならColabを使う*3
  • ダッシュボード・レポート等の最終成果物をDataPortal, Redashで行う

といった感じで活用しています(+他のツール・サービスもよく使います*4).

ちなみに手段(ツール・サービスなど)の選択は原則として各人(社員のみならずインターンも対象)に任されています(メンターとしても必ずその話をしています*5).

なお, データの抽出・変換・出力, いわゆるETLについてはAirflow, prefect(一部Luigi)でWorkflowを組み立てて行っています.

ETLの詳細は以前のエントリーをご覧いただけると幸いです(本筋と離れるためこのエントリーでは特に触れません).

現状の課題とやりたいこと

ビッグデータ・データサイエンスのエコシステムの組み合わせでいい感じに分析したり成果出したりと一見すると順風満帆に見えるこの仕組ですが, 課題とやりたいことがいくつか存在します.

  • 現状の課題
    • notebookが散らかる. コード管理やシェアが各人に任されているため, チームの資産としての活用が難しい*6.
    • 分析コード・SQLのレビューが各人任せになっている. プロダクトのコードみたいなMerge Request(Pull Request)をベースとしたチーム作業にはなっていない.*7
    • アドホックなデータ分析・取得の依頼が増えた時の管理. できる人が限られている為ボトルネックになりがち.
  • やりたいこと
    • データマートの整備. 現状は「データレイクの生ログをいい感じにBigQueryにしました」的なデータを扱うことが多いためそれなりに難易度が高い*8.
    • ある程度の分析・可視化(select文の結果をピボットしてグラフ描く的な)をデータサイエンス・エンジニアな人以外にも開放. つまり民主化.

「やりたいこと」の話は後日機会あれば紹介ということで一旦さておいて.

データ分析に関わってるメンバー特に若い人やインターン達にとって不安要素になるかもしれない「コードレビュー」「notebookの管理」は無視できない問題です.

コードレビューとnotebook管理をいい感じにする

「コードレビュー」「notebookの管理」という課題に対して, 会社全体としてどうしていくか?...という問いについてはまだ答えが出ていないのですが.

私(@shinyorke)のチーム*9では,

  • notebookやSQLをモブプログラミング形式で作ったりレビューをしたりする
  • notebook, SQLはあくまで「中間成果物」と捉え, 敢えて管理を放棄する

という方策でやっています.

モブプログラミングの積極活用

f:id:shinyorke:20201109232209p:plain
モブプログラミングいいですよね

「チームで一つになってISSUEと向き合う」でお馴染みのモブプログラミングですが, これをデータ分析チームとして積極活用しています.

実際は対面じゃなくて, Zoomで一つの画面をシェアしながらやってます(みんなリモートしてる*10ので)

モブプログラミングのお題は「インターン生が書いたnotebook/SQLのレビュー」「メンターのshinyorkeがまとめたレポートのレビュー」などその日によって異なりますが大切なのは,

インターンや若い人が時折レビュアーに回る

ようなローテーションを回すようにしています.

私の場合, プログラミングそのものやシステム的・ビジネス的なスキル・知見は若い人よりありますが, 数学・統計・機械学習的なアプローチは現役で学んでいる学生さんの方が上だったりするケースも多々あるので「私もたまには見てもらう側に回る」ようにしています.

若い人やインターンの場合は「プログラミング・SQLの定石」「きれいなコードの書き方」「Pythonの便利ライブラリ」などが勉強になるケースが多いみたいです. つい先日は「コード・SQLをフォーマットして書くのが何故大事か?」という話題で30分くらい時間が溶けました*11.

この方式はそれなりに労力を使いますが, 成果物の最終チェックが進むだけでなく「関わった人の知見・スキルのシェア・勉強がはかどる」「個々人のノウハウがチームに還元される」のと何よりも楽しいので今後も継続していく感じになると思います.

notebookはあくまで「中間成果物」

そして 「散らかったnotebook」問題ですがこれについては,

すっぱり諦める

というお気持ちでやっています.

というのも,

  • notebookはあくまでもアドホックな「らくがき帳」「試行錯誤するためのサンドボックス」である*12
  • 「よっしゃこれはイケるぞ!」ってなった段階でちゃんとしたプロダクトのコード(テストコードを含める)に落とし込むべき
  • (notebookでdiffをとるソリューションはいくつかある*13とはいえ)神経質にdiffを気にするぐらいなら最初から管理そのものを放棄したほうが良い.

これらの定義・信念を考えると「わざわざ管理する方にもっていくのはまだ先でいいかな...」ってなります.

もちろん, 安易に消されたりやってくれた方が退職したりするとそれはそれで困ってしまうので,

  • 可能な限り社内限定でシェアする. 少なくともメンターの手元には残す.
  • 「もしかするとプロダクトコードになるかも」なスニペットはgitに残す

ぐらいはしています.

結び - レビューとメンタリングで大切なこと

というわけで,

分析者のコードレビューには「モブプログラミング」を, notebookの管理は「諦め」が肝心

という話を紹介させてもらいました.

もちろんこの両方が絶対的な答えじゃないです, もっといい方法・考え方があったらぜひフィードバックをいただけると幸いです.

このアプローチ・考え方に至るまで色々と試行錯誤をしたのですが確かなのは,

  • レビュアーはプロダクト・サービスの事に目を向けつつ, レビュイーの良い学び・気付きになるようなレビューに心がける
  • メンターはメンティーにとって「超える壁」になるようなタスクを提供し続け, メンティーの成長を支えること

という想い・思考が元になっています.

なお現在JX通信社では,

のインターンを募集しています!

※学生さん限定です🙇‍♂️

データサイエンスのコードレビューはこのエントリーで触れたとおりですが, エンジニア側もメンターやチームによっていいかんじにケアしたりしてるので興味ある方はぜひカジュアル面談にお越しください(フルリモートでもOKです!).

www.wantedly.com

www.wantedly.com

最後までお読みいただきありがとうございました.

*1:アカウント名(shinyorke)をUKの某バンドのボーカルから拝借する程度にUK Rock🎸好きな影響でいつもバンドTシャツ着てます(どうでもいい注釈).

*2:本文脈とは関係ないですが, 一部のデータはAWS AthenaだったりS3/GCSのストレージ上の生データです.

*3:後述の通り, Colabはマストって訳ではないので個々のPCで環境作ってやるケースもあります, 私が実はその派閥でして

*4:私の観測範囲ではGCPのAI Platformを使ったり, まだお試し中ですがstreamlitを使うこともあります.

*5:弊社はデータサイエンスのみならず, 通常の開発でもPythonを使うことが多く分析業務のデファクトスタンダードもPythonですが中にはRでタスクをこなしてバリューを出しているメンバーもいます.

*6:よくやる処理とか定石が車輪の再発明になってる的な意味での「資産活用が難しい」という話です

*7:余談ですがFASTALERTやNewsDigestといったプロダクトはこの辺しっかりやってます.

*8:正規表現頑張ったりJSONをParseしたりとかそういうレベルの難易度の高さ. もちろん仕様の話もあります.

*9:言い方を変えるとこのアプローチは会社オフィシャルでテンプレート化しているわけではないです&したいなっていう欲はあります.

*10:これはこれで苦労もありますがシフトや働く場所の成約が物理出社の時と比べて334倍以上の自由度があるので良いことのほうがおおい気がします. 首都圏以外の学生さんもインターンとして定期的に入れるとか.

*11:どのツールを使うか, 他社さんでどうやってやっていたか?という話題でもちきりになりました. お互いを知る意味でも有意義な30分でした

*12:notebookをバッチなどの定期実行処理にそのまま使うやり方もあるらしいですが個人的にはこのアプローチはとったことないです.

*13:nbdimeとかが代表的な手段なんですかね?他にもありそう.

Jamstack とサーバーレスで提供する「大阪都構想」特設サイトの舞台裏

開発担当役員(CDO)の小笠原(@yamitzky)です。

11月1日に予定されている「大阪都構想」の住民投票*1についての特設サイトを、先日、ABCテレビと共同リリースしました。JX独自開発のオートコール電話情勢調査システムを活用し、週一ペースでの情勢調査を発表するなど、今までにない取り組みを行っています。

www.asahi.co.jp

より詳細なデータは、ニュース速報アプリ「NewsDigest」でも配信しています。

なんだか宣伝っぽい導入になってしまいましたが、今回は NewsDigest の「大阪都構想」特設ページの技術的な舞台裏をご紹介します。

インフラ全体像

まず、特設サイトのインフラ概略図は次のようになってます。

f:id:yamitzky:20201029164459p:plain
インフラ全体

サイト全体としては、Next.js という React のフレームワークを使っています。特徴的なのは、

  • Next.js を Lambda@Edge を使ってデプロイ
  • 自作 Headless CMS と Firebase を活用した Jamstack 風なアーキテクチャ
  • Fargate を使ったワードクラウドの自動更新

など、最新技術をうまく組み合わせながら構築しています。

Next.js のデプロイ

サイトのデプロイには、 serverless-next.js を使っています。

github.com

serverless-next.js は Serverless Framework のプラグインで、非常に簡単に AWS のサーバーレスな環境へのデプロイが行えるものです。

serverless-next.js でデプロイをすると、次のような構成になります。

  • CloudFront(CDN) が前段に立つため、キャッシュ時間なども調整できる
  • API や SSR *2 は Lambda@Edge でハンドリングされる
  • SSG *3も Lambda@Edge でハンドリングされるが、事前に S3 に保存されている
  • 静的ファイルは S3 から配信

かなりアクティブに開発されており、比較的最新の Next.js の仕様*4にも対応しているのですが、 Incremental Staic Regeneration (自動での SSG の再更新)には対応していません。GitHub の Issue でも議論はされていますが、そんなに CloudFront × サーバーレスな構成と相性が良くないようにも感じます*5

Headless CMS によるサイト更新

今回、ABC テレビのサイト側での配信が静的なものだった*6ため、 SSG を前提とした構成にしました。一方で、週一で更新する情勢調査の解説コメントや、ユーザーからの質問受付など、動的な要素も含んでいました。これらのコンテンツ更新には CMS が欠かせないですし、同時に、コンテンツ更新のたびに HTML/JS を更新したくない、という課題がありました。

そのため、Headless CMS を使い、Jamstack な感じの構成 にしています。

f:id:yamitzky:20201029190634p:plain
Jamstack構成

Headless CMS とは、WordPress のようなコンテンツ管理と配信がセットになった CMS ではなく、コンテンツ管理だけができるような CMS を指しています。今回の場合、配信部分は Next.js が担っていることになります。Headless CMS は、 配信するページにコンテンツ管理機能が紛れ込まないのでセキュアである、というメリットもあります。

今回、Headless CMS には、小笠原が個人開発しているサーバーレスな Headless CMS を導入 しました。この CMS *7 は、データベースに Firebase を使っているので、リアルタイム&安価&柔軟に連携できます。

serverless-headless-cms.vercel.app

ワードクラウドの自動更新

特設サイトでは、週1の情勢調査だけではなく、ワードクラウドを毎日更新しています。

f:id:yamitzky:20201029130720p:plain
特設サイト内ワードクラウド(左)

こちらは Python 製のプログラムを Docker 化し、Fargate を CloudWatch Events 経由で動かして S3 に保存しています。また、インフラ構成管理には Terraform を使っています。

Flourish によるデータの可視化

情勢調査の結果を大阪市民の方々にわかりやすくお伝えするため、 Flourish でデータの可視化を行いました。Flourish は Google Sheets をデータソースとしてグラフを自動更新できるなど便利です。

flourish.studio

ワードクラウドとデータ可視化に関しては、データ×ジャーナリズムを担当している阪神ファンの衛藤さん主導で作っています。

まとめ

振り返ってみると、 特設サイトは開発開始から1週間半程度でリリースすることができました。OSS を利用*8して、サーバーレスを活用しながら、JX通信社のバリューを体現できたかなと思います。また、今回もインターン生に多大なご協力をいただいております。

www.wantedly.com

www.wantedly.com

大阪市民の方、ぜひ特設サイトをご覧になって、11月1日の住民投票にお役立てください。

www.asahi.co.jp

*1:いわゆる「大阪都構想」とは、大阪市を廃止し4つの特別区に再編する構想です

*2:サーバーサイドレンダリング。リクエスト時に動的な HTML を生成する

*3:静的サイト生成。リクエスト時に動的な HTML を生成するのではなく、事前に HTML を生成しておくもの。ただし、動的にはならないので、ISR という仕組みが Next.js では提供されている

*4:redirect などは、Next.js 9.5 の仕様です

*5:Vercel のインフラがどうなっているのか気になります

*6:HTML と JavaScript を提供し、静的ページとして配信しています

*7:まだ名前がないので名前案ください

*8:少額ではありますが、serverless-next.js に寄付しました