新型コロナダッシュボード爆速リリースの舞台裏 〜小さく始めて大胆に変えるフロントエンドプロジェクト〜

JX通信社CDOの小笠原(@yamitzky)です。

JX通信社は「今起きていることを明らかにする報道機関」というミッションの元に、新型コロナリアルタイムダッシュボードを 2月16日 から提供し続けています。今回は、「新型コロナプロジェクト」の発足から現在に至るまでの、プロジェクトの進化についてご紹介します。

f:id:yamitzky:20200529014632p:plain

プロジェクト発足

そもそものプロジェクトの発足としては、2月14日の下記のツイートが発端でした。およそ 2 日でリリースしたことになります。

当時は、東京都のような自治体公式の特設サイトや、国内全体の動向をまとめたサイトはほぼありませんでした。国内の公共機関・報道機関のなかで、かなり速くリリースできた部類に該当するかと思います。

フェーズ1: Vue.js でのプロトタイピング

現在は React で作られているプロジェクトですが、当初のプロトタイピングフェーズでは Vue.js を使っていました。

Vue.js は Progressive(漸進的) なウェブフロントフレームワークです。最もシンプルなのは、次のような単体の HTML として配信する方法です。この形式ではトランスパイル(Webpack によるビルド等)の必要もなく、HTML 単体を配信するだけでも SPA になり、非常に手早くプロジェクトを開始できます。

// index.html
<html>
  <body>
    <div id="app">{{ message }}</div>
    <script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>
    <script>
      new Vue({
        el: "#app",
        data: () => ({ message: "hi" })
      })
    </script>
  </body>
</html>

今回のプロジェクトでは「なるべく早くリリースする」という目的があり、ツール周りでつまずきたくなかったので、まずは Vue.js で雑にプロトタイピングしました。

フェーズ2: React × TypeScript × Emotion × Parcel への移行]

f:id:yamitzky:20200529030051p:plain

プロトタイプとして実現したものの、NewsDigest のウェブメディアで配信したいという課題がありました。NewsDigest の CMS は、Django (Python) で管理しており、必ずしも SPA との相性が良いとは言えません。

そこで次のような形で、 CMS にたった 2 行のタグを埋め込むだけで新型コロナのダッシュボードを配信できるようにしてみました*1

f:id:yamitzky:20200529020642p:plain

技術的な必要条件としては、

  • 1 つの script タグに JavaScript の依存関係を全て詰め込む
  • 1 つの script タグさえ埋め込めば、デザイン(stylesheet) も再現できる

です。これを最短で実現するため、バンドラー(Webpackの代替)として Parcel を利用しました。Parcel はほぼ設定不要で使えるバンドラーなので、今回のような急ぎのプロジェクトにピッタリです。また、CSS ファイルに依存せずデザインを指定するため、Emotion も使っています。Emotion は React と相性の良い TypeScript ベースのデザイン用ライブラリで、JS 内にデザイン指定を埋め込めます。

// index.tsx
const Wrapper = styled.div({
  padding: 8,
  color: 'red',
  display: 'flex',   // vendor prefix も自動付与
})
export const Component = () => <Wrapper>some content</Wrapper>

つまり、プロトタイピングから本番配信までの間に、Vue.js から React・TypeScript にまるっと置き換えたことになります*2

フェーズ3: gitでのデータ管理から、サーバーレスなデータ管理へ

新型コロナの感染者数字は、リアルタイムに更新されます。実は、プロジェクト初期ではデータベースを用意しておらず、TypeScript のソースコード内に直書きする形でデータ管理をしていました。数値更新するたびに GitLab CI が動いて、JS ファイル自体をデプロイしていたのです・・・!

f:id:yamitzky:20200529130138p:plain

// 実際の data.ts
export const data = {
  modified: '2020.02.19 11:30',
  japanStats: {
    infected: 616,
    infectedChange: 96,
    dead: 1,
    deadChange: 0,
    // クルーズ船(ダイヤモンド・プリンセス号)
    cruiseInfected: 542,
    cruiseInfectedChange: 0
  },
   ...
}

さすがに、機械化しづらい、型安全でなく事故が起こりやすい、git でデータ更新をするのは属人性が高い、Python 製の FASTALERT API としての提供が難しい... という事情もあり、Google Spreadsheet をデータベースとして利用し、JSON から使えるようにしました。また、データ更新の仕組みは GitLab CI ではなく、AWS Lambda でサーバーレスな形で動かすようにしています。

f:id:yamitzky:20200529130900p:plain

フェーズ4: Next.js での SSR への移行

<script> タグでの配信は、SEO 的に弱い可能性があるのではないか、という懸念が生まれました。そこで当初の Parcel から、Next.js へ移行しました。Next.js はReact のフレームワークで、サーバーサイドレンダリングの機能などが組み込まれています。

この Next.js のサーバーは、Amazon ECS(Fargate) 上にデプロイしています。

フェーズ5: ウィジェットとしての外部提供と yarn monorepo 化

JX 通信社が提供している新型コロナダッシュボードは、メドピア社LINE社にウィジェットとしても提供しています。また、NewsDigest だけでなく FASTALERT 内でも配信しています。NewsDigest での提供、ウィジェットとしての提供、FASTALERT 内での提供、、、これらを 1 プロジェクトでやるのは現実的ではなかったため、次のような monorepo 構成を行っています*3

f:id:yamitzky:20200529164607p:plain

@corona/components ・・・ 各種グラフを提供するライブラリ。FASTALERT からも npm install している
@corona/server ・・・ @corona/components に依存する、next.js のプロジェクト。newsdigest.jp 用
@corona/widget ・・・ corona/components に依存する、webpack のプロジェクト。ウィジェット配信用

この monorepo 構成は、yarn の workspace 機能を使っています。

また、@corona/server と @corona/widget では、ビルドツールなども異なっています。これは、プロジェクトの目的や、求められる安定性*4などに応じて、意図的に使い分けたものです。

うまく monorepo 構成にすれば、コードを使いまわしつつ、最適な技術選定ができるようになります。

フェーズ6: グラフライブラリの移行

プロジェクト当初は、chart.js と d3 などを使っていましたが、 vx という React 向けのグラフライブラリに移行しました。

Chart.js は canvas ベースのグラフライブラリです。しかし、 React のような宣言的な UI の思想との相性の悪さや、サーバーサイドレンダリングできない、柔軟にカスタマイズできないなどの課題がありました。

d3.js もデータ可視化に使っていましたが、厳密には「ドキュメントをデータに基づいて操作・構築するためのライブラリ」です。雑に言えば「DOMを操作するためのライブラリ」なので、 React(React-DOM)のような DOM 操作のライブラリと、役割的にかぶっています。

そこで、 React ベースの低レイヤーなデータ可視化ライブラリである vx に移行し、React だけで SVG での可視化をしています。

  • React 的な、宣言的データ可視化の実現
  • TypeScript の型
  • 柔軟なデータ可視化の実現

などができるようになりました。一方で、パフォーマンスチューニングや、コード量の増加などは起きています。

f:id:yamitzky:20200529115825p:plain
凡例や横軸の位置など、かなり微調整しています

まとめ

新型コロナウイルスに関しては、社会的な需要や、刻々と変わる状況などを踏まえ、かなりスピード重視でプロジェクトが発足しました。当初は git でデータ管理していたほど、一般的なアプリケーション構成のセオリーからは外れた作り方です。

一方で、新型コロナダッシュボード提供開始から3ヶ月経ち、ビジネスの状況に応じて大規模なアーキテクチャ変更(式年遷宮)を行っており*5「ビジネスとスピードと品質の両立」も実現できたと思います。これらの両立のポイントは「ビジネス要件に合わせて技術を使いこなし、いかに漸進的に進めるか」です。

f:id:yamitzky:20200529132312p:plain

今回のプロジェクトでは、かなりインターン生に協力いただいています。本当にありがとうございます! 引き続き、フロントエンドのエンジニアインターンを募集中です(まだ新型コロナは収束していないのでリモート中心です)。ぜひ一緒に、インパクトのある開発をしましょう!

*1:この JS 自体は、Amazon S3 と CloudFront で配信しています

*2:同じようなことを実現する方法は他にもあります。この技術選定は、慣れの部分が大きいです

*3:データ更新用サーバーレスパッケージなども含めると、10 個ほどパッケージが含まれています

*4:ウィジェットは絶対にサーバーを落としたくないため、SSR は行っていません。また、顧客ごとにカスタマイズする関係で SSG にもしていません

*5:プロジェクト発足から 3 ヶ月ぐらいしか経っていないのに、1000行以上差分のあるプルリクが複数あります。同僚から「走りながら車輪を全交換した話」というタイトル案が出るくらいです

コワクナイBigQuery - チームでデータを活用するための活動について

JX通信社でシニア・エンジニア&データ基盤エンジニアをしています, @shinyorke(しんよーく)です.

最近のStayHome生活は, 「YouTubeでUK Rock🎸, お笑いの動画を見まくる」「地元のお取り寄せグルメを楽しむ」「野球データ分析に集中する」で楽しんでいます.

JX通信社では, 「FASTALERT」などのプロダクトのデータをBigQueryに集約し,

  • 機械学習を用いたプロダクト開発・施策
  • プロダクト・サービスの改善に関する分析
  • 日々のイベントをメトリクス化して可視化(いわゆるBI的なもの)

に活用しています.

このエントリーでは,

???「BigQuery?使い方わからないし💰かかるって噂を聞くのでコワイよ」

ワイ将「BigQueryコワクナイヨー, ガンガンBigQuery使えるように仕組み作ったので乗っかりましょ!」

というお話を残したいと思います.

TL;DR

  • 「仕組みの理解」「コストの可視化」を愚直にやっていけば, BigQueryはコワクナイ. むしろ友だちになれる!
  • RDBMSとの違い(特にカラムナー), データサイエンスな人にはPandasとの共通点を教えれば仕組みはしっかり伝わる
  • コストの可視化は本気でやれば2日くらいでシュッとできます.

会社やご家庭でBigQueryをお使いな方の参考になると嬉しいです.

スタメン

全体像 - 今現在の姿

JX通信社では, 昨年の秋から全社的なデータ基盤を構築・運用を開始し, 利用者・使いたい人の声を元に日々改善を行っています.

データ基盤全体はGCP上に構築(ETLの一部タスクを除く*1), DWHはBigQueryを使っています.

利用者であるチームメンバー(エンジニアおよびデータを使いたい人すべて)には以下の図のような仕組みを提供しています.

f:id:shinyorke:20200425131222p:plain
【図】BigQueryを取り巻く仕組み

データ基盤を構築し, 運用し始めた当初は「Colab(Jupyter notebook)とRedash, Data Portal用意すればまあイケるのでは?」と思っていたのですが,

  • BigQueryは噂でしか聞いたことない, サービスのDBを直接分析する場合との違いは?
  • BigQueryはお金がかかる, という風潮. 「数百万円飛ぶでしょ?」的な

という声があったり雰囲気を感じたりしました.

確かに, 自分でBigQueryを使っていても「このクエリって流したらいくらだ💰?」とか気になります.

というわけで,

  • BigQuery(含むデータエンジニアリング)の正しい知識を普及しよう
  • 毎日コストを見ることができればええやん!

この2つをシュッとやることにしました.

BigQueryに関する正しい知識を

正しい知識を普及する, 植え付けるのであれば自分から動けばエエやん!...ということで,

  • BigQuery使いたいぞっていうメンバーがいるとき
  • データ分析のインターンがJoinする時のオンボーディング*2

...というタイミングで社内勉強会を不定期に開催しています*3.

これについては, 私が作ったテキスト(社内のKibelaに公開)を用いてやっています.

f:id:shinyorke:20200425133707p:plain
JX通信社版・BigQueryの教科書

このテキストのTL;DR的なモノを一部紹介すると,

  • BigQueryは普段使ってるRDBMSとは違い, DWHであり列指向データセットである.
  • select文で検索する, joinする, 集計する的なのはDBと変わらないが, 必ず必要な列だけ使う&範囲を絞って使う.
  • 必要な列だけ取得し, 対象範囲を絞ってもらえればお安く使える.

必要なことを最短距離で覚えられる内容にしています. 3分34秒以上かかるかもですが*4.

ちなみに教え方のコツとしては,

  • RDBMSを知ってる人には「列指向」な事をしっかり教える. MySQLやPostgreSQLで培ったIndexとかの知識は使えないよっていうのを理解してもらう.
  • そもそもDBやSQLを触ったことがない(インターンなど若い人に多い印象)にはPandasやRでよく使うDataframeにたとえて教える.*5

これでいい感じになります.

常にコストを監視できる仕組み

正しい知識を身に着けてもらった後, BigQueryの利用頻度が増えてウッキウキでしたが,

インターン生「shinyorkeさんこのクエリ実行して大丈夫ですか💰?」

ワイ将「お, おう...(そっか心配だよね)」

っていうやり取りが増えました.

せっかくいい感じに分析してもらったりレポート出してもらってるのに彼らを心配にさせちゃいけないよなあ〜ということで,

毎日GCP全体およびBigQueryのコストを通知するSlack Botを開発・運用し始めました.

f:id:shinyorke:20200426122234p:plain
毎日通知, 一定金額超えたら赤くなります

大変ありがたいことに, GCPにはアカウントの利用料金を定期的にBigQueryに流す仕組みが提供されており,

  • 毎日BigQueryの特定のテーブルにデータが入る
  • ↑のコストをSlackにつぶやかせればいい!

という実にシンプル過ぎるやり方でできる事がわかり, 二日ちょっとで開発・運用スタートしました.

ちなみに仕組みはこんな感じです.*6

f:id:shinyorke:20200425140045p:plain
コスト監視マンのアーキテクチャ

数行のSlack Botでできちゃうのでこれは強くオススメです!

結び

「コワクナイBigQuery」の成果ですが,

  • BigQueryに関する知識がついた・恐怖心が無くなったメンバーが使ってくれたり
  • インターン生たちが自主的にSQLを覚えて, データ分析やダッシュボード作りに活用してくれたり
  • 特定のパターン・クエリに対するお値段という「相場」が誕生したことにより, 大胆にBigQueryでガンガンSQLを流すことが可能に

...という, 「まずは使ってみよう, アウトプットだそう!」という流れができました.

基本を教える, コスト感を掴むという視点で心理的安全性ができたのかなーというのが私の感想です*7.

ちなみに自分はインターン生にSQLを共有した所, 「shinyorkeさんもっといいやり方あるよ」的なレビューを受けたりもしました, これは素直に嬉しい.

今後は,

  • ビジネス・セールスメンバーがもっとデータで攻められる用にデータセットやダッシュボード増やしたり
  • 筋がよいSQLやデータ・ダッシュボードを共有したり

といった所で活用事例を伸ばしたいなと思っています.

最後までお読みいただきありがとうございました!

*1:サービスの大半がAWSで運用しており, Extractに当たるタスクはAWSを使っています.

*2:ちなみにですが, 4/30現在ではデータ分析インターンの募集は締め切っております :gomen:

*3:過去に3回ほどやりましたが, いずれもZoomでリモート参加可能な会でした.

*4:なんでや阪神(以下略)

*5:PandasもRも列方向で探索するのに最適化しているモノなので.っていうのとMethodとの関係も伝わるので相当おすすめです

*6:アーキテクチャそのものは自分が過去にやった事例をベースにやりました.もっというと構成図もパクった.

*7:正直な話, コワクナイBigQueryの施策の真意はここにありました. 知ることと見える化は大事なのです.

Serverless Framework+mangum+FastAPIで、より快適なPython API開発環境を作る

はじめに

最近ハイボールにハマっているSREのたっち(@TatchNicolas)です。

昨日オンライン開催されたJAWS DAYS 2020にて、JX通信社もサーバレスをテーマとして発表をしました。(by 植本さん

発表でもありましたように、上記プロジェクトにおいて開発当時はスピードを優先してプロジェクトメンバーの手に馴染んでいて分担もしやすいフレームワークとしてFlaskを採用しました。

一方で、JX通信社としてはFlaskよりもFastAPIを使うプロジェクトが増えてきており、今後もその傾向は続く見込みです。

そこで、特設ページ作成やAPI提供など初動としての開発が一段落したのを機に、JAWS DAYSで発表した仕組みを今後のために発展させる検証をしたので紹介します。

TL; DR;

  • JAWSでは Serverless Framework+awsgi+Flaskな構成でスピーディにコロナ特設ページ向けAPIを作った話をした
    • 緊急性の高いプロジェクトであったが、非常にスピード感のある開発ができた
  • Flaskに代わるフレームワークとしてJX通信社ではFastAPIが流行中
    • OpenAPIドキュメント自動生成、Request/Responseをクラスとして定義するなどカッチリと開発ができる
    • しかしFlaskライクな軽量フレームワークで、シンプルで書きやすい
  • そこで、FastAPIを使うパターンでも前述の良い開発者体験を提供する仕組みを作った

サンプルコードはこちらにおいてあります。

github.com

前提

awsgiはAPIGatewayからLambda Proxy Integrationに渡されるイベントをWSGI*1アプリケーションが理解できる形式に変換し、またWSGIアプリケーションが返すレスポンスをLambda Proxy Integrationに従った形式に変換してくれる便利なツールです。

github.com

WSGIの精神的後継として*2、asyncに対応したASGIが登場しました。そのASGIに対応したフレームワーク(FastAPIなど)とLambda Proxy Integrationの間でアダプタとして動いてくれるのが、mangumです。

github.com

ざくっとまとめると以下のようになります。

WSGI ASGI
フレームワーク例 Flask, Django*3 FastAPI,Sanic,Responder
Lambda Proxyアダプタ awsgi mangum
(HTTP Server)*4*5 gunicorn, uWSGIほか uvicorn

それでは早速やってみましょう。

やってみる

今回用意したサンプルコードはdocker-composeで動くようにしてあるので、下記のリポジトリをクローンしたら下記の要領で起動してみて下さい。

$ docker-compose up -d
$ docker-compose exec exam_results python -c 'from main import init_ddb_local; init_ddb_local()'

実際のコードを見ていきましょう。

# exam_results/main.py

(省略)

from fastapi import FastAPI, HTTPException
from mangum import Mangum


app = FastAPI()  # FastAPIのインスタンス

(省略)

handler = Mangum(app, False)  # FastAPIのインスタンスをMangumのコンストラクタに渡して、handlerとして外から読めるようにしておく

(省略)

exam_results/main.py のに app という変数名でASGI準拠のアプリケーション(=FastAPIのインスタンス)を作り、それを引数として作ったMangumのインスタンスを handler として定義します。

これを serverless.yml の中でhandlerとして指定することで、APIGatewayから発生したイベントをMangumが受け取り、FastAPIに渡してくれるというわけです。

# serverless.yml
(省略)
functions:
  exam_results:
    events:
    - http:
        path: /{path+}
        method: GET
        private: false
        cors: true
    handler: exam_results.main.handler  # ←ココ
    environment:
      PYTHONPATH: exam_results
      DDB_TABLE: ${self:custom.envMapping.${self:provider.stage}.DDB_TABLE}
    role: ManageStudentsRole
(省略)

細かなimportや環境変数設定は実際のサンプルコードを参照してください。

起動時の二行目のコマンドで、下記の関数を呼び出しています。

def init_ddb_local():
    if ExamResultsTable.Meta.host and not ExamResultsTable.exists():
        print('creating a table...')
        ExamResultsTable.create_table(
            read_capacity_units=1,
            write_capacity_units=1,
            wait=True
        )
        print('Done.')

最初のif文でテーブルのモデル(ExamResultsTable)の内部クラスMetaのなかで、「環境変数から DDB_HOST が指定されている」かつ「テーブルがまだ存在しない」場合のみPynamoDBを使って create_table しています。

実際にリクエストをしてみましょう。

$ curl -X POST -H "Content-Type: application/json" -d '{"name":"Alice","subject":"math","score":50}' localhost:8001/scores
{"name":"Alice","subject":"math","score":50}
$ curl -X POST -H "Content-Type: application/json" -d '{"name":"Bob","subject":"history","score":49}' localhost:8001/scores
{"name":"Bob","subject":"history","score":49}

$ curl localhost:8001/scores?student=Alice
{"exam_results":[{"name":"Alice","subject":"math","score":50}]}
$ curl localhost:8001/scores?subject=history
{"exam_results":[{"name":"Bob","subject":"history","score":49}]}

次に、このアプリケーションをAWSにデプロイしてみます。(sls コマンドのインストールやAWS操作のための権限は先に済ませておいてください。)

$ sls deploy --stage=dev
Serverless: Generating requirements.txt from pyproject.toml...
(いっぱいメッセージが出る)
Service Information
service: fantastic-service-with-fastapi
stage: dev
region: ap-northeast-1
stack: fantastic-service-with-fastapi-dev
resources: 13
api keys:
  None
endpoints:
  GET - https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/{path+}
  POST - https://xxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/{path+}
functions:
  exam_results: fantastic-service-with-fastapi-dev-exam_results
layers:
  None
Serverless: Removing old service artifacts from S3...
Serverless: Run the "serverless" command to setup monitoring, troubleshooting and testing.

endpoints にあるURLに対して、localhost:8001 に対して行ったのと同じ操作ができると思います。

あとは sls deploy --stage=devdevstg prd に書き換えて実行すれば、それぞれ独立した環境を作成することができます。 コマンドがシンプルなのでCI/CDも簡単に書くことができるでしょう。

また、/docs にアクセスするとOpenAPIのドキュメントが自動生成されています。

f:id:TatchNicolas:20200323212131p:plain
自動生成されたdocs

何が嬉しいのか

開発スピードを高く保てる

早く上手に使えば運用やデプロイの負担が大幅に軽減されるLambdaと、ローカルでもガンガン開発しやすいWebアプリケーションフレームワークのいいとこ取りをすることができます。また、FastAPIの提供する「Web APIを作る」ために便利な機能を活用することができます。

Lambdaを手元で動かしたいという点で、本記事と似たことを実現しようとしているSAM Localは「Lambdaっぽいなにかを手元で動かす」発想です。一方で今回のやり方は「ローカルで動いているアプリケーションをASGI(FlaskであればWSGI)をインターフェイスとしてLambdaに載せる」という方法です。そのため、serverless.yaml 内ではhttpイベントのパスを{path+} 、 メソッドをANYにしておくことでL7の仕事をAPIGatewayではなくLambda内のアプリケーションに任せています。

awsgiもmangumも(gunicornやuvicornといった)HTTP Serverを起動するわけではなく、単にイベントを変換しているだけなのでオーバヘッドもほとんどゼロなため、Lambdaがキチンとスケールしてくれればパフォーマンスの心配はありません。

いつでも実行環境を載せ替えられる

たとえば運用しつつ何らかの事情で「Lambdaじゃキツイな」となってきたらECSやk8sなどコンテナベースの実行環境に載せかえることも容易にできます。その際にアプリケーションは変更する必要がありません。ローカルで動かしていたDockerイメージをそのまま、またはDistrolessなどにちょっと書き換えるだけで引っ越すことができます。

今回は最小限の構成にするために、Pythonのコードはすべてexam_results/main.pyに詰め込みました。しかし実際のアプリケーションではもっと真面目にファイル・ディレクトリの構成を切り分けると思います。その場合はLambda用(exam_results/lambda.py)とDocker用(exam_results/docker.py)に入口をそもそも分けてしまったり、Poetryのextrasを使って依存ライブラリ管理を分けたりしても良いでしょう。

ハマったところ/改善したいところ

serverless-python-requirementsのモジュールとPoetryがうまく連携しない

パッケージ管理およびvenvの管理にはPoetryを使っていたのですが、Serverless Framework側でモジュール機能を使うと、プラグインが期待通りに動作せず*6にpackageに依存ライブラリを含めてくれません。

PoetryだけではなくPipenvでも同じ問題があり、こちらはIssue報告されています。(執筆時点で未解決)

github.com

Pipfileやpyproject.tomlでなくrequirements.txtであれば individually: true のオプションがちゃんと効いてLambda関数単位で依存関係を解決してくれるようです。なので、CI/CDのタイミングでコマンドを一行足してモジュールごとにPoetryにrequirements.txtを生成させば解決します。

ただし今回のサンプルコードでは、1スタック1関数として作ることで回避しました。*7

DynamoDB LocalとPynamoDBでスキーマ定義が二重管理になってしまう

宣言的に書くためにDynamoDBのスキーマ定義はserverless.ymlに書きたいところですが、それをローカル開発時のDynamoDBにそのまま適用することはできません。

今回アプリケーションはPynamoDBを使ってDynamoDBとやりとりをしており、PynamoDBのモデルはserverless.ymlで定義したスキーマと一致するように書いています。そこで、簡単に初期化関数を定義してPynamodDBの機能を使ってローカルのテーブルを作りました。

しかしこれは実際のアプリケーションには必要ないある種「余計な」コードがソースの中に紛れ込んでしまっています。serverless.ymlに書いた定義をそのままローカル開発環境にも適用できればよいのですが、スマートな方法があれば改善したいです。*8

さいごに

サーバレスはその恩恵と引き換えに、どうしてもプラットフォームの制約を受け入れる必要が生じます。そんな制約の一つが「イベントという形で入力を渡されるので、Lambdaとしての書き方を強制されて慣れ親しんだフレームワークとは勝手が異なる」ことだと思います。

しかしWSGI/ASGIというインターフェイス(とそれらを繋いでくれるmangumのようなツール)を活用することで、ローカルで作業のしやすいコンテナベースの開発とサーバレスな開発をシームレスに切り替えられます。JX通信社のプロダクトではECS+RDSで一部補助的にLambdaを使う構成が定番でしたが、最近はサーバレスを積極的に活用する場面も増えてきました。

今回はそんな社内の技術スタックの移り変わりにあわせた開発ができるように工夫してみたことの記録でした。少しでも皆さんの参考になれば幸いです。

*1:Pythonの標準仕様として用意されているWebアプリケーションのインターフェース定義で、Ruby屋さんにとってのRackにあたります

*2:公式Docより: ASGI (Asynchronous Server Gateway Interface) is a spiritual successor to WSGI, intended to provide a standard interface between async-capable Python web servers, frameworks, and applications.

*3:Django 3.0からASGIに対応していく方針ですが、現時点で限定的です。詳しくは https://docs.djangoproject.com/ja/3.0/releases/3.0/#asgi-support

*4:Ruby屋さんにとってのunicorn

*5:今回はHTTP Serverは使わずに素のWSGI/ASGIアプリケーションとして動かして、mangumにブリッジしてもらいます

*6:プラグイン側のコードをみたところservicePathにプロジェクトルートが渡されてしまいpyproject.tomlを見つけられないことが原因のようでした。lib/pip.jsではmodulePathという変数にモジュールのディレクトリ名が入ってくるのでうまく連結できれば動かせたかもしれません

*7:厳密にマイクロサービスするなら永続化層も共有すべきではないという考え方もあるので、1つのスタックに1つのLambda関数を良しとしました

*8:たとえばserverless.ymlをパースして、テーブルの有無をチェックして必要に応じて作成するなどできると思います

Goの並行処理について

はじめに

こんにちは。サーバーサイドエンジニアインターンでお世話になっている杉山と申します。 今回はオライリー・ジャパン社より出版されている『Go言語による並行処理』を読み勉強したことについて書いていきたいと思います。

Goにおける並行処理

GoはCPUのコア数が複数だった場合並列処理になります。 もし1コアのマシンで並列処理を行う場合それは並列処理ではなく素早く順に実行しているだけのようです。

並列処理と並行処理の違い

並列処理と並行処理の違いについては『Go言語による並行処理』では 『並列性はコードの性質を指し、並列性は動作しているプログラムの性質を指します』 とありますが分かりづらいので調べてみると

  • 並列処理 複数の命令の流れを同時に実行すること

  • 並行処理 コンピュータの単一の処理装置を複数の命令の流れで共有し、同時に実行状態に置くこと

このような違いがあるみたいです。イメージで説明すると以下のようになります

f:id:sugi1208:20200323174721p:plain

並列処理で気をつけること

デッドロック

  • プロセスなどの処理が互いの処理終了を待ち、どの処理も先に進めなくなってしまうこと

ゴールーチンで同じ変数へのデータ競合

  • 解決方法
    1つの変数には1つのゴールーチンからアクセスする
    ロックをとる
    チャネルを使う

チャネル型とは

チャネルはゴールーチン間でのメッセージを共有するためのもの

  • 特徴
    送受信時にブロックできる
    送信時にチャネルのバッファが一杯だとブロックする
    受信時にチャネル内が空だとブロックする

並列処理を使って大量のデータを書き込んでみる

今回は東京の気温データ(8785件)をMySqlに書き込んでいます。 また今回のコードに関しましては並列処理の例ということでfor文の中にてinsertしております。ご了承ください。

試しに自分で書いてみるコードがこちらです。

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "sync"

    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/mysql"
)

type Weather struct {
    Time    string `json:"time"`
    Temperature string `json:"temperature"`
}

func main() {

    client, _ := gorm.Open("mysql", "root:root@tcp([mysql]:3306)/weather?charset=utf8mb4&parseTime=true")

    client.AutoMigrate(&Weather{})

    ch := make(chan int, 4)
    wg := sync.WaitGroup{}

    account_json, err := ioutil.ReadFile("data.json")
    if err != nil {
        fmt.Println(err.Error())
    }
    var data []Weather
    if err := json.Unmarshal(account_json, &data); err != nil {
        log.Fatal(err)
    }
    for _, name := range data {
        ch <- 1
        wg.Add(1)
        go func(name Weather, client *gorm.DB) {
            defer wg.Done()
            client.Create(&name)
            <-ch
        }(name, client)
    }
    wg.Wait()
    client.Close()

}

変数dataに気象データを入れてfor文を使用し書き込みの処理をしています。

上記のコードの良くないと思う点

  • ただforで回して書き込むより少し早くしただけのコードになって安定したコードではない

  • 上記のコードには該当しないがデッドロックが起きた場合を考慮していない

  • insert処理が途中で止まってしまった場合処理が終了しない

上記の点を改善していきます

本を読んで書いてあったことを試してみる

次に本で読んでみて学んだ事を踏まえてコードを書いてみました。
今回はハートビートと言われる並列処理で用いられる手法を使用します。

ハートビートとは

ハートビートとはその名の通り人間の心拍のような物です。並列処理内部で外部にそのプロセスが生きているかを伝える方法です。

ハートビート使用した場合のメリット

今回の場合ハートビートは一定時間毎に外部にプロセスが生きているか伝えています。 ハートビートを使用することによって並列処理内部にて何しらの処理が止まってしまい外部に一定時間以上外部にプロセスが生きているか伝えられていない場合や、デッドロックしてしまった場合に処理を終了させることなどができます。

今回書いた全体のコードはこちらになります

こちらでは一部抜粋して説明していきます。

func main(){

    ----------省略----------

    done := make(chan interface{})

    defer close(done)

    hertbeat, results, errch := insert(done, data, client)

    <-hertbeat   //最初のハートビートがゴルーチンに入ったことを受け取ります

    i := 0

    for {  // ここはhertbeatの受け取りなどの処理
        select {
        case r, ok := <-results: //ここでは書き込んだ内容と書き込むはずの内容の比較をしています
            if ok == false {
                return
            } else if Weather := data[i]; Weather != r {
                log.Fatal("MismatchingError", Weather)
            }
            i++
        case <-errch:  
            log.Fatal(errch)
        case <-hertbeat:  //一定時間ごとにハートビートを受け取ります。
        case <-time.After(5 * time.Second):    //ここでは上記の処理が一定時間行われない場合にタイムアウトするようにいています。
            log.Fatal("Timeout")
        }
    }

}

func insert(done <-chan interface{}, data []Weather, client *gorm.DB) (<-chan interface{}, <-chan Weather, <-chan error) {
    hertbeat := make(chan interface{}, 1) 
    weatherch := make(chan Weather)
    errch := make(chan error)

    go func() {
        defer close(hertbeat)
        defer close(weatherch)

        beat := time.Tick(time.Second)

    Loop:
        for _, name := range data {
            for {

                select {
                case <-done:
                    return
                case <-beat:
                    select {
                    case hertbeat <- struct{}{}:  //ハートビートを一定時間ごとに送っています  
                default:
                    }
                case weatherch <- name:
                    client := client.Create(&name) //ここではDBに対する書き込みです
                    if client.Error != nil {
                        errch <- client.Error // ここではエラーハンドリングをしています。そのまま表示させてもよかったのですが今回はこのような形で書きました
                    }

                    continue Loop
                }
            }
        }
    }()

    return hertbeat, weatherch, errch    //   hertbeat, weatherch, errchのchanを返しています
}

メインの処理はinsert関数の中ではhertbeat等のチャネルを作成しinsertの処理をgo func() を使用し、main関数の下のforにてheatbeatを受け取る等の処理と並列に実行しています。

またコメントにもありますがheatbeatが一定時間受け取れていない場合selectの最後にタイムアウトするような処理を書くことによって処理を中断させることができます。

並列処理内部にて処理の停止、もしくはデッドロックが起きた場合の処理の比較

ここでは自分が書いたコードとハートビートを使用したコードが並列処理内部にて処理が止まってしまった場合にとのような動きをするかを比較していきます。

ここではデットロックや内部の処理の停止の再現としてtime.Sleep()を使用し60秒程の時間、処理を停止させます。

自分の書いたコードの場合

func main(){

    ----------省略----------
    for _, name := range data {
        ch <- 1
        wg.Add(1)
        go func(name Hoge, client *gorm.DB) {
            defer wg.Done()
            time.Sleep(60 * time.Second)//  この部分を追加し、処理を60秒停止させています
            client.Create(&name)
            <-ch
        }(name, client)
    }
    
    ----------省略----------

今回は60秒停止ということにしてありますがもし完全に停止してしまった場合(デッドロック等)、処理は終わらずにタイムアウトすることもありません。

ハートビートを使用したコードの場合

func main(){    
    ----------省略----------
    case weatherch <- name:
    time.Sleep(60 * time.Second)//  この部分追加し、処理を60秒停止させています
    client := client.Create(&name)
    if client.Error != nil {
        errch <- client.Error  
    }
  ----------省略----------

ハートビートが5秒間外部に送信されない場合以下のようにタイムアウトさせることができます。

2020/03/23 07:57:20 Timeout
exit status 1

このように並列処理内部にて何かしらのトラブルがあり、処理が停止した場合ハートビートを使用することによって処理を中断することができます。

最後に

Goの並列処理は少ししか触れていなかったので今回勉強して並行処理について深く知れました。『Go言語による並行処理』ではGoに限らず並行処理自体の勉強にもなるので難しいですがとても勉強になる本でした。まだ理解が浅い部分もあるので繰り返し読み勉強していきたいと思います。またこの場をお借りして勉強の機会を下さった会社の方々にお礼を申し上げたいと思います。ありがとうございました。

JX通信社の取り組みとメンバーの優しさがデブサミの登壇を支えてくれたという話

先月に引き続きデブサミの話で失礼します.

JX通信社でシニア・エンジニアをしています, @shinyorke(しんよーく)と申します. 最近引っ越した新オフィスの近くにあるビャンビャン麺がマイブームです.

前回のエントリーでもご紹介させてもらいましたが, 先日開催されました「Developers Summit 2020」にて, 自分のキャリアを基にした未来の話をさせていただきました.

f:id:shinyorke:20200218111119j:plain
満員御礼でした(感謝)

満員御礼いや...立ち見の方も出たぐらいに大盛況で感謝しております.*1

来ていただいた皆さま誠にありがとうございました!

どういった発表をして, どんな準備をしていたのか?についてはスライドブログをご覧いただくとして.*2

こちらのエントリーでは,

デブサミ(と後日登壇のスポアナ)の発表は「JX通信社」の取り組みとサポート無しではできなかったんですよ!

という大事な話を,

  • JX通信社のメンバーが活用できる制度
  • エンジニアチーム独自の取り組み
  • 優しさあふれる周りのサポート

といった制度・文化の側面でご紹介いたします.

なお, 今回はTechな話題がほぼないエントリーとなります :bow:

TL;DR

  • イベント・勉強会参加は業務時間扱いなので気楽に参加できますよ!
  • さらに土日・祝日に登壇をした場合, しっかり代休もらえます
  • プロジェクト管理(Jira)を使わせてもらったり, 発表練習・Slackなどを通じて会社・メンバーから優しいフォローいただきました(感謝)

おしながき

勉強会・イベントに関するサポートについて

JX通信社では全社的な制度として,

勤務時間中の勉強会等への参加自由・参加費補助

知識・スキルの向上のため、勤務時間中に社外で催される勉強会等に参加したい場合は、自由に参加することができます。また、勉強会の参加費は全額を経費として処理できます(会社負担)。

※引用元: JX通信社HP「採用情報」

があります.

デブサミは平日の日中に行われるイベントではありますが, 私はこちらの制度のおかげで登壇した2日目はフル参加, 初日は午後半日参加できました.*3

複数人が応援に来てくれた(感謝)

この制度を活用して自分が登壇した当日は4名のメンバーが応援に駆けつけてくれました.

f:id:shinyorke:20200218100520j:plain
撮影とか色々と手伝ってもらいました.

こんな感じで沢山写真を撮ってもらったり,

  • 他の発表を聞いたり, ブースを回った感想を共有してもらったり
  • 様子をSlackに呟いて社内のメンバーに共有したり*4

などなど, 私一人ではとても手が回らない所を助けてもらったりしました.

土日・祝日に登壇すると代休を取れる

さらに最近,

土日・祝日に登壇を行うとイベント時間に応じて代休取得可能

という制度もできました(条件を満たした場合に限る)*5.

実はデブサミに登壇した2/14から中一日で「Sports Analyst Meetup(スポアナ)」というイベントでロングトークを2/16にさせていただいたのですが, こちらのトークが日曜日にあったこと, お休みの条件も満たしていたため翌日はしっかり休んで疲れを取ることができました.

準備段階でのサポート

また, デブサミの登壇前は以下のサポートをいただきました.

  • プロジェクト管理ツール「Jira」の利用許可
  • 社内勉強会やSlackチャンネルを通じたサポート

Jiraでデブサミのタスクを管理

JX通信社ではJiraをプロジェクト管理ツールとして使っています.

「デブサミの管理でも使いたいなあ」と相談した所, 二つ返事でOKもらったので準備段階からフル活用させてもらいました.

f:id:shinyorke:20200218103135p:plain
スプリント単位でタスクを回していました

Jiraにタスク管理を集中できたので本番まで大きなトラブルもなくできました.

また, こうしてメトリクスやレポートが残っていくのでまだJiraを活用しきれていないチームやメンバーに向けてのサンプルとして活用もできそうとも思いました.

利用させてもらったぶん, 知見をこっちから発信していくぞ!

月次勉強会・Slackを通じたメンバーのサポート

これは前回のエントリーでもちょっと紹介させてもらった話でもあります.

tech.jxpress.net

デブサミ本番で披露したデモアプリケーションは, 月次の社内勉強会で共有したものをほぼそのまま披露できました.

開発・企画段階でSlackに相談やネタを披露すると光の速さでレスが来たり, コードレビューするよ!と手を上げてくれたりとノウハウ的・心理的にも色々と助けてもらいました.

また, 月次の勉強会でのフィードバックも多くこれらがあったことにより本番も安心して望むことができました.

もっとアウトプットを出していくぞ

というわけでこのエントリーでは「JX通信社がアウトプットを出す際のサポートや制度」といった文脈を紹介させてもらいました.

これらの制度を活用し, もっとアウトプットを出すことによって成長に繋がるような流れが最近できつつあるので今後のJX通信社およびこのブログにもご期待ください.

最後までお付き合いいただきありがとうございました.

*1:正式な入場者数の数字は把握していませんが, 満席率100%超えだったのは確かです.

*2:なので発表内容そのものやそれに付随するエピソードについては割愛いたします. リンク先のスライドもしくはブログをご覧ください(手前味噌).

*3:初日が半日だったのは, 自分の作業の都合で自ら選んだもので, やろうと思えばフル参加できました

*4:その他, 一部のメンバーはツイッターでも呟いてもらって盛り上げに貢献してもらいました(圧倒的感謝)

*5:具体的には会社およびサービスの認知に繋がるような紹介が必要となります