リモートでも 1on1 の効率を最大化したいのでGROW モデルを導入してみました

f:id:jazzsasori:20201228225509p:plain

JX通信社 Engineering Manager の @jazzsasori です。

皆さん自身の成長にコミットしてますか?
マネージャーの皆さんメンバーの成長にコミットしてますか?
私はゼルダ無双の体験版をダウンロードしてしまったために成長にコミットできなさそうです。
あと買ってしまいそうです。

弊社もリモート中心のメンバーが増えました

こんなご時世なので弊社も多くのメンバーの勤務がリモート中心となって久しいです。
弊社はSlack, Zoom, Discord を活用、リモートに関する制度の充実などにより比較的コミュニケーションはうまくいっているように思います。
が、ご多分に漏れず多少のコミュニケーションに関する問題も起こっているのも事実です。

最近メンバーとエモい話してますか?
私は昭和の人間なので飲みニケーションが好きです。
私は生中を飲みながら「やったろうぜー!」なんて言いながらウェイするのが好きです。
翌日多少の二日酔いを抱えながら昨日の (半分覚えていない) 熱い話を思い出しながら仕事をがんばるのが好きです。
以前ならなんとなくの日常のコミュニケーションを通じて放っておいても伝わってきたメンバーの希望や今後やりたいことなどがなかなかカジュアルに知れなくなってきています。

私が1on1を担当している方々にGROWモデルを提案しました

そういった差分を埋めるためにどうしたらよいか。
自分の答えはGROWモデルを通したコーチングでした。

私はGROWモデルで皆様をブチ上げたい

というタイトルで社内のwikiツール (kibela) に投稿し、詳細を説明。
あくまで各自の判断にゆだね、やるやらを決めました。
結果一旦は自分の1on1 担当している方全員やっていただけることになりました。

GROW モデルとは

はじめのコーチング を書かれた John Whitmore さんなど (Graham Alexander、Alan Fine) が考えられたコーチングモデルです。
Google さんも re:Workで採用 されています。
Google さんのre:Work に沿って説明すると大きく4つのテーマに分かれており、それぞれで以下のようなことを質問し、チームメンバーとマネージャーが話し合います。

Goal: 目標の明確化

  • 「1 年後、5 年後、10 年後の自分はどうなっていると思いますか?」
  • 「収入や現在のスキルの制約がないとしたら、どのような仕事に就きたいと思いますか?」
  • 「興味、価値を置くもの、原動力となっているものは何ですか?」

Reality, Resource: 現状の把握 (Google さんの re:Work では Reality のみ)

  • 「現在の業務で最もやりがいを感じること、あるいは、ストレスを感じることは何ですか?」
  • 「現在の業務はやりがいがありますか?能力を伸ばせていますか?どうしたらさらにやりがいを感じられますか?やりがいのない業務は何ですか?」
  • 「自分の長所と短所について、他の人からどんな指摘を受けたことがありますか?」

Option: 選択肢の検討

  • 「以前話し合った目標達成のためのスキルを磨くのに、今できることは何ですか?」
  • 「自分を伸ばすために、どのような仕事やプロジェクトに挑戦したいですか?また、どのような経験をしたいですか?」
  • 「選択肢として、どのようなネットワークやメンターシップがありますか?」

Will: 意思の決定

  • 「何を、いつまでに行いますか?」
  • 「役に立つリソースは何ですか?目標達成のために役立つスキルは何ですか?」
  • 「どのような支援が必要ですか?自身のキャリア形成について、マネージャーやリーダーからどのようなサポートを受けたいと考えますか?」

私のGROWモデルを使った具体的なコーチングスタイル

Google さんが公開しているワークシート をコピーして面談に活用することが多いです。
とても良いフォーマットなので全力で乗っかりつつ、自分は相手に合わせて質問を変えたりしています。

「1 年後、5 年後、10 年後の自分はどうなっていると思いますか?」

という質問例がありますが、実際に話を進めていると "5年後" "10年後" は意外と想像しにくいものです。
そこで比較的想像しやすい1年後を聞いたあとに「その1年での成長をベースに2年後3年後はどうなっていたいですか?」と聞くとさらに深い話ができたりします。 逆に1年未満の期間での Goal を聞いてみたりしています。

一番重視していることは傾聴することです。 いいおじさんなので 武勇伝臭いことを語り始めないようにしてます。 コロナ禍でなかなかカジュアルに飲みにもいけませんので、GROW を通したコーチングをよいきっかけとしてメンバーの目指す場所を注意深く聞いています。
酩酊した状態で聞くよりはメンバーの意思を細かく受け取れている気がします。 (いや飲んでてもちゃんと聞いてますよ)

GROWをやってみて

結果として短期間で目に見えた成長を感じることが出来ました。

メンバーの意識が変わった

  • 今まで手を伸ばしていなかった領域を学ぶ時間を確保する
  • frontend が中心だったメンバーが backend のタスクを積極的をこなす (逆も)

対話を通してメンバーの役割を変更

  • スクラムマスターを目指すメンバーの役割を明確に変えた
    • 結果、スクラムマスターとしての役割を積極的にこなしていただけるようになった

良い反響があったり、正直うまくいかなかったり

自分が普段担当していない方にもGROW のために1on1お願いしてもらったりしました。

反面実際やってみてその方には合わないケースもありました。
そういった場合は別の方法で1on1を進めたりしています。
もちろん組織や人によって合わないケースもあると思います。

あくまで大事なのはメンバーとの対話なので柔軟に対応しましょう。

終わりに

私はGROW モデル試してみてとても良かったです。
1on1 においてより深い対話ができていると思います。
あくまで GROW も1つのフレームワークです。方法は組織ごとに色々あってよいと思います。
重要なことなのでもう一度、対話を大事にしましょう。

データサイエンティストの飛び道具としてのStreamlit - プロトタイピングをいい感じにする技術

(ちょっと遅れましたが)新年あけましておめでとうございます🪁

JX通信社シニア・エンジニアで, データサイエンスからプロダクト開発までなんでもやるマンの@shinyorke(しんよーく)と申します.

Stay Homeな最近は大河ドラマを観るのにハマってます&推しの作品は「太平記」です*1.

データ分析やデータサイエンティスト的な仕事をしていると,

「いい感じのアウトプットがでた!やったぜ!!なおプレゼン🤔」

みたいなシチュエーションが割とあると思います.

さあプレゼンだ!となったときにやることと言えば,

  • ドキュメントとしてまとめる. 社内Wikiやブログ, ちょっとしたスライドなど.
  • 分析・実験で使ったモノをそのまま見せる. より具体的に言うとJupyterのnotebookそのもの.
  • 社内のいろいろな方に伝わるよう, ちょっとしたデモ(Webアプリ)を作る.

だいたいこの3つのどれかですが, やはり難易度が高いのは「ちょっとしたデモ作り」かなと思います.

???「動くアプリケーションで見たいからデモ作ってよ!」

この振りってちょっと困っちゃう*2な...って事はままある気がします.

そんな中, 昨年あたりからStreamlitというまさにこの問題をいい感じに解決するフレームワークが流行し始めました.

www.streamlit.io

これがとても良く使えるモノで, 私自身も,

  • 昨年のPyCon JP 2020など, 登壇や個人開発にてプロトタイプが必要なときに利用.*3
  • 業務上, 「動くアプリ」を元にコミュニケーションが必要だったりプレゼンするときに利用

といった所でStreamlitを愛用しています.

とても便利で素晴らしいStreamlitをご紹介ということで,

  • Streamlitをはじめるための最小限の知識・ノウハウ
  • JX通信社の業務においてどう活用したか?
  • Streamlitの使い所と向いていない所

という話をこのエントリーでは語りたいと思います.

TL;DR

  • データサイエンティストが「アプリっぽく」プレゼンするための飛び道具としてStreamlitは最高に良い.
  • Jupyterでできること, Webアプリでできることを両取りしてPythonで書けるのでプロトタイピングの道具として最高
  • あくまで「プロトタイピング」止まりなので仕事が先に進んだらさっさと別の手段に乗り換えよう

おしながき

Streamlitをはじめよう

StreamlitのサイトSample Galleryドキュメントなどかなり充実しているのでそちらを見ていただきつつ, 触りながら覚えると良いでしょう.

触ってみよう

インストールそのものはPythonのライブラリなので,

$ pip install streamlit

でいけちゃいます.

$ streamlit hello

とコマンドを叩くと, http://localhost:8501 で用意されているデモが立ち上がります.

f:id:shinyorke:20210124152408j:plain

最初はデモを触る・コードを読みながら写経・真似しながら動かすと良さそうです.

重要なポイントとしては,

Webブラウザで動く動的なアプリケーションが, .py ファイルを書くだけで動く

ことです.

Javascriptやフロントエンドのフレームワークを使ったり, HTMLやCSSの記述が不要というのがStreamlitのミソです*4.

作って動かそう

hello worldで遊んだ後はサクッと作って動かすと良いでしょう.

...ということで, この先はイメージをつかみやすくするため,

f:id:shinyorke:20210124140258j:plain
こういうのを100行ちょいで作れます

このようなサンプルを用意しました.

github.com

サンプルの⚾️データアプリを元に基本となりそうなところを解説します.

README.mdに設定方法・動作方法があるので手元で動かしながら見ると理解が早いかもです.

(venv) $ streamlit run sample_demo.py

pandas.DataFrameを出力する

pandasに限らず,

  • 何かしらのテキスト
  • 何かしらのオブジェクト(グラフなど)

もそうなのですが, st.write(${任意のオブジェクト}) でいろいろなモノをブラウザで閲覧できるモノとして表示ができます.

たとえば,

st.write('# Stremlit Sample App :baseball:')
st.write('データは[こちらのアプリ](https://github.com/Shinichi-Nakagawa/prefect-baseball-etl)で作ったものです')

st.write('## ひとまずpandasデータフレームの中身を見る')
st.write('`st.write(df.head())`とかやればいい感じに')

import pandas as pd

df = pd.read_csv('datasets/mlb_batter_stats.csv')
st.write(df.head())

こちらはこのように表示されます.

f:id:shinyorke:20210124154332p:plain

※sampleのこの辺です.

作成中・試行錯誤の状態はこのような形でprint debugっぽいやり方でやると良さそうです.

入力フォームを使う

入力フォームもいい感じに作れます.

# サイドバーを使ってみる

st.sidebar.markdown(
    """
    # sidebar sample
    """
)
first_name = st.sidebar.text_input('First Name', 'Shohei')
last_name = st.sidebar.text_input('Last Name', 'Ohtani')
bats = st.sidebar.multiselect(
    "打席",
    ('右', '左'),
    ('右', '左'),
)

上記はこのような感じになります.

f:id:shinyorke:20210124154951p:plain

今回はsidebarという形で横に出しましたが, team_name = st.text_input('Team Name', 'Hanshin') という感じで, sidebarを介さず使うとページ本体に入力を設けることもできます.

フォームで入れたモノはこのように使えます.

st.write('## 打者の打席で絞る')

query = None
if '右' in bats:
    query = 'bats=="R"'
if '左' in bats:
    query = 'bats=="L"'
if ('右', '左') == bats:
    query = 'bats=="R" or bats=="L"'
if query:
    df_bats = df.query(query)
    st.write(df_bats.head())

f:id:shinyorke:20210124155351p:plain

※sampleのこの辺です.

これだけで, DataFrameをインタラクティブに使うアプリケーションが作れます.

グラフを描画する

また, 好きなライブラリでグラフなどを描画できます.

私はよくplotlyを好んで使うのですが,

import plotly.graph_objects as go


# グラフレイアウト
def graph_layout(fig, x_title, y_title):
    return fig.update_layout(
        xaxis_title=x_title,
        yaxis_title=y_title,
        autosize=False,
        width=1024,
        height=768
    )


title = f'{first_name} {last_name}の成績'

fig = go.Figure(data=[
    go.Bar(name='安打', x=df_query['yearID'], y=df_query['H']),
    go.Bar(name='本塁打', x=df_query['yearID'], y=df_query['HR']),
    go.Bar(name='打点', x=df_query['yearID'], y=df_query['RBI'])
])
fig = graph_layout(fig, x_title='年度', y_title=title)
fig.update_layout(barmode='group', title=title)

st.write(fig)

このコードはこうなります.

f:id:shinyorke:20210124155735j:plain

これをWebのちゃんとしたアプリで実装するのは苦労するのですが, ちょっと見せるレベルのモノがこれだけでできるのは感動モノです.

Streamlitを仕事で使う

実際の業務での活用ですが, 私の場合は以下のイメージで使っています.

f:id:shinyorke:20210124151037p:plain
実務でやってること(図)

より具体的には,

  1. まずはJupyterLabやGoogle Colabといった手段(どっちもJupyterが中心)でタスクをこなす
  2. こなしたタスクがいい感じになったらStreamlitでデモアプリを開発
  3. Streamlitで作ったデモでプレゼンを行い, チームメンバーからのフィードバックをもらう

というフローで活躍しています.

そもそもJupyterがアプリを作るのに向いてない所を補完するのがStreamlitの役割なので書き換えはすごく楽です.

JupyterからStreamlitへの書き換え(と比較)については, 以前こちらのエントリーに書いたのでご覧いただけると雰囲気がつかめると思います.

shinyorke.hatenablog.com

また, 「チームメンバーからのフィードバックをもらう」という意味では, 無味乾燥なセルでしかないJupyter(含むColab)よりも,

簡易的とはいえWebのアプリケーションとして見せることができるので, データサイエンティスト・エンジニア以外のメンバーにも伝わりやすい

という長所があります.

みんなで触る

(Streamlitに限った話ではありませんが)手元でWebアプリを動かせるということは, ngrokなど, 手元にあるアプリをproxyできる仕組みでチームメンバーに触ってもらいながらフィードバックをもらうことができます.

Streamlitの場合, デフォルトの設定だとhttp://localhost:8501 というURLが振られる(8501でhttpのportが使われる)*5ので,

$ # すでにstreamlitのアプリケーションが8501で立ち上がってると仮定して
$ ngrok http 8501

これで払い出されたURLを用いることにより, MTGの最中など限定されたシチュエーションで触ってもらいながら議論したりフィードバックをもらうことが可能となります.

様々な理由で決して万能とは言えない方法でもあったりします*6が, リモート作業・テレワーク等で離れた所にいても実際に見てもらいやすくなるのでこの方法はとても便利です.

Streamlitが不得意なこと

ここまでStreamlitの使い所・得意な事を網羅しましたが, 苦手なこともあります.

  • 複数ページに跨るアプリケーションを作ること. 例えば, 「入力->確認->完了 」みたいな複数ページの登録フォームを作るのは苦手(というよりできないっぽい).
  • Streamlitのデザインから変更して見た目を整える.例えば, 「(弊社の代表アプリである)NewsDigestっぽいデザインでデモ作ってくれ」みたいなのは辛い.

The fastest way to build and share data apps(データを見せるアプリを爆速で作るのにええやで) と謳っているフレームワークである以上, ガチのWebアプリなら考慮していることを後回しにしている(かつこれは意味意義的にも非常に合理的と私は思っています)関係上, 致し方ないかなと思います.

なお私の場合はこの長所・短所を把握した上で,

  • Streamlitに移植する段階である程度コードをクラス化したりリファクタリング(含むテストコードの実装)を行い, 将来のWebアプリ・API化に備える
  • 上記でリファクタリングしたコードをそのままFastAPIやFlaskといった軽量フレームワークでAPI化する

といった方針で使うようにしています.

アジャイルなデータアプリ開発を

というわけで, 「データサイエンティストがアプリを作る飛び道具としてStreamlit最高やで!」という話を紹介させていただきました.

最後に一つだけ紹介させてください.

私たちは、ソフトウェア開発の実践

あるいは実践を手助けをする活動を通じて、

よりよい開発方法を見つけだそうとしている。

この活動を通して、私たちは以下の価値に至った。

プロセスやツールよりも個人と対話を、

包括的なドキュメントよりも動くソフトウェアを、

契約交渉よりも顧客との協調を、

計画に従うことよりも変化への対応を、

価値とする。すなわち、左記のことがらに価値があることを

認めながらも、私たちは右記のことがらにより価値をおく。

アジャイルソフトウェア開発宣言より引用

「機敏(Agile)に動くもの作ってコミュニケーションとって変化を汲み取り価値を作ろうぜ!」というアジャイルな思想・スタイルで開発するのはエンジニアのみならずデータサイエンティストも同様です, XP(eXtreme Programming)はデータサイエンティストこそ頑張るべきかもしれません.

そういった意味では, データサイエンティストが使うPythonやその他のエコシステムも「スピード上げて開発して価値を出そう!」という所にフォーカスが当たり始めているのは個人的にとても嬉しいですし, こういった「アジャイルな思想の道具」を使って価値を出していくのは必須のスキルになっていくのではとも思っています.

このエントリーがデータサイエンティストな方のプロトタイプ開発に役立つと嬉しいです.

最後までお読みいただきありがとうございました, そして本年もどうぞよろしくお願いいたします🎍

*1:1991年度作品で, 2021年1月現在放送中の「麒麟がくる」と同じ脚本家さんの作品です.

*2:「そもそもWebアプリの作り方しらない」「作れるんだけど手間が」の二択かなと思います. なお, 私個人は(Streamlit関係なく)どっちでも無いですしむしろ好きな仕事だったりします笑

*3:機械学習的なタスクの成果を見せるデモとして使いました. (詳細はこちら

*4:Streamlitは, そんなWeb・フロントエンドの開発を(少なくとも初手では)やらずに済むようにできたものであると私は認識しています.

*5:ちなみにportを変えたい場合は, streamlit run sample_app.py --server.port 80という感じで, server.portというoptionの指定でいけます

*6:なお, ネットワークの帯域は動かしている環境次第で上手く回らない事もあるので決して万能ではなく, ちょいちょいトラブルもありましたというのを一応付け加えておきます.

Pythonでいい感じにバッチを作ってみる - prefectをはじめよう

JX通信社シニア・エンジニアで, プロダクトチームのデータ活用とデータサイエンスのあれこれ頑張ってるマン, @shinyorke(しんよーく)です.

最近ハマってるかつ毎朝の日課は「リングフィットアドベンチャー*1で汗を流してからの朝食」です. 35日連続続いています.

話は遡ること今年の7月末になりますが, JX通信社のデータ基盤の紹介&「ETLとかバッチってどのFW/ライブラリ使えばいいのさ🤔」というクエスチョンに応えるため, このようなエントリーを公開しました.

tech.jxpress.net

このエントリー, 多くの方から反響をいただき執筆してよかったです, 読んでくださった方ありがとうございます!

まだお読みでない方はこのエントリーを読み進める前に流して読んでもらえると良いかも知れません.

上記のエントリーの最後で,

次はprefect編で会いましょう.

という挨拶で締めさせてもらったのですが, このエントリーはまさにprefect編ということでお送りしたいと思います.

github.com

今回はprefectで簡単なバッチシステムを作って動かす, というテーマで実装や勘所を中心に紹介します.

prefectをはじめよう

prefect #とは

f:id:shinyorke:20201215220448j:plain

簡単に言っちゃうと, Pythonで開発されたバッチアプリのFrameworkで,

The easiest way to automate your data.

(意訳:あなたのデータを自動化していい感じにするのに最も簡単な方法やで)

がウリとなっている模様です.

公式リポジトリのREADME.mdの解説によると,

Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.

(意訳:prefectは今風のインフラストラクチャーに合わせて設計されたFrameworkで, 開発者はTaskとFlowを書いてくれたらあとはPrefect Coreがいい感じにワークフローとして処理するやで)

というモノになります.

ちなみにHello worldはこんな感じです.

from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

@taskデコレーターがついた関数(上記の場合say_helloがそう)が実際に処理を行う関数.

処理に必要な引数を取ったり関数を呼んだりするwith Flow("My First Flow") as flowの部分を開発者が実装, あとはよしなにやってくれます.

こんにちはprefect

という訳で早速prefectをはじめてみましょう.

一番ラクな覚え方・始め方は公式のリポジトリをcloneしてチュートリアルを手元で動かすことかなと思っています.

※私はそんなノリでやりました.

$ git@github.com:PrefectHQ/prefect.git
$ cd prefect
$ pip install prefect SQLAlchemy

SQLAlchemyが入っているのはひっそりとチュートリアルで依存しているからです(小声)*2.

ちなみにPython3.9でも動きました👍

ここまで行けば後はチュートリアルのコードを動かしてみましょう.

$ cd examples/tutorial
$ python 01_etl.py   

このブログを執筆した2020/12/18現在では, 06_parallel_execution.py以外, 滞りなく動きました.

ひとまずこんな感じで動かしながら, 適当に書き換えながら動かしてやるといい感じになると思います.

軽めのバッチ処理を作ってみる

exampleをやりきった時点で小さめのアプリは作れるんじゃないかなと思います.

...と言っても, 何にもサンプルが無いのもアレと思い用意しました.

github.com

baseballdatabankというメジャーリーグ⚾️のオープンデータセットを使って超簡単なETLバッチのサンプルです.*3

f:id:shinyorke:20201215205310p:plain

  • 選手のプロフィール(People.csv)を読み込み
  • 打撃成績(Batting.csv)を読み込み&打率等足りない指標を計算
  • 選手プロフィールと打撃成績をJOINしてのちcsvと出力

というETL Workflowなのですが, こちらの処理はたったこれだけのコードでいい感じにできます.

import logging
from datetime import datetime

import pandas as pd
import click
from pythonjsonlogger import jsonlogger
from prefect import task, Flow, Parameter

logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)


@task
def read_csv(path: str, filename: str) -> pd.DataFrame:
    """
    Read CSV file
    :param path: dir path
    :param filename: csv filename
    :return: dataset
    :rtype: pd.DataFrame
    """
    # ETLで言うとExtractです
    logger.debug(f'read_csv: {path}/{filename}')
    df = pd.read_csv(f"{path}/{filename}")
    return df


@task
def calc_batting_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    打率・出塁率・長打率を計算して追加
    :param df: Batting Stats
    :return: dataset
    :rtype: pd.DataFrame
    """
    # ETLで言うとTransformです
    logger.debug('calc_batting_stats')
    _df = df
    _df['BA'] = round(df['H'] / df['AB'], 3)
    _df['OBP'] = round((df['H'] + df['BB'] + df['HBP']) / (df['AB'] + df['BB'] + df['HBP'] + df['SF']), 3)
    _df['TB'] = (df['H'] - df['2B'] - df['3B'] - df['HR']) + (df['2B'] * 2) + (df['3B'] * 3) + (df['HR'] * 4)
    _df['SLG'] = round(_df['TB'] / _df['AB'], 3)
    _df['OPS'] = round(_df['OBP'] + _df['SLG'], 3)
    return df


@task
def join_stats(df_player: pd.DataFrame, df_bats: pd.DataFrame) -> pd.DataFrame:
    """
    join dataframe
    :param df_player: player datea
    :param df_bats: batting stats
    :return: merged data
    :rtype: pd.DataFrame
    """
    # ETLで言うとTransformです
    logger.debug('join_stats')
    _df = pd.merge(df_bats, df_player, on='playerID')
    return _df


@task
def to_csv(df: pd.DataFrame, run_datetime: datetime, index=False):
    """
    export csv
    :param df: dataframe
    :param run_datetime: datetime
    :param index: include dataframe index(default: False)
    """
    # ETLで言うとLoadです
    logger.debug('to_csv')
    df.to_csv(f"{run_datetime.strftime('%Y%m%d')}_stats.csv", index=index)


@click.command()
@click.option("--directory", type=str, required=True, help="Baseball Dataset Path")
@click.option("--run-date", type=click.DateTime(), required=True, help="run datetime(iso format)")
def etl(directory, run_date):
    with Flow("etl") as flow:
        run_datetime = Parameter('run_datetime')
        path = Parameter('path')
        # Extract Player Data
        df_player = read_csv(path=path, filename='People.csv')
        # Extract Batting Stats
        df_batting = read_csv(path=path, filename='Batting.csv')
        # Transform Calc Batting Stats
        df_batting = calc_batting_stats(df=df_batting)
        # Transform JOIN
        df = join_stats(df_player=df_player, df_bats=df_batting)
        # Load to Data
        to_csv(df=df, run_datetime=run_datetime)

    flow.run(run_datetime=run_date, path=directory)


if __name__ == "__main__":
    etl()

pandasの恩恵に授かって*4prefectのお作法に従うと比較的見通しの良いworkflowが書けますね, というのがわかります.

今回はcsvファイルを最終的なinput/outputにしていますが,

  • ストレージにあるjsonをいい感じに処理してBigQueryにimport
  • AthenaとBigQueryのデータをそれぞれ読み込んで変換してサービスのRDBMSに保存

みたいな事ももちろんできます(taskに当たる部分でいい感じにやれば).

この辺はデータ基盤やETL作りに慣れていない人でもPythonの読み書きができれば直感的に組めるのでかなりいいんじゃないかと思っています.

その他にできること&欠点とか

今回は「ひとまずprefectでETLっぽいバッチを作って動かす」という初歩にフォーカスしていますが, 実はこのprefect高機能でして,

  • タスクの進行状況をGUIで表示可能(AirflowとかLuigiっぽい画面)
  • 標準でDocker, k8sの他GCP, AWS, Azure等のメジャーなクラウドサービスでいい感じに動かせる

など, かなりリッチな事ができます.

一方, 使ったときのネガティブな感想としては,

  • 色々できるんだけど, 色々やるために覚えることはまあまあたくさんある.
  • 色々できるんだけど, それが故に依存しているライブラリとかが多く, 自前でホスティングするときのメンテ効率とかはちょっと考えてしまう.
  • ちゃんとデバッグしてないのでアレですが, 並列処理の機構がホントに並列で動いてるか自信がないときがある🤔

と, 心配なポイントもいくつかありました.

前のエントリーにも記載しましたが,

ETLフレームワーク, 結局どれも癖がありますので長いおつきあいを前提にやってこうぜ!

結局のところこれに尽きるかなあと思います*5.

結び - 今後のこの界隈って🤔

というわけでprefectを使ったいい感じなバッチ開発の話でした.

データ基盤や機械学習のWorkflowで使うバッチのFWやライブラリはホント群雄割拠だなあと思っていまして,

cloud.google.com

note.com

AirflowのDAGがシンプルに書けるようになったり(ほぼprefectと同じ書き方ですよね*6), BigQueryのデータをいい感じにする程度のETLならほぼSQLで終わる未来がくる(かも)だったりと, この界隈ホント動きが活発です.

このエントリーの内容もきっと半年後には古いものになってるかもですが, トレンドに乗り遅れないように今後もチャレンジと自学自習を続けたいと思います!

なおJX通信社ではそんなノリで共に自学自習しながらサーバーサイドのPythonやGoでいい感じにやっていく学生さんのインターンを募集しています.

www.wantedly.com

おそらく私が年内テックブログを書くのは最後かな...

皆様良いお年を&来年また新たなネタでお会いしましょう!

*1:執筆時点のLVは58, 運動負荷はMAXの30です. 筋肉が喜んでます💪

*2:このエントリーのため久々に試していましたがあっ(察し)となります.

*3:なぜ⚾のオープンデータ化というと, 私の趣味かつ手に入りやすい使いやすいオープンデータだったからです.

*4:この程度の処理だとprefectよりpandasの優秀さが目立つ気はしますが, デコレーターでいい感じにflowとtaskに分けられているあたりprefectの設計思想は中々筋が良いと言えそうです.

*5:ETLに限った話ではないのですが, 選んだ以上メンテをちゃんとやる, 使い切る覚悟でやるってことかなあと思っています.

*6:余談ですがprefectの作者はAirflowのコントリビューター?作者??らしいです.

AWS・GCPとKubernetesの権限まわりの用語を具体例から理解する

はじめに

TL; DR;

  • 社内の普段はインフラ以外のところを主戦場にしている人向けに、AWS・GCPの権限に関する用語と概念を説明するために書いたものを加筆訂正して公開します
  • AWS・GCPの権限管理は、基本的な概念は似ているが同じ英単語が別の意味でつかわれているのでややこしい
  • 書いてあること
    • 概念の説明と、関係を表す図
    • EKS・GKEからクラウドリソース *1 を使う時の考え方
  • 書いてないこと
    • 設定のためのコンソール画面のスクショや手順
  • Kubernetesからクラウドリソースを操作する方法は、以前のブログ「GitHub Actionsで実現する、APIキー不要でGitOps-likeなインフラCI/CD」でTerraformによるコードの例も紹介しているので、あわせて参考にしてみてください

想定読者

  • AWSはそこそこ使って慣れているけど、GCPにおける権限管理を理解したい人(またはその逆)
  • マネージドなKubernetes上でクラウドのリソースを使うときの考え方を知りたい人

なぜパッと見てややこしいのかについて、私は

  • 同じ単語が異なる意味で使われている
  • 似ている概念に異なる単語が当てられている
  • ある目的を達成する手段が別の方法で分解されている

の3つの理由があるからだと思います。なので、本記事ではできるだけ枕詞をつけたりセクションを区切って、「何について話しているのか」を明確にしながら解説していきたいと思います。

AWSとGCP

この二者の間では、まず「Policy」および「Role」という単語が異なる意味で使われています。

Policy

AWSにおけるPolicy

  • 「許可・禁止する操作(Actions)とその対象リソース(Resouces)」を表現するもの
    • (例)「xxxって名前のbucketをReadしていいよ」
    • (例)「dev- で始まる名前のDynamoDBについて何でもやって良いよ、ただしテーブル消しちゃダメよ」
  • つまり「操作対象と内容」についての話で、「誰が」その権限を持つかについては関知しない
    • 認証(Authentication)ではなく認可(Authorization)に関係している
    • 「誰」を定義するのAWSではIAM User/Group/Roleの役目

GCPにおけるPolicy

  • 「操作の主体」と「操作の内容」を紐付けるもの
    • 操作の主体
      • (例) Googleアカウントでコンソールにログインする生身のユーザ
      • (例) CloudFunctionやGCEインスタンスが使うService Account
    • 操作の内容: GCPにおけるRole
  • 許可する操作に「Condition(条件)」をつけることができる
    • (例)「君にGCSの読み書き権限あげるけど、 hoge-stg ってバケットだけね」
    • (例)「アナタはBigQueryでQuery実行していいけど、日本時間の月〜金だけね」
  • Webのコンソール上では IAM & Admin > IAM で操作できる
    • (筆者の観測範囲では)AWSから来た人はこの辺で迷うことがよくあるみたいです

Role

AWSにおけるRole

  • IAM Policyのセットをまとめて、「誰か」に使ってもらうもの
  • 生身のユーザーや、Lambda/EC2/ECSなどのワークロードに、Roleを紐付けて使う(=RoleをAssumeする)

GCPにおけるRole

  • 許可する操作をまとめた権限のセット
  • Storage Admin (project) Owner といった名前がついている
  • Roleを紐付ける対象はService AccountまたはUserであり、GCE・CloudFunctionといったワークロードではない

「権限をセットにする」という意味でAWS・GCPともに似ているようですが、実際にワークロードにクラウドリソースの権限を渡す方法が微妙に異なります。では具体例で比較してみましょう。

具体例で対比して理解する

「Serverlessな関数(Lambda/CloudFunction)からオブジェクトストレージ(S3/GCS)の特定のBucketをRead onlyに使いたい」 というケースを考えてみましょう。*2

  • AWSの場合は、「S3バケットを読むためのIAM PolicyをIAM Roleにつけて、そのIAM RoleをLambdaに使わせる」
  • GCPの場合は、「GCSバケットを読むためのRoleをServiceAccountにつけて(=Policyの設定)、CloudFunctionsにそのServiceAccountを使わせる」

図で表すとこのようになります。

f:id:TatchNicolas:20201214090717p:plain
Lambda/CloudFunctionからクラウドリソースを使う例

こうしてみると、 AWSにおけるPolicyは、GCPのPolicyよりもRoleの方に性格が近い かもしれません。整理すれば考え方として似通っているところもあり、決して複雑ではないのですが、こういった概念と用語の違いが「パッと見てややこしく見えてしまう」原因ではないでしょうか。

また、「オブジェクトストレージ(S3/GCS)系サービス触っていいけど、このバケットだけね」のように個別のリソース単位の制限をかけるのが

  • AWSの場合はIAM Policyで可能な操作(S3の読み取り)と対象(許可する個別のバケットの絞り込み)をまとめて指定する
  • GCPの場合はRoleでGCSというサービス自体の読み取り権限を定義され、それを主体と紐付ける(=Policyを定義する)ところでConditionをつけて対象のバケットを絞る

という違いがあり、(AWSからみれば)GCPでは同じことを達成するための手段が分解されているように見えます。用語の違いだけでなく、同じことを実現するための手段が違うステップに分かれていることも、最初は少し混乱してしまう理由の一つではないでしょうか。

Kubernetes上のPodからクラウドのリソースを使う場合

では、AWS・GCPのマネージドなKubernetes(=EKS/GKE)の上で動くワークロード(=Pod)から、S3/GCSなどのクラウドリソースを操作する場合を考えていきましょう。

AWS・GCPの層の上にKubernetesが乗っかり、「(Kubernetesの)ServiceAccount」という概念が出てくるので、ここまでの説明を踏まえて整理してみます。

AWSとKubernetes(EKS)

EKSからAWS上のリソースを使いたい場合は「IAM RoleとEKSクラスタ上のServiceAccountを紐付ける」が基本になります。 *3

もちろん「IAM Userを作成し、APIキーを発行して、KubernetesのSecretリソースを通してPodに持たせる」こともできますが、静的なAPIキーの発行は避けた方が良いでしょう。*4

GCPとKubernetes(GKE)

前述のEKSの場合と同様の考え方で、「GCPのServiceAccount(GSA)とGKEクラスタ上のServiceAccount(KSA)を紐付ける」が基本になります。その方法がWorkload Identityです。

GCPの世界とKubernetesの世界の両方に「Service Account」という単語が出てくることが少しややこしいので、「今どちらについて話題にしているのか」を意識しておくとよいでしょう。

Workload Identityを使うことで、静的なAPIキーを発行することなくKubernetes上のワークロード(=Pod)にGCPリソースを利用させることができます。

具体例で対比して理解する

具体例として前述のLambda/CloudFunctionsの例にならって、「EKS/GKE上のPodからS3/GCSの特定のBucketをRead onlyに使う」ケースを考えてみましょう。

f:id:TatchNicolas:20201214090957p:plain
k8s上のPodからクラウドリソースを使う例

図にしてみると、非常によく似ていることがわかりますね。

まとめ

AWS・GCPおよびKubernetesの権限まわりの用語と概念を、具体例で対比しながら整理してみました。どなたかの参考になれば幸いです。

最後に

JX通信社では、PythonやGoを使って「NewsDigest」の開発に参加してくれるインターン生を募集しています! 特に、AWS・GCPの利用経験のある方は歓迎します!

www.wantedly.com

*1:本記事ではS3・GCSといったオブジェクトストレージやRoute53・CloudDNSなどのDNSサービスなど、「利用するためにクラウドの権限が必要なもの」を指します

*2:厳密には、S3のBucket PolicyやGCSのACLなどBucket側の設定も存在しますが、今回はIAM側について話しているので言及しません

*3:https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html

*4:今は静的なAPIキーなしでKubernetesのServiceAccount単位でIAM Roleを設定できますが、かつてはそれが不可能でAPIキーを発行せずに権限を持たせるにはNode単位でEC2としてInstance Profile設定するしかありませんでした。この辺りの経緯や詳細は、こちらの解説がわかりやすいです https://dev.classmethod.jp/articles/eks-supports-iam-roles-for-service-accounts/

PynamoDBで良い感じにTestableなモデルを定義して、DynamoDB Localを使ってテストする方法

f:id:TatchNicolas:20201211101137p:plain

TL; DR;

  • PynamoDBを使ったテストでローカルで動かすDynamoDBを叩きたい場合に
  • テーブルのキー定義やキャパシティ設定の管理はterraform/CDKなどに任せつつ、アプリケーションコードを汚さずにテストを実行したい
  • そんなときは getattr でmetaclassを取り出して setattr でテスト用の設定値を注入してあげましょう

サンプルコードはこちら

github.com

もうちょっと詳しく

背景

PythonでDynamoDBを使った開発していればPynamoDBはとても便利なライブラリです。非常に書きやすいAPIでDynamoDBを読み書きできますし、手軽にテーブル自体もPynamoDBで作成することも可能です。 *1

しかしPynamoDBでテーブルを作成・管理してしまうと if table.exists() みたいな条件を書いて毎回判断させたり、キャパシティの調整のたびにアプリケーションを動かす必要が出てきてしまいます。 *2

そもそもテーブルの作成やキャパシティの管理はアプリケーションの責務ではないため、本番環境ではPynamoDBの責務をあくまで「アプリケーションからDynamoDBを使うクライアント」ととしての用途に限定し、テーブル自体は

  • Terraformで定義する
  • CDKで定義する
  • Serverless Frameworkを使う場合は serverless.yml の中で定義する

あたりがよく採用される方法かと思います。Partition Key/Sort Keyの設定やインデックスの定義などのほか、キャパシティはProvisionedかOn Demandか、Provisionedであればどれくらい積んでおくのか、といった設定を上記のいずれかのInfrastructure as Codeな方法で管理することでしょう。

すると当然、PynamoDBを使ったアプリケーションコードの中にはキャパシティに関する記述は残したくありません。アプリケーションを動かす際はそれでOKですが、テストを書くときに少し面倒なことになります。

たとえばテストにpytestを使う場合、fixture定義の中で 「テスト用のDynamoDBテーブルをlocalstackやDynamoDB Localのようなツールを使って作成し、テストが終われば削除する」 ような処理を書きたくなると思います。

PynamoDBでは、class Meta の中で host を定義することで、boto3で endpoint_urlを指定したのと同じ効果が得られます。

from pynamodb.models import Model

class Thread(Model):
    class Meta:
        table_name = 'Thread'
        # Specifies the region
        region = 'us-west-1'
        # Optional: Specify the hostname only if it needs to be changed from the default AWS setting
        host = 'http://localhost'
        # Specifies the write capacity
        write_capacity_units = 10
        # Specifies the read capacity
        read_capacity_units = 10
    forum_name = UnicodeAttribute(hash_key=True)

(公式Doc より抜粋 )

host = 'http://localhost' if ENV == "test" else None など書けば、テスト時と本番とで設定を打ち分けることは出来そうです。しかし、 アプリケーションのコードに「これはテスト実行かどうか」を判定する条件文が入るのは好ましくありません

素直にboto3を使っていれば、上記スライド *3 の続きで解説されているのと同様に ddb = boto3.client('dynamodb', endpoint_url='http://localhost') のように作成したオブジェクトに切り替えることで、比較的簡単にアプリケーションからテストのためのロジックを追い出すことが可能でしょう。

しかしPynamoDBを使っている場合は、APIを実際に叩く部分はライブラリの中に隠蔽されてしまい、オブジェクトの切り替えによるテストが難しくなってしまいます。

また、テスト用のテーブルをどう作成するかという問題もあります。せっかくPynamoDBモデルを書いたのだから、できればうまく再利用したいものです。しかし、PynamoDBの Model.create_table() は、キャパシティに関する指定がないとエラーになります。

AttributeError: type object 'Meta' has no attribute 'read_capacity_units'

host の指定の時と同様、 billing_mode = "PAY_PER_REQUEST" if ENV =="test" else None などと書けば回避できそうですが *4 、そうするとまたアプリケーションに「テストのためのロジック」が混入してしまいます。

そこで、今回は

  • アプリケーションのコードにテストのためのロジックを入れない
  • PynamoDBのモデル定義をテスト用のテーブル作成に活用する

を同時に達成できるテストの書き方を考えてみました。

やってみる

サンプルのテーブルの定義はこんな感じです。(GitHubのほうのコードでは、リアリティを出すためにGSIとかも足しています)

from pynamodb.models import Model

class _UserModel(Model):
    class Meta:
        table_name = USER_TABLE_NAME

    uid: UnicodeAttribute = UnicodeAttribute(hash_key=True)
    name: UnicodeAttribute = UnicodeAttribute()
    group: UnicodeAttribute = UnicodeAttribute()

それっぽいテストコードを書きたいので、Repositoryパターンっぽく包んでみます。

class UserRepo:
    def __init__(self) -> None:
        self.model = _UserModel

    def add_user(self, uid: str, name: str, group: str) -> dict[str, str]:
        # 省略 実装はGitHubのサンプルを参照してください

    def get_user(self, uid: str) -> dict[str, str]:
        # 省略 実装はGitHubのサンプルを参照してください

    def get_users_in_group(self, group: str) -> list[dict[str, str]]:
        # 省略 実装はGitHubのサンプルを参照してください

この UserRepo の中の self.model を使って create_table しようとすると、billing_moderead_capacity_units / write_capacity_unitsの設定がないのでAttributeErrorになります。

そこで、pytestのfixtureをこんな感じで書いてみます。

@pytest.fixture(scope="function")
def user_repo() -> Iterable[UserRepo]:

    repo: UserRepo = UserRepo()

    model_meta_class = getattr(repo.model, "Meta") # 1
    setattr(model_meta_class, "host", DDB_LOCAL_HOST) #2
    setattr(model_meta_class, "billing_mode", "PAY_PER_REQUEST") # 3
    setattr(model_meta_class, "table_name", "user_table_for_test") # 4

    repo.model.create_table(wait=True)

    yield repo

    # Delete table after running a test function
    repo.model.delete_table()

何をしているかというと、

  • # 1 でモデル定義の中のメタクラスを取り出し
  • # 2 でそのメタクラスにテスト用のhost(boto3でいうendpoint_url)を設定
  • # 3 でcreate_tableを通すためにbilling_modeを設定
  • # 4 でテスト用のテーブル名に名前を上書き

といった操作をしています。

テストコードはこんな感じで、

def test_user_repo(user_repo: UserRepo) -> None:
    alice = user_repo.add_user(uid="001", name="Alice", group="Red")
    assert alice == {"uid": "001", "name": "Alice", "group": "Red"}

    bob = user_repo.add_user(uid="002", name="Bob", group="Blue")
    chris = user_repo.add_user(uid="003", name="Chris", group="Blue")

    users_in_blue_group = user_repo.get_users_in_group(group="Blue")
    assert users_in_blue_group == [bob, chris]

実際にテストを実行してみると、

docker-compose up -d
docker-compose exec app pytest

結果:

f:id:TatchNicolas:20201210214630p:plain
テスト結果

pytestのfixtureでのテーブル作成が成功し、テストが通りました!

おまけ

また、以前のブログで「Serverless Framework +FastAPI」の開発環境を作った際に「せっかくserverless.ymlのなかでスキーマ定義してるのに、結局PynamoDBでテーブル作るためにモデル定義に余計なコードが混ざってしまうなあ...」という問題が残っていました。

tech.jxpress.net

この問題も、本記事と同じ方法でローカル開発用(≠テスト用)のテーブル作成スクリプトをアプリケーションコードとは別に切り出すことで解決できそうですね。コンテナイメージやLambdaには app/ 以下のコードだけ載せておけば良いので、「アプリケーションに余計なコードが混ざらない」を達成できます。

.
├── app
│   ├── __init__.py
│   ├── config.py
│   ├── main.py
│   └── repository.py
├── create_local_table.py
└── test
    └── test_repository.py

create_local_table.py スクリプトの中身はほとんどpytestの中身と同じです。

from os import environ

from app.repository import UserRepo


DDB_LOCAL_HOST = environ["DDB_LOCAL_HOST"]


if __name__ == "__main__":
    repo: UserRepo = UserRepo()

    model_meta_class = getattr(repo.model, "Meta")
    setattr(model_meta_class, "host", DDB_LOCAL_HOST)
    setattr(model_meta_class, "billing_mode", "PAY_PER_REQUEST")
    setattr(model_meta_class, "table_name", "user_table_for_dev")
    # or
    # setattr(model_meta_class, "read_capacity_units", 1)
    # setattr(model_meta_class, "write_capacity_units", 1)

    by_group_meta_class = getattr(repo.model.by_group, "Meta")
    setattr(by_group_meta_class, "host", DDB_LOCAL_HOST)

    repo.model.create_table(wait=True)

まとめ

Pythonのbuilt-inな関数である getattr / setattr を使うだけですが、当初のねらいであった「アプリケーションのコードにテストのためのロジックを入れない 」「PynamoDBのモデル定義をテスト用のテーブル作成に活用する」を達成することができました。

少しでも参考になれば幸いです。

最後に

JX通信社では、PythonやGoを使って「NewsDigest」の開発に参加してくれるインターン生を募集しています! サーバレス、コンテナなど色々な技術スタックに触れられる環境なので、興味のある方は是非お声かけください!

www.wantedly.com

*1:https://pynamodb.readthedocs.io/en/latest/quickstart.html#creating-a-model

*2:RDSの場合、Read Replicaの数やインスタンスサイズをInfrastracture as codeなツールに(=インフラの責務)、テーブルのスキーマはDjangoやAlembicなどにマイグレーション管理させる(=アプリケーションの責務)パターンになると思いますが、DynamoDBの場合は「テーブル自体の作成・キャパシティ設定(=インフラの責務)」と「キーやインデックスといった設定(=アプリケーションの責務)」という切り分けになり、どのツールに何を任せるかという問題かと思います

*3:Node.jsかつS3の例ですがポイントは同じで、「テストのためのロジックが紛れ込んでいる」ことを問題にしているので例として引用しました

*4:read_capacity_unitsとwrite_capacity_unitsの両方を適当な数値に指定しても回避できます