Istio VirtualServiceのHost衝突を検知するAdmission Webhookをつくってみる

JX通信社Advent Calendar 2019」15日目の記事です。昨日はペイさんによるNuxt.js + firebaseで「積ん読防止」アプリを作ってみたでした。

こんにちは、SREのたっち(TatchNicolas)です。

はじめに

引き続き、KubernetesのAdmission Webhooksについて書きます。Admission Webhooksとは何か、簡単に作って動かしてみる方法については前回記事も参照してください。

tech.jxpress.net

今回はもう少し踏み込んで、ちょっとだけ役に立ちそうなAdmission Webhookを書いてみたいと思います。

TL; DR

  • Istio VirtualServiceのHostの衝突を防止するValidating Webhookを作った
  • 似たようなロジックでk8s本体のingressなどにも使えるはず

前回までのあらすじ

Kubernetesのリソース操作は、kube-apiserverにリクエストを送ることで行います。その操作がetcdに反映される前にそのリクエスト内容をValidate/MutateできるのがAdmission controllerで、Kubernetesにbuilt-inしなくても自前のValidate/Mutateの処理を足せるのがAdmission Webhooksです。

前回はWebhook作成に必要なもの(Webhookを動かすDeployment、Serviceリソース、証明書など)を作る手順をゼロから実施し、サンプルとして metadata.labels の規約が守られているかチェックしたり、 metadata.name に接頭辞を自動で付けたりする処理を実装しました。

今回やってみたこと

IstioのVirtualServiceを定義するときに、ドキュメントにもあるように、同じHostを複数のVirtualServiceに分割して書くやり方があります。

しかし、内向けのAPIなどの定義にパスではなくホスト名でマイクロサービスを定義している場合にはhostの衝突は避けたいと思います。

たとえば、 api.some-product というホスト名がすでに存在しているときに別のチームで *.some-product のようなホスト名が登録されてしまうと、複数のVirtualServiceリソース間での評価順序は保証されないため、リクエストが吸い取られて意図しない挙動をしてしまうかもしれません。

そこで、マニフェスト適用時に衝突を検出できるようなValidating Webhookを作ってみました。

サンプルコードは以下になります。

github.com

やってみる

Prerequisites

  • Istioが動いているクラスタ*1
  • 公式ドキュメント*2か前回の記事を参考に、証明書などWebhook開発のための準備ができていること
  • Webhookを動かしているPodにkube-apiserverを叩かせるための権限を与えていること*3

Webhookのリソース定義

webhooks.rules 以下にValidation対象のリソース種別を書いていきます。

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: "istio-vsvc-host"
webhooks:
- name: "istio-vsvc-host.hoge.fuga.local"
  failurePolicy: Fail
  rules:
  - apiGroups: ["networking.istio.io"]
    operations: ["CREATE","UPDATE"]
    apiVersions: ["v1alpha3"]
    resources: ["virtualservices"]
    scope: "Namespaced"
  clientConfig:
    caBundle: <server.crtの中身をbase64エンコードして貼る>
    service:
      namespace: default
      name: istio-vsvc-host
      path: /validate
  admissionReviewVersions: ["v1", "v1beta1"]
  timeoutSeconds: 5
  sideEffects: None

Webhookの実装

Webhookもいたってシンプルです。

import fnmatch

from flask import Flask, jsonify, request
from kubernetes import client, config


app = Flask(__name__)

config.load_incluster_config()
# telepresenceではこっちを使う
# config.load_kube_config()

conf=client.Configuration()
api_client=client.ApiClient(configuration=conf)

@app.route('/validate', methods=['POST'])
def validate():
    try:
        # リクエストから必要な情報を抜き出す
        req = request.get_json()
        new_hosts = req['request']['object']['spec']['hosts']
        apiserver_resp = api_client.call_api(
            '/apis/networking.istio.io/v1alpha3/namespaces/default/virtualservices',
            'GET',
            auth_settings=['BearerToken'],
            response_type='object',
        )
        nested_existing_hosts = {
            tuple(item['spec']['hosts']) for item in apiserver_resp[0]['items']
        }
        existing_hosts = {item for sublist in nested_existing_hosts for item in sublist}

        print(f'existing_hosts: {existing_hosts}')
        print(f'new_hosts: {new_hosts}')

        # UPDATEのときは、oldに入っているものは検査対象から除外する
        operation = req['request']['operation']
        if operation == 'UPDATE':
            old_hosts = req['request']['oldObject']['spec']['hosts']
            for host in old_hosts:
                existing_hosts.remove(host)
            print(f'updated existing_hosts: {existing_hosts}')

        # hostsの被りがないかチェックする
        pair = get_collision_pair(new_hosts, existing_hosts)

        if pair:
            allowed = False
            message = f'{pair[0]} collides with {pair[1]} which already exists'
        else:
            allowed = True
            message = f'No collision detected'


        # 結果を返す
        return jsonify({
            'apiVersion': 'admission.k8s.io/v1',
            'kind': 'AdmissionReview',
            'response': {
                'uid': request.get_json()['request']['uid'],
                'allowed': allowed,
                'status': {'message': message}
            }
        }), 200

    except (TypeError, KeyError):
        return jsonify({'message': 'Invalid request'}), 400

def get_collision_pair(new_hosts, existing_hosts):
    for new_host in new_hosts:
        for existing_host in existing_hosts:
            if fnmatch.fnmatch(new_host, existing_host) or fnmatch.fnmatch(existing_host, new_host):
                return new_host, existing_host

動かしてみる

以下のようなVirtualServiceが既に存在する状態で、新たにVirtualServiceのhostを増やしてみます。

$ kubectl get virtualservices
NAME              GATEWAYS   HOSTS                     AGE
existing-vsvc-1              [hoge.tatchnicolas.com]   15s
existing-vsvc-2              [fuga.tatchnicolas.com]   15s

まずはぶつからない場合。

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: new-vsvc
spec:
  hosts:
  - 'new.tatchnicolas.com'
  http:
  - name: "ClusterIP"
    route:
    - destination:
        host: new-serivce.default.svc.cluster.local

これはapplyしても普通に成功します。

$ kubectl apply -f istio-vsvc-host/new_vsvc.yaml
virtualservice.networking.istio.io/new-vsvc created

では、既存のhostと衝突させてみましょう。

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: new-vsvc
spec:
  hosts:
  - 'new.tatchnicolas.com'
  - 'fuga.tatchnicolas.com'  # これがぶつかる
  http:
  - name: "ClusterIP"
    route:
    - destination:
        host: new-serivce.default.svc.cluster.local

衝突を検知し、きちんと拒否していることがわかります。

$ kubectl apply -f istio-vsvc-host/new_vsvc.yaml
(中略)
for: "istio-vsvc-host/new_vsvc.yaml": admission webhook "istio-vsvc-host.hoge.fuga.local" denied the request: fuga.tatchnicolas.com collides with fuga.tatchnicolas.com which already exists

ワイルドカードにも対応できます。

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: new-vsvc
spec:
  hosts:
  - '*.tatchnicolas.com'
  http:
  - name: "ClusterIP"
    route:
    - destination:
        host: new-serivce.default.svc.cluster.local

こちらも拒否できていますね。

$ kubectl apply -f istio-vsvc-host/new_vsvc.yaml
(中略)
for: "istio-vsvc-host/new_vsvc.yaml": admission webhook "istio-vsvc-host.hoge.fuga.local" denied the request: *.tatchnicolas.com collides with fuga.tatchnicolas.com which already exists

さいごに

リクエストを適切に処理できれば実装言語は何でも良いので、前回のコードをベースにそのままPythonで書きました。比較のロジックもかなり簡単に書いたのでパフォーマンス面で改善の余地がありそうですし、実用レベルに持っていくにはNamespace対応したりテストもしっかり書かないといけないでしょう。

それでも、KubernetesやIstioに手を加えずに、ちょっとしたチェック/書き換え機能を追加できるのは非常に便利です。

理想的にはもっと汎用的にValidation条件を渡せるようにして、カスタムリソースとして簡単に適用できるようにするとか応用の幅は広そうです。

プロトタイプとテストをサクッと作れる言語で書いて、あとからリファクタリングしたり別の言語に書き換えることもできるので、ユースケースに合わせて上手に使っていきたいです。

*1:Minikubeのデフォルトではリソースが足りないせいでIstioのコントロールプレーンが起動せず、また手元の環境(i7/16GB RAM)ではIstio推奨のリソースを動かすのは辛かったです

*2:https://kubernetes.io/docs/tasks/tls/managing-tls-in-a-cluster/

*3:https://github.com/TatchNicolas/sample-admission-webhook/blob/master/istio-vsvc-host/rbac.yaml

ライフログを可視化してみたら偏食のようすがわかった - 飯田橋ランチマップ

JX通信社Advent Calendar 2019」11日目の記事です.

昨日は, @shinyoke さんの「PySparkはじめました - 分散処理デビューする前にやったこと」でした。 こんにちは. 同じくJX通信社でデータ基盤エンジニアをしています, @maplerと申します。

はじめに

今回はちょっと美味しい話をします。 昼時間になったらよくある話

「今日昼飯どこにいきますか?」
「わからない。。」

JX通信社オフィスがある飯田橋周辺美味しい店たくさんありまして、どこでランチを食べればいいのかわからない。

ちょうど2年前、Moves App というライフログアプリを一年半ほど利用してたので、そのデータを利用して自分の飯田橋ランチマップを作ってみようと思います。

やったこと

  • GeoPandas と GeoPy で位置情報の解析
  • Mapbox + Plotly で位置情報の可視化
  • Scikit-Learn で位置データのクラスタリング
  • Google Place API で位置情報から地点を取得する

目次

Moves App とは

スマホのGPS位置情報を記録して、自分がどこに滞在してたか、その間の移動方法(徒歩、走る、自転車、車、電車など)をロギングするアプリです。(ちなみに、2018年7月末にサービス終了)

データを取得

Moves App がサービス終了時とともに、公式サイトから過去のトラックデータをダウンロードすることができます。行動データは JSON で格納されてます。 Moves のデータは四種類があります。

  • activites: 移動中の行動タイプ(電車、歩行、自転車など)、カロリー、距離、時間などの情報。(Line data)
  • places: 行動中の滞在地点、滞在時間などの情報。(Point data)
  • storyline: activites と places を含め、さらに移動ルートの情報もあります。
  • summary: 一日単位の行動、距離、カロリーなどのまとめ情報

今回はランチの店だけ絞りたいので、places 情報のみ扱うことにします。

places データ

まずどんなデータかを見ましょう:

// ./json/yearly/places/places_2017.json
[
  ...
  {
    "date": "20170327",
    "segments": [
      {
        "type": "place",
        "startTime": "20170326T201246+0900",
        "endTime": "20170327T095023+0900",
        "place": {
          "id": 123456,
          "name": "飯田橋駅",
          "type": "home",
          "location": {  // 飯田橋駅
            "lat": 35.702084,
            "lon": 139.745023
          }
        },
        "lastUpdate": "20170327T014641Z"
      }
  ...
    ],
    "lastUpdate": "20170328T020321Z"
  },
  {
    "date": "20170328",
    "segments": [
      ...
    ]
    ...
  }
    ...
]

日別に segments という項目に時間順でその日行った場所が格納されている。

segment の構造

  • type: activity データでは値は place or move 2種類ありますが、places データには place のみになります
  • startTime: その地点に着いた時間
  • endTime: その地点から離れた時間
  • place: 地点の情報
    • id: 地点 id
    • name: 地点名。アプリが自動判定してくれますが、手動で地点名修正ができます。自動判定と手動修正してない場合、不明で null になる
    • type: アプリに定義したカテゴリ?
      • home: 家
      • work: 仕事場
      • facebook: 地点名推測情報源?
      • facebookPlaceId: facebook の地点 Id
      • user: 手動で地点名を修正した場合
    • location: 経度緯度の Geo 情報
  • lastUpdate: segment の最後更新時間

太字でマークした項目は今回扱うデータになります

データをロード

Pandas でデータをロード

import pandas as pd
place_df = pd.read_json('./moves/moves_export/json/yearly/places/places_2017.json', orient="records")
place_df = pd.concat([place_df, pd.read_json('./moves/moves_export/json/yearly/places/places_2018.json', orient="records")])
place_df.head()
date segments lastUpdate
0 20170110 [{'type': 'place', 'startTime': '20170110T0104... 20170117T055308Z
1 20170111 [{'type': 'place', 'startTime': '20170110T2136... 20170112T022810Z
2 20170112 [{'type': 'place', 'startTime': '20170111T2035... 20170113T163025Z
3 20170113 [{'type': 'place', 'startTime': '20170112T2107... 20170114T000927Z
4 20170114 [{'type': 'place', 'startTime': '20170113T1939... 20170114T093316Z

直接 json ロードだと segments がうまく展開できない。

json_normalize を使ってリストのカラムを展開

json_normalize という便利な関数でカラムを展開 (flat) します。

import json
with open('./moves/moves_export/json/yearly/places/places_2017.json', 'r') as f:
  d_2017 = json.load(f)
  place_segment_df_2017 = json_normalize(d_2017, record_path='segments')
with open('./moves/moves_export/json/yearly/places/places_2018.json', 'r') as f:
  d_2018 = json.load(f)
  place_segment_df_2018 = json_normalize(d_2018, record_path='segments')
# concat two DataFrame
place_segment_df = pd.concat([place_segment_df_2017, place_segment_df_2018])

record_path を segments カラムを指定して展開した結果

place_segment_df.head()
type startTime endTime lastUpdate place.id place.name place.type place.location.lat place.location.lon place.facebookPlaceId
0 place 20170110T010439+0900 20170110T094051+0900 20170110T013658Z 396353386 home ******* ******* NaN
1 place 20170110T123747+0900 20170110T125939+0900 20170117T055308Z 396514412 イタリアン酒場P facebook 35.699011 139.748671 218228871551007
2 place 20170110T130209+0900 20170110T195846+0900 20170110T113633Z 396471967 JX通信社 work 35.700228 139.747905 NaN
3 place 20170110T202923+0900 20170110T203906+0900 20170110T130729Z 396353386 home ******* ******* NaN
4 place 20170110T204418+0900 20170110T212233+0900 20170110T160517Z 396651896 エニタイムズ user 35.704518 139.728673

※ 家の GEO情報を非表示させていただきます

データの全体像

>> place_segment_df.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 4542 entries, 0 to 1658
Data columns (total 10 columns):
type                     4542 non-null object
startTime                4542 non-null object
endTime                  4542 non-null object
lastUpdate               4542 non-null object
place.id                 4542 non-null int64
place.name               2255 non-null object
place.type               4542 non-null object
place.location.lat       4542 non-null float64
place.location.lon       4542 non-null float64
place.facebookPlaceId    477 non-null object
dtypes: float64(2), int64(1), object(7)
memory usage: 390.3+ KB

こうして 1.5 年分ぐらいの私が行った場所が一つの DataFrame にまとめます

データを絞る (前処理)

まずはこの 4542 行のデータの中から 平日 仕事場ランチ を絞りたい

前処理

不要なカラムを消す

place_segment_df = place_segment_df[['startTime', 'endTime', 'place.name', 'place.type', 'place.location.lat', 'place.location.lon']]

時間カラムを datetime object に変換

place_segment_df.startTime = pd.to_datetime(place_segment_df.startTime)
place_segment_df.endTime = pd.to_datetime(place_segment_df.endTime)

平日を絞る

weekday_place_df = place_segment_df[place_segment_df.apply(lambda row: row.startTime.weekday() < 5 and row.endTime.weekday() < 5, axis=1)]

ランチ時間を絞る

時間を絞る
# filter lunch time
from datetime import timedelta

LUNCH_START_FROM = 11  # earliest lunch start time
LUNCH_END_FROM = 15  # latest lunch end time
MAX_LUNCH_DURATION = 3 * 60  # max lunch time 3 hours
MIN_LUNCH_DURATION = 5  # max lunch time  5 minutes

def is_lunch_time(start_dt, end_dt):
    if end_dt - start_dt > timedelta(minutes=MAX_LUNCH_DURATION):
        # can not more than max lunch time
        return False
    elif end_dt - start_dt > timedelta(minutes=MAX_LUNCH_DURATION):
        # can not less than min lunch time
        return False
    else:
        # check startTime and endTime are all between lunch time
        return start_dt.hour > LUNCH_START_FROM and start_dt.hour < LUNCH_END_FROM and end_dt.hour > LUNCH_START_FROM and end_dt.hour < LUNCH_END_FROM

lunch_time_place_df = weekday_place_df[weekday_place_df.apply(lambda row: is_lunch_time(row.startTime, row.endTime), axis=1)]
時間を可視化
# group by hour
hour_group = pd.DataFrame({"hour": place_segment_df.startTime.apply(lambda x: x.hour)}).groupby('hour')['hour'].count()
lunch_hour_group = pd.DataFrame({"hour": lunch_time_place_df.startTime.apply(lambda x: x.hour)}).groupby('hour')['hour'].count()

# use plot.ly
from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = make_subplots(rows=1, cols=2, subplot_titles=('full time', 'lunch time'))
fig.add_trace(go.Bar(x=hour_group.index, y=hour_group),row=1, col=1)
fig.add_trace(go.Bar(x=lunch_hour_group.index, y=lunch_hour_group), row=1, col=2)
fig.show(showlegend=False)

f:id:maplerme:20191211035357p:plain

主に 13 時前後にお昼ごはんを食べていくことが多いようです。

仕事場を絞る

位置情報扱いやすいため geopygeopandas を使い

geopandas で DataFrame を GeoDataFrame に変換

place.location.latplace.location.lon を代入して GeoDataFrame に変換

import geopandas
gdf = geopandas.GeoDataFrame(lunch_time_place_df, geometry=geopandas.points_from_xy(lunch_time_place_df['place.location.lon'], lunch_time_place_df['place.location.lat']))

geometry というカラムが作られます

gdf.head()
startTime endTime place.name place.type place.location.lat place.location.lon geometry
1 2017-01-10 12:37:47+09:00 2017-01-10 12:59:39+09:00 イタリアン酒場P facebook 35.699011 139.748671 POINT (139.74867 35.69901)
10 2017-01-11 13:25:15+09:00 2017-01-11 13:45:35+09:00 つけ麺 つじ田 facebook 35.701441 139.746628 POINT (139.74663 35.70144)
17 2017-01-12 12:35:05+09:00 2017-01-12 13:11:11+09:00 かつ村 facebook 35.698636 139.753765 POINT (139.75376 35.69864)
24 2017-01-13 13:09:11+09:00 2017-01-13 13:26:29+09:00 ヤミツキカリー facebook 35.699180 139.744961 POINT (139.74496 35.69918)

↑ ちなみにランチ時間を絞りでいい感じにランチの地点を絞ってくれました

geopy で飯田橋周辺 3km を絞ります

geopy の distance モジュールに geodesic という測地学的な距離を計算関数があります。

from shapely.geometry import Point
from geopy.distance import geodesic

def _distance(point_a_y, point_a_x, point_b_y, point_b_x):
  return geodesic((point_a_y, point_a_x), (point_b_y, point_b_x)).meters

def distance_for_point(point_a, point_b):
  """
  geodesic distance of shapely.geometry.Point object
  """
  return geodesic((point_a.y, point_a.x), (point_b.y, point_b.x)).meters

# 3km from iidabashi station
DIAMETER = 3 * 1000
IIDABASHI_STATION_POINT = Point(139.745023, 35.702084)

def near_iidabashi(point):
  return distance_for_point(point, IIDABASHI_STATION_POINT) < DIAMETER

gdf['is_near_iidabashi'] = gdf.geometry.apply(near_iidabashi)

プライベート情報を絞る(小声 🤫)

家がオフィスに近いので仕事場絞っても家は範囲内に入ってます、たまにランチ帰宅することもあります、ここ place.type を利用して非表示させていただきます。

>> (gdf['place.name'] == 'home').value_counts()
False    501
True       4
Name: place.name, dtype: int64
>> gdf = gdf[gdf['place.type'] != 'home']

(4回帰宅してたようです🏠)

結果を可視化

今回もっともやりたかったことです!

可視化前に

便利上、滞在時間(分)の duration カラムを追加
# add duration (unit: minute) column
weekday_place_df['duration'] = weekday_place_df.apply(lambda row: (row.endTime - row.startTime).seconds//60, axis=1)
場所名を coding した name_id カラムを追加
gdf['name_id'] = gdf['place.name'].astype('category').cat.codes

Mapbox と Plotly を使って地図上可視化

Mapbox を利用するため、Mapbox の開発アカウントを登録した上、token を取得する必要があります。

詳細は Mapbox の公式ドキュメント を参照してください。

※ Mapbox は従量課金制で、Web 上の地図ロードは毎月 50,000 回まで無料です。詳細はこちら

店名(name_id)を色付けにして地図上に plot

import plotly.graph_objects as go

token = 'your mapbox token here'

fig = go.Figure(go.Scattermapbox(
    mode = "markers+text",
    lat=gdf.geometry.y, 
    lon=gdf.geometry.x, 
    text=gdf['place.name'].apply(lambda x: x if isinstance(x, str) else ''),
    textposition = "bottom center",
    hoverinfo='name+text',
    hovertext=gdf.apply(lambda row: f"{row['place.name']} ({row['duration']}分)", axis=1),
    marker=go.scattermapbox.Marker(
        color=gdf['name_id'], colorscale='RdBu',
        size=10, sizemode='area'),
))

fig.update_layout(
    hovermode='closest',
    mapbox=go.layout.Mapbox(
        accesstoken=token, bearing=0,
        center=go.layout.mapbox.Center(
            lat=IIDABASHI_STATION_POINT.y,
            lon=IIDABASHI_STATION_POINT.x
            ), pitch=0, zoom=15
        ),
      margin=dict(l=10, r=10, t=10, b=10)
)

fig.show()

f:id:maplerme:20191211165458p:plain

ランチマップがいい感じに出てきました。

(みずほ銀行も混在してたが、とりあえず無視)

ちなみに当時名前付けてなかった店がまだ多い

>> gdf['place.name'].isnull().value_counts()
False    279
True     146
Name: place.name, dtype: int64

無名地点は倍ぐらいあります。

無名地点の名前つけ

Geo 情報から Google Place API で名前付けを考えてますが、精度を上げるため、同じ場所多数の地点をまとめてから実行しようと思います。

Scikit-learn で Clustering

Scikit-learn に Clustering のアルゴリズムがたくさん用意してますが、今回はシンプルで距離で Clustering したいので、DBSCAN を利用します。

DBSCAN

DBSCAN (Density-based spatial clustering of applications with noise ) は、1996 年に Martin Ester, Hans-Peter Kriegel, Jörg Sander および Xiaowei Xu によって提案されたデータクラスタリングアルゴリズムである。[1]これは密度準拠クラスタリング(英語版)アルゴリズムである。ある空間に点集合が与えられたとき、互いに密接にきっちり詰まっている点をグループにまとめ(多くの隣接点を持つ点、en:Fixed-radius_near_neighbors)、低密度領域にある点(その最近接点が遠すぎる点)を外れ値とする。

from Wikipedia - DBSCAN

高密度なクラスタリングとていう点は今回の要件と合います。

dbscan

※ DBSCAN の例図(出典

距離関数を指定して cluster を作成

上で定義した geodesic 距離関数を DBSCAN の metric 関数 として利用

5メートル範囲内に cluster を作成

from sklearn.cluster import DBSCAN

def distance_for_matrix(point_a, point_b):
  """
  geodesic distance of matrix
  """
  return _distance(point_a[0], point_a[1], point_b[0], point_b[1])

coords = gdf.as_matrix(columns=['place.location.lat', 'place.location.lon'])  # transform lat, lon to array
MIN_DISTANCE = 5  # cluster min distance as 5 meters
db = DBSCAN(
    eps=MIN_DISTANCE,  # min distance
    min_samples=1,  # min cluster samples, cluster will be -1 if under min_samples
    metric=distance_for_matrix
).fit(coords)

GeoDataFrame に代入

>> gdf['cluster'] = db.labels_
>> gdf['cluster'].value_counts()
6     50
4     29
24    27
5     27
12    25
      ..
40     1
38     1
37     1
36     1
39     1
Name: cluster, Length: 80, dtype: int64

39 クラスタ(地点)をまとまりました!

Cluster の情報を group by でまとめる
place_group = gdf.groupby('cluster')
grouped_df = pd.DataFrame({
  'place_names': place_group['place.name'].apply(lambda x: list(set([x for x in x.tolist() if isinstance(x, str)]))),  # list of place name
  'lat_mean': place_group['place.location.lat'].mean(),  # mean of latitude
  'lon_mean': place_group['place.location.lon'].mean(),  # mean of longitude
  'duration': place_group['duration'].mean(),  # mean of duration
  'times': place_group['cluster'].count(),  # visit times
})
  • place_name: アプリで判定した地点名を set unique してリスト化
  • lat_mean: 緯度の平均値
  • lon_mean: 経度の平均値
  • duration: 滞在時間の平均値
  • times: 訪問回数
テーブル整形
# reshape dataframe
grouped_df.reset_index(inplace = True)

もう一回地図上で plot

group を色付けして

fig = go.Figure(go.Scattermapbox(
    mode = "markers+text",
    lat=gdf["place.location.lat"], 
    lon=gdf["place.location.lon"], 
    text=gdf['place.name'].apply(lambda x: x if isinstance(x, str) else ''),
    textposition = "bottom center",
    hoverinfo='name+text',
    hovertext=gdf.apply(lambda row: f"{row['place.name']} ({row['duration']}分, {row['cluster']})", axis=1),
    marker=go.scattermapbox.Marker(
        color=gdf['cluster'], colorscale='RdBu',  # colored by cluster
        size=10, sizemode='area')
    )
)

f:id:maplerme:20191211165653p:plain

各地点毎にいい感じに色付けしました。

Google Place API で名前つけ

クラスタリングできので、未知の地点は Google Place API を利用して、まとめて取ります。

Google MAP の Python SDK を使います。Mapbox と同じ、Google Cloud Platform の開発アカウントを登録した上、 API KEY を取得する必要があります。

詳細は Google Cloud の公式ドキュメント を参照してください。

※ Place API も同じ従量課金制(Pay-As-You-Go Pricingで、毎月 100,000 回 request まで無料です。詳細はこちら

import googlemaps

gmaps = googlemaps.Client(key='your google api key')

def get_nearby_restaurants(lat, lon, count=5):
  """
  return first `count` of name of place near lat/lon
  """
  ret = gmaps.places_nearby(
    (lat, lon),
    radius=30,
    language='ja', 
    type='restaurant',  # search only restaurant 
    rank_by='prominence'  # sorted by google rank
  )
  return [r['name'] for r in ret['results'] if r['types'][0] == 'restaurant'][:count]

places_nearby method を使って、30メートル範囲(GPS誤差を考える)内のレストランを検索して、結果の中の name をリスト化して返す。

上の未知の地点を試してみる

>> get_nearby_restaurants(35.700658, 139.741140)
['Enoteca Vita', 'アズーリ 神楽坂', '竹子', 'てんたけ', '吾']

Enoteca Vita に行ったことがないが、確かにアズーリ はめっちゃ行ってました。

GPS の誤差があるのでしょうがないことです。

Cluster の DataFrame に応用
grouped_df['predict_locations'] = grouped_df.apply(lambda row: get_nearby_restaurants(row.lat_mean, row.lon_mean), axis=1)
店名をまとめる

もともとアプリであった、place_names と Google Place API で予測した predict_locations をまとめて name という項目にまとめます。

grouped_df['name'] = grouped_df.apply(
  lambda row: '/'.join(row['place_names'][:2]) if row['place_names'] else '/'.join(row['predict_locations'][:2]), axis=1
)

おれおれの飯田橋ランチマップ

データを全部揃ってきたので、いよいよ自分のランチはどんなっているのかを掲示していきます。

まずひとつの DataFrame にまとめます。

my_lunch_map_df = grouped_df[['name', 'lat_mean', 'lon_mean', 'duration', 'times', 'predict_locations']]

可視化したランチマップ最終形態

訪問回数を点の半径 size にして、店名を付けるようにしました

fig = go.Figure(go.Scattermapbox(
    mode = "markers+text",
    lat=my_lunch_map_df["lat_mean"], 
    lon=my_lunch_map_df["lon_mean"], 
    text=my_lunch_map_df['name'],
    textposition = "bottom center",
    hoverinfo='name+text',
    hovertext=my_lunch_map_df.apply(lambda row: f"{row['name']} ({row['duration']}分)" if row['times'] > 1 else '', axis=1),
    marker=go.scattermapbox.Marker(
        color=my_lunch_map_df.index, 
        size=my_lunch_map_df['times'],
        sizeref=0.2,
        sizemode='area',
        colorscale='RdBu',
        ),
    )
)

f:id:maplerme:20191211165728p:plain

※ (こちらはスクリーンショットだけですが、実際 Notebook 上は Mapbox で拡大で、かぶるところの店名も表示されます)

1. 回数をソートしてわかること - 偏食?

# sort by times
my_lunch_map_df.sort_values('times', ascending=False).head(10)
name lat_mean lon_mean duration times predict_locations
0 刀削麺 火鍋 XI'AN 35.698675 139.745555 26.720000 50 [芊品香 別館, うなぎ川勢, X'IAN飯田橋, 居酒屋・秋刀魚, 鳴門鯛焼本舗 飯田橋駅前店]
1 ヤミツキカリー 35.699180 139.744961 17.793103 29 [魂心家 飯田橋, 東京餃子 あかり 飯田橋, 鳥貴族 飯田橋西口店, Yamitukiカリ...
2 JX通信社 35.700228 139.747905 30.555556 27 [大阪王将 飯田橋店, 蕎庵 卯のや, BAR de Cava]
3 たい料理 ロッディー 35.700607 139.745326 21.703704 27 [太郎坊, ロッディー]
4 Enoteca Vita/アズーリ 神楽坂 35.700658 139.741140 39.600000 25 [Enoteca Vita, アズーリ 神楽坂, 竹子, てんたけ, 吾]
5 イタリアン酒場P/エニタイムズ 35.699011 139.748671 27.208333 24 [イタリア料理 LUCE ルーチェ, イタリア酒場P, やきとり あそび邸 飯田橋, 美食処...
6 天こう餃子房 35.701035 139.746305 25.136364 22 [築地食堂源ちゃん 飯田橋店, テング酒場 飯田橋東口店, 土間土間 飯田橋東口店, 牛角 ...
7 鳥酎 飯田橋 35.700951 139.746536 26.000000 21 [テング酒場 飯田橋東口店, 土間土間 飯田橋東口店, ル・ジャングレ, 牛角 飯田橋東口店...
8 牧の家/旬味福でん 35.699635 139.746270 19.250000 20 [牧の家, 旬味福でん, 島]
9 アジアンダイニング puja 35.699137 139.748516 25.111111 18 [イタリア料理 LUCE ルーチェ, 時代寿司, イタリア酒場P, やきとり あそび邸 飯田...
  1. 刀削麺めっちゃ食べてる?!

    • predict_locations を見ればわかるが、この地点では「芊品香 別館」が混在しているので、両方を合わせて 50 回ぐらい行った感じです。どっちも本場中華で、毎週絶対一回行きます。

    • XI`AN の麻辣刀削麺芊品香の地獄辛麻婆豆腐どっちが強いのかな :thinking:

    • ちなみに会社の Slack に 芊品香 のスタンプもあります
  2. 2位であるタイ風カレーの ヤミツキカリー もある時期毎日行ってた記憶があります。

  3. JX通信社オフィスが3位!ある時期自炊して弁当を作ることがあったので、会社で食べることがわりと多かった(間違えってまとめてた可能性もあります)

  4. 8 位 predict_locations にあるという沖縄料理にあたると思います

    偏食諸説?

    総合でみると、中華 (刀削麺 + 餃子房) 72 回、タイ + インド 74 回というのが年中半分以上の昼飯はこの2種類になってますので、偏食している自分を見つかったかもしれない?

2. 滞在時間をソートしわかること - シャッフルランチ制度

滞在時間長いが、ランチではない場所も出てきたので、一回レストラン名が取れなかったものをフィルタリングします

# sort by duration
my_lunch_map_df[my_lunch_map_df['name'].apply(lambda x: bool(x))].sort_values('duration', ascending=False).head(10)
name lat_mean lon_mean duration times predict_locations
46 別亭 鳥茶屋/メゾン・ド・ラ・ブルゴーニュ 35.700753 139.74074 73.000000 1 [別亭 鳥茶屋, メゾン・ド・ラ・ブルゴーニュ, アズーリ 神楽坂, 個室割烹 神楽坂 梅助...
65 神楽坂イタリアン400 クワトロチェント/天空のイタリアン Casa Valeria(カーサ... 35.701537 139.74016 48.000000 1 [神楽坂イタリアン400 クワトロチェント, 天空のイタリアン Casa Valeria(カ...
44 九頭龍蕎麦 はなれ/リストランテ クロディーノ 神楽坂 35.701844 139.73892 48.000000 1 [九頭龍蕎麦 はなれ, リストランテ クロディーノ 神楽坂, 上海ピーマン, 神楽坂 新泉,...
53 浅野屋/Sガスト 神田神保町店 35.695391 139.75954 45.000000 1 [浅野屋, Sガスト 神田神保町店, 本格四川料理 刀削麺 川府 神保町店, 築地食堂源ちゃ...
21 神楽坂魚金/神楽坂 芝蘭 35.702051 139.74132 40.666667 3 [神楽坂魚金, 神楽坂 芝蘭, 天孝, 翔山亭 黒毛和牛贅沢重専門店 神楽坂本店]
4 Enoteca Vita/アズーリ 神楽坂 35.700658 139.74114 39.600000 25 [Enoteca Vita, アズーリ 神楽坂, 竹子, てんたけ, 吾, ますだや, 다케...
32 卸)神保町食肉センター 本店/神保町 kururi 35.697251 139.75759 39.500000 2 [卸)神保町食肉センター 本店, 神保町 kururi]
67 筋肉食堂 水道橋店/札幌らーめん 品川甚作本店 35.701428 139.75370 39.000000 1 [筋肉食堂 水道橋店, 札幌らーめん 品川甚作本店, 立ち呑み海鮮 魚升, 御麺屋 水道橋店...
11 芊品香/和彩 かくや 35.699369 139.75014 38.500000 14 [芊品香, 和彩 かくや]
45 BISTROマルニ/源来酒家 35.695853 139.75458 37.000000 1 [BISTROマルニ, 源来酒家, CHICKEN CREW チキンクルー 神保町, 十勝ハ...

上位に神楽坂の料亭が多く並んているようです。

ここは JX通信社のシャッフルランチという制度があります。1人あたり1,000円を会社から支給で、オフィス周辺の美味しい店でゆっくりランチを食べれます。

www.wantedly.com

神楽坂4年住まいの自分もこの機会しか神楽坂の高級レストランに行けなかった。

ごちそうさまでした!

終わりに

今回は位置情報データをいろいろ変換と可視化をやってみました。 2年ほど寝かしたデータですが、可視化してみると意外と面白かった。 本当に偏食ではないですが、頭の中に思ってたアイディア具体化できるのがいいなと感じました。

明日の「JX通信社Advent Calendar 2019」は, @andmohikoさんです.

参考文献

PySparkはじめました - 分散処理デビューする前にやったこと

JX通信社Advent Calendar 2019」10日目の記事です.

昨日は, @rychhrさんの「Pure WebSocketsをサポートしたAWS AppSyncでWebとiOS間のリアルタイムチャットを作ってみた(1)」でした.

改めまして, こんにちは. JX通信社でシニア・エンジニア&データ基盤エンジニアをしています, @shinyorke(しんよーく)と申します.

JX通信社では, データ駆動での意思決定および施策実施をより円滑に進めるため, データ基盤の構築・運用を進めながらトライアル的に様々なFrameworkやツールの検証を行っています.*1

このエントリーでは,

私がシュッとPySparkで分散処理をする...前に, 手元で試したときの感想とその知見

のお話を残していきたいと思います.

なお, 分散処理そのものの知見・ノウハウではなく, する前にPySparkに慣れておこう!っていう話です.

※分散処理に関する記述はSparkの説明で少しだけやります :bow:

TL;DR

  • PandasとSQLを使えればPySparkは使えそう&書いてて良い感じがする.
  • 環境構築と動くまでが鬼門なので, 自前ホスティングはやめた方が良い, ベスプラは「Cloud系サービス使う」こと(AWS Glue, GCP Cloud Dataprocなど).
  • 利用シーンを明確にした上で使ったほうが幸せ. Pandas他で済むパターンも有る.

この記事を読み終える頃にはきっとPySparkでシュッと何かを試したくなるハズです.

スタメン

やりたかったこと・やったこと

絵に書くとこういう感じです.

f:id:shinyorke:20191203204740p:plain
実践に向けてのキャッチアップでした

そもそも最初の構想としては,

  • プロダクトが出力するログを, 収集 -> クレンジング&クラスタリング -> BigQueryとかのDWHで使えるように出力するETLを作りたい
  • 収集からクラスタリングまでAWS Glue使えるのでは?
  • AWS Glue使うんだったらSpark理解しよう

という所からはじまりました(左側の絵).

AWS Glueのチュートリアルをやりつつも,

ワイ「そもそもGlue・Sparkわからん!」

となったので,

  • ひとまず手元の環境(Macbook Pro)にSparkクラスタをシュッと立てて
  • 手慣れたデータセットを使ってSparkになれてみよう
  • この構成ならSparkとPythonあればイケるのでは!?

と思いつき、エイヤッとやってみました(ってのが右側の絵です).

PySpark #とは

そもそもSparkって何?という方もいると思うので雑に説明すると,

  • 一言でいうと, 大量のデータを分散処理するためのFramework
  • 内部的にはクラスタリングされた「RDD(Resilient Distributed Dataset)」というデータセットで分散処理を実現
  • プログラマブルにデータを触ったりイジイジするためのDataFrameやSpark SQLといったインターフェースを使う*2
  • ETLな処理を実現するためのWorkflow, 機械学習を実装するためのライブラリ(MLlib)が実装されている*3

もので, 今どきの分散処理では割と有力な選択肢となっています.

Spark本体はScala(Java)で開発されており, これらの言語から使うこともできますが,

Pythonベースのラッパーである, PySparkを使って扱うのがベスト・プラクティスとなっています*4*5.

手元で試しに使ってみる

というわけで早速お試ししました.

実際は業務上のデータでお試ししたのですが, 流石にブログでお披露目できるようなものではない(察し)ので,

  • オープンデータで皆が手に入れることができて
  • ある程度きれいなフォーマットのデータで
  • すでにPythonや他の言語で分析・解析した実績がある

ようなデータセットとお題でサンプルをご用意いたしました.

そう, 個人的に一番得意な「野球データ」です.*6

shinyorke.hatenablog.com

上記のエントリー(私の個人ワークです)で作成した,

  • とある歴史上の選手の成長モデルを線形回帰でいい感じにやってみる*7
  • 線形回帰モデルは, 目的変数をOPS(On Base Plus Slugging), 説明変数を年齢になにかしたもの*8
  • 上記をPandas + scikit-learn + bokehで再現*9

を, 今度はPySparkで作り変えました(書き換えまでの所要時間:3時間34分*10).

PySpark版のコードと, Python版コードをぜひ比較しながらこの先お楽しみください.

環境を作る・動かす

検証環境自体は(Sparkの環境作りがどれほど大変かを試す意味も含めて)自分のMacbook Pro上でやりました.

brewでSparkをインストールした後,

 $ brew install apache-spark

pythonのvenvで仮想環境を設定(プロジェクト直下の.venvに作成), direnvで

source .venv/bin/activate
source .env

を設定後, プロジェクト配下の.envに

export PYSPARK_PYTHON=/Users/shinyorke/sample/.venv/bin/python
export PYSPARK_DRIVER_PYTHON=/Users/shinyorke/sample/.venv/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="'notebook' pyspark"

これで準備を行い,

 $ pysprk

コマンドで起動するようになりました.

なお, コードをcloneして自前でビルドしてやる方法もありますが,

  • ある程度JavaとMavenの経験と知識が必要*11
  • 何よりも時間がメッチャかかる*12

事もあり, 積極的にはオススメしません.*13

もしお試ししたい時は,

といった形で課金して人類の叡智に乗っかりましょう.

PySparkとPythonで比較してみた

というわけで実際に比較してみました.

同じようなことをやってるコード同士を抜粋して比較します.

データの読み込み

普通のPython版ではPandasのDataFrameを, PySpark版ではSparkのDataFrameを使いました.

なお, どちらも行列データを扱う・扱うためのメソッドがたくさんついてくる感じですが, PySparkは内部的に分散してデータ*14を保持していること, SQLライクに操れる(後ほど紹介)という違いがあります.

というわけで, CSVからDataFrameを取ってくる処理で比較してみましょう.

Pandas版(抜粋)

お馴染みすぎる処理なのであんまり説明がいらないかも笑.

import csv
import pandas as pd
pd.options.display.max_columns = 100

# CSVからDataFrame
df_batting = pd.read_csv("/Users/shinyorke/github/baseballdatabank/core/Batting.csv")

# nanはすべて0埋め
df_batting.fillna(0.0, inplace=True)

PySpark版(抜粋)

PySparkで同じことをやると

# データ
from pyspark.sql.types import *

# 後ほどDataFrameのデータ型を指定するため使う
STATS_COLUMNS = ['G', 'AB', 'R', 'H', '2B', '3B', 'HR', 'RBI', 'SB', 'CS', 'BB', 'SO', 'IBB', 'HBP', 'SH', 'SF', 'GIDP']

# 打撃成績Schemaのデータ型
schema_batting = StructType(
    [
        StructField('playerID', StringType(), False),
        StructField('yearID', IntegerType(), False),
        StructField('stint', IntegerType(), False),
        StructField('teamID', StringType(), False),
        StructField('lgID', StringType(), False),
        StructField('G', IntegerType(), True),
        StructField('AB', IntegerType(), True),
        StructField('R', IntegerType(), True),
        StructField('H', IntegerType(), True),
        StructField('2B', IntegerType(), True),
        StructField('3B', IntegerType(), True),
        StructField('HR', IntegerType(), True),
        StructField('RBI', IntegerType(), True),
        StructField('SB', IntegerType(), True),
        StructField('CS', IntegerType(), True),
        StructField('BB', IntegerType(), True),
        StructField('SO', IntegerType(), True),
        StructField('IBB', IntegerType(), True),
        StructField('HBP', IntegerType(), True),
        StructField('SH', IntegerType(), True),
        StructField('SF', IntegerType(), True),
        StructField('GIDP', IntegerType(), True),
    ]
)

# RDDをCSVから作る(引数は並列数っぽい)
rdd_batting = sc.textFile('../../../baseballdatabank/core/Batting.csv', 4)


# RDDをDataFrameに変換
batting = spark.read.option("header","true").format("csv").schema(schema_batting).csv(rdd_batting)
batting = batting.fillna(0, subset=STATS_COLUMNS)

Schema作るあたりからJavaな風味が漂ってきますね(小並感).

SparkはRDDがすべての基本なので, RDDを作った後にDataFrameを作ることになります.

Pandasの場合と異なり, 暗黙的に型を指定する動きはしないため,

  • RDD -> DataFrame変換時にフォーマット(CSV)を指定
  • RDDはスキーマを持たないデータなので, DataFrameのSchemaを別に指定

という順序を踏むことになります.

加工(前処理としてOPSを計算)

次に, 取得したデータからOPSを算出します.

OPSは「出塁率(OBP)に長打率(SLG)を足したもの」なので,

OBP = (H + BB + HBP) / (AB + BB + HBP + SF)*15

SLG = (HR * 4 + 3B * 3 + 2B * 2 + Single) / AB*16

OPS = OBP + SLG

という順でいい感じに計算します.

Pandas版(抜粋)

AB, Hなど複数のSeriesを使うので,

  • 計算用の関数を作る. 計算そのものは野球統計ライブラリ「sabr」を使う.*17
  • DataFrameに直接関数をapply

というアプローチでやりました.

# 打率, 出塁率, 長打率. 指標値計算にSABRを使う

import sabr

from sabr.stats import Stats

def obp(row: pd.Series) -> float:
    try:
        return Stats.obp(ab=row.AB, h=row.H, bb=row.BB, sf=row.SF, hbp=row.HBP)
    except ZeroDivisionError as e:
        return 0.0

def slg(row: pd.Series) -> float:
    try:
        return Stats.slg(ab=row.AB, tb=row.TB)
    except ZeroDivisionError as e:
        return 0.0

df_batting['OBP'] = df_batting.apply(obp, axis=1)
df_batting['SLG'] = df_batting.apply(slg, axis=1)
df_batting['OPS'] = df_batting['OBP'] + df_batting['SLG']

この辺はPandas使いの方はお馴染みの方法かな...*18

PySpark版

PySparkの場合はいくつかやり方があります.

  • Python(Pandas)式と同じく, DataFrame上から直接計算
  • Spark SQLで直接計算. 具体的にはselect文の中で直接四則演算する

ひとまずDataFrame式でやるとこうなります.

# udfでユーザー定義関数を定義し, カラムとしてはめていく
from pyspark.sql.functions import udf, col

# sabrは野球指標を計算するためのライブラリ
from sabr.stats import Stats

@udf(DoubleType())
def obp(ab, h, bb, sf, hbp):
    # 出塁率
    try:
        return Stats.obp(ab=ab, h=h, bb=bb, sf=sf, hbp=hbp)
    except ZeroDivisionError as e:
        return 0.0

@udf(DoubleType())
def slg(ab, h, _2b, _3b, hr):
    # 長打率
    try:
        _1b = h - (_2b + _3b + hr)
        tb = _1b * 1 + _2b * 2 + _3b * 3 + hr * 4
        return Stats.slg(ab=ab, tb=tb)
    except ZeroDivisionError as e:
        return 0.0

# DataFrameに打率, 出塁率, 長打率, OPS(出塁率 + 長打率)を追加
batting = batting.withColumn('OBP', obp('AB', 'H', 'BB', 'SF', 'HBP'))
batting = batting.withColumn('SLG', slg('AB', 'H', '2B', '3B', 'HR'))
batting = batting.withColumn('OPS', col('OBP') + col('SLG'))

やってる順序はPandas版と変わりませんが,

  • @udfデコレータで戻り値の型を指定(DoubleType()ってやつですね)
  • DataFrameのwithColumnメソッドでユーザー定義関数と使うカラムを指定

と, これも雰囲気にJava感出てきました.

なお, 参考までにSpark SQLでやる場合は,

# 出塁率と長打率, OPSを自前で計算
query = '''
    select 
    (b.H + b.BB + b.HBP) / (b.AB + b.BB + b.HBP + b.sf) as obp,
    (b.HR * 4 + b.3B * 3 + b.2B * 2 + (b.H - b.HR - b.3B - b.2B) * 1) / b.AB as slg,
    obp + slg as ops
    from batting as b join player as p on b.playerID = p.playerID 
    where p.nameFirst="{firstname}" and p.nameLast="{lastname}"
    order by b.yearID asc
'''
spark.sql(query.format(firstname='Mickey', lastname='Mantle'))

という感じに, SQLっぽくやることはできますが, プログラマブルっぽくないのであんまりオススメじゃないです.*19

学習・可視化

本当はSparkのMLlibを使ってやるつもりでした...が!

途中で挫折したため, 今回はどちらもscikit-learnでやりました :bow:

なお, SparkのDataFrameからPandasのDataFrameは一行で取得できます.

# 年齢, OPSおよび予測に必要なデータを返す関数
def batting_stats(firstname, lastname):
    query = '''
    select 
    b.yearID - p.birthYear - 1 as AGE,
    b.OPS as OPS,
    (b.yearID - p.birthYear - 1) -30 as age_30,
    pow((b.yearID - p.birthYear - 1) -30, 2) as age_30_2
    from batting as b join player as p on b.playerID = p.playerID 
    where p.nameFirst="{firstname}" and p.nameLast="{lastname}"
    order by b.yearID asc
    '''
    return spark.sql(query.format(firstname=firstname, lastname=lastname))

mantle = batting_stats("Mickey", "Mantle")

# scikit-learnで線形回帰するため、pandas Dataframeに変換
df_mantle = mantle.toPandas()

というわけで, こちらはPython版と変わらずですが, 次の機会にMLlibでやろうと思います.

なお, 可視化は結局Pandas DataFrameもしくはndarrayでやるっぽいので, Bokehなど好みのライブラリを使うと良さそうです.

【参考】Python版の可視化

というわけで, 完成したグラフはこちらです.

f:id:shinyorke:20191205225913p:plain
データ元はSparkです(他は変わらない)

結び

今回はデータを前処理するところまで, PySparkとPandasで比較を行いました.

それぞれに良いところ・しっくり行かないところがありますが, あえて言うなら,

  • 数GB以上のデータをそれなりの性能要件(例えばリアルタイムとストリームっぽくなにかやる)で処理する必要がある場合はSpark合ってるっぽい.
  • 通常の分析・解析業務, 特にアドホックに済むものはやっぱPandas強い.
  • とはいえ, PySparkはSparkの仕組みと分散処理のノウハウを把握してたらこれはこれで強い道具.

といったところかなと思います&野球はやっぱお試しのデータセットとして面白い*20.

次の機会がありましたら, タイムオーバーで試すことができなかったMLlibを続きでやりたいなと思います.

ここまでお読みいただきありがとうございました.

明日の「JX通信社Advent Calendar 2019」は, @maplerさんです.

【Appendix】参考資料

当エントリーの執筆および, Sparkのお試しでは以下の文献を参考にしました.

Pythonで大量データ処理!PySparkを用いたデータ処理と分析のきほん*21

入門PySpark

www.oreilly.co.jp

【Appendix】PySparkのサンプルコード(全体)

PySpark検証サンプル

*1:この件のみならず, スプリントの中で時間を区切って検証をしやすい環境ではありますし, これで技術選定・開発も進むので実にやりやすいです.

*2:ここでPandas風にDataFrameだったり, SQLで操れたりします. また, 元々がHadoopの後継的な立ち位置なのでRDDを使ってMap/Reduceという方法でも同じことができます.

*3:後で触れますが今回は間に合わずだったので触れていません. いつかやりたい.

*4:ただし, Sparkの機能をすべてPySparkから使えるわけではないです. 必要十分すぎる感はありますが.

*5:他にもRの実装もあるみたいです, 公式で.

*6:メジャーリーグだと複数のオープンデータセットがあり, Kaggleにも題材がある程度に揃っているかつ, データサイズが数百MB(GBいかない)ぐらいなのでこの手の検証には最適です, 本筋と関係ないですが笑.

*7:要するに時系列で分析しています, 内容知りたい方は引用したブログの方をご覧ください.

*8:OPSは簡単に言うと選手の得点能力を「量」として捉える数字で野球の世界では結構メジャーです, 成長モデルは雑に言うと「若いうちは伸びるけどある一定の年を取ったら衰えるはず」なので, 年齢が説明変数となります.

*9:ちなみに元のコードはRでした, RからPython乗り換えはここでは触れないので引用先をご覧ください.

*10:なんでや!は(ry ...はさておき, 実際のところ1日ちょいで置き換えできました.

*11:Javaで開発したことある人なら多分楽勝です, 自分もそれがあったのでイケましたが「Pythonしかやったことない」レベルだと苦痛だと思います.

*12:スペックによりますが, ビルドで2,3時間近くかかるはず.

*13:過去に個人の興味・趣味でやった際は, こちらのオライリー本の付録を元にやった&これで十分動きましたがまあ...ねえ.

*14:Sparkが分散処理Frameworkなので当然っちゃ当然かなと. 余談ですがPandasの分散処理はDaskほかいくつか存在します.

*15:H:安打数, BB:四球, HBP:死球, AB:打数, SF:犠牲フライ

*16:HR:本塁打, 3B:三塁打, 2B:二塁打, Single:単打, AB:打数. 本塁打から単打までを「塁打」として計算して打数で割ることにより算出します.

*17:ちなみに拙作です.

*18:今回ぐらいのデータ量ならapplyでやるのがベストですが, もっと大きいデータを扱う時はSeries間の変換コストがかなりかかるので, いい感じにmapで処理するなどした方が良いときもあります.

*19:とはいえ, AWS GlueやDatabricksみたいなサービスでアドホックにやるときはこれで良いかもです.

*20:真面目な話, 初めて触るもの・体験するものに備えて「馴れた」データセットを用意しておくのは色々小回り効くし何より対象物のキャッチアップに集中できるのでかなりオススメです.

*21:余談も余談ですが, 林田さんに勧められてSparkとPySparkを認知し, 使いはじめました(この発表された当時同僚でした)

管理画面向けのVue.jsのUIフレームワーク、iViewについて

「JX通信社Advent Calendar 2019」7 日目の記事です。昨日は、鈴木(泰)さんの「CodePipelineを用いたLambdaのデプロイについての所感」でした。

フロントエンドエンジニアの渡辺です。今回は社内の管理画面のUIの話です。

はじめに

管理画面、特に社内向けの画面を作る際は、プロダクト側とは少し事情が異なり、

  • それぞれのコンポーネントのデザインにあまりこだわらない
  • 機能面や、使い勝手の優先度が高い

などの事情があります。なので、UIとしては、「このフレームワークに乗っかっておけばコンポーネントが揃っている」という状態が理想です。

JX通信社の管理画面はVue.jsを使っているケースが多く、その際のUIフレームワークにはiViewを使っています。

特徴

デザインはこのようになっていて、このまま使用しても遜色のない画面を作ることができます。

f:id:jx_k_watanabe:20191207182947p:plain

良かった点

  • 上記の理由で、大抵の管理画面のニーズには応えることができます。特にフォームのinput周り、テーブル周りはスタイル、機能ともにカスタマイズしたい用件が多くあるので、その部分に応えられたのは大きかったです。
  • アップデートが早く、開発中にドキュメントを見るとコンポーネントが追加されている、ということもあります。

難しかった点

  • 国内での知名度がやや低め(英語、中国語の情報がほとんど)だったので、時には中国語のIssueを読みながら対応する、などの場合がありました(中国語は、翻訳機能を使って英語に直してから読むと、結構読めるようになります)

導入

Nuxt.jsから導入する方法について書いていきます。

$ npm install nuxt view-design 

パッケージ名がview-designになっている点が注意が必要です。

インストールが終わったら、iViewを読み込んでいきます。 nuxt.config.jsファイルを新規作成して、以下のコードを追記します。ドキュメントにはもう少し長いconfigが載っていますが、基本的にはこの記述さえあれば大丈夫です。

nuxt.config.js

modules.exports = {
  plugins: [
      {src: '~plugins/iview', ssr: true}
  ]
}

上記のコードはplugins/iview.jsにあるコードを読み込みますよ、という意味なので、それに沿うようにVueから読み込みができるように記述します。ついでに、日本語対応のためのモジュールを追記します。

plugins/iview.js

import Vue from 'vue';
import iView from 'view-design';
import locale from 'view-design/dist/locale/ja-JP'
import 'view-design/dist/styles/iview.css';

Vue.use(iView, { locale });

確認のために、pages/index.vueを作成して、画面からiViewのコンポーネントが表示できるかを確認します。

pages/index.vue

<template>
  <div style="margin: 100px;">
    <div>
      Button: <Button type="primary">Button</Button>
    </div>
    <div style="margin-top: 20px;">
      Datepicker: <DatePicker />
    </div>
  </div>
</template>

f:id:jx_k_watanabe:20191207182745p:plain

画像のように表示されていれば成功です。

さいごに

UIフレームワークをいくつか比較してみると、確実に揃っているべきコンポーネントがある、というのが分かったり、同じコンポーネントでもデザインが大きく違うものがあったりします。

フロントエンド開発でも最も依存が大きくなるライブラリの一つなので、選定の際には他のライブラリも見てみたり、用件が決まっていれば、「このコンポーネントを組み合わせて使えば実装できそう」などのイメージを立てておくのも大事です。

良質なフレームワークがたくさんあるので、是非、自分たちに合ったライブラリを探してみてください。

明日は、rychhrさんの、「Pure WebSocketsをサポートしたAWS AppSyncでWebとiOS間のリアルタイムチャットを作ってみた」です。

CodePipelineを用いたLambdaのデプロイについての所感

JX通信社Advent Calendar 2019」7 日目の記事です。 こんにちは。2019年9月からJX通信社のエンジニアとなった鈴木(泰)です。趣味は映画観賞です。

はじめに

JX通信社では AWS の Lambda Layer、Lambda 関数を使った Serverless なアプリケーションの開発に従事しています。

私が初めて Lambda 関数に触れたのは2019年の9月です。 3ヶ月のあいだ業務で扱ってきたこともあり、現在では Lambda 関数をサクサク作れるようになりました。 また、複数の Lambda 関数を連携させて1つのアプリケーションを組んでみたり、共通する処理を Layer として切り出したりと、少しずつ複雑なこともできるようになりました。

最近の問題は、増えてきた Lambda 関数の管理です。 特に、Lambda 関数のデプロイにかかる手間の大きさが問題(詳細は後述します)でした。

本記事では、AWS CodePipeline を使用してアプリケーションのデプロイを自動化したという話、CodePipelineを使用しての所感を、忙しい人向けに(あまり長くならないように)紹介したいと思います。

TL;DR

  • Lambda 関数のデプロイを手作業でやるのがダルい・・・
  • CodePipeline を使って、デプロイを自動化しました!
  • せっかくなので、CodePipeline 使ってみた感想をシェアさせてください。

CodePipelineとは何か?(ざっくりと)

本題に入る前に、少しだけCodePipelineに触れておきます。

CodePipelineとは、CDを構築するための仕組みです。 こちらの図にある通り、Sourceからプロダクション環境へのデプロイまでのパイプラインを構築できます。

より厳密には、CodePipelineはStageを繋げてパイプラインを構築するための仕組みです。 Stageとは1つ以上のActionを含む、Actionのまとまりです。 Actionが具体的な処理の最小単位です。 StageのなかのActionの実行順番は、直列にも並列にもできます(詳しくはこちら)。

CodePipeline を用いてデプロイを自動化しました

私が所属するチームでは、複数の Lambda 関数と 1 つの Layer を用いた社内アプリケーションを運用しています。 Layer は、Lambda 関数の中で共通して使われる処理を含みます。

デプロイ作業の手間が大きかった

これまでは、このアプリケーションを手作業でデプロイしていました。 次のようなデプロイ手順(開発者が変更を push した後)です。

  1. Git 上のソースコードを自分の MacBook Pro へ clone し、Lambda 関数デプロイ用の zip アーカイヴに固めます。
  2. 手順 1 にて作成したアーカイヴを AWS 上へアップロードします(この作業を AWS の UI から行なっていました)。
  3. Layer にアップデートがあった場合、各 Lambda 関数が使用している Layer のバージョンを更新します(この作業を AWS の UI から行なっていました)。

上の作業にかかる時間が大体10分ぐらいでしょうか。 確かに、1度だけであれば手間はかからないです。

しかしながら、 この社内アプリケーションは未だ発展途上の段階にあるため、修正が高頻度で発生します。 そのため、毎日デプロイ作業が発生し、作業頻度が増えれば増えるほど作業ミスも多くなり、とても大きな時間が削り取られていました笑。

構築したデプロイパイプライン

私たちが目指したのは

  • ソースコードを Git へプッシュするだけ。デプロイまでは自動で完了する。

という世界観でした。 この社内アプリケーションの要件と構造自体はシンプルなものであったため、上の世界観を目標とすることは十分に現実的でした。

次のようなデプロイのパイプラインができました。

図1 f:id:taisuzuk:20191205103812j:plain

パイプライン構築後のデプロイ手順(開発者が変更を push した後)

なんとこれだけです。

  1. 「Lambda 関数を Deploy して良いですか?」というリクエストメールを管理者が受け取り、「リクエストを承認」をクリックする(図 1 の Mannual Approval)。

パイプライン(図1)の解説

  • GitLab のマスターブランチ へ push すると、GitLab から CodeCommit へレポジトリがミラーリングされます。弊社では、ソースコードレポジトリとして GitLab エンタープライズを使用しているため、CodePipeline からソースコードを直接取得できません。そのため、CodeCommit へミラーリングしています。
  • 1 つ目の CodeBuild と CloudFormation は、Layer をビルド・デプロイするためのものです。CodeBuild は、Layer が格納されている zip と SAM パッケージを生成し、成果物として S3 へ保存します。CloudFormation は S3 上に保存されている成果物を Serverless 実行環境へ反映します。
  • 2 つ目の CodeBuild と CloudFormation は、Lambda 関数をビルド・デプロイするためのものです。CodeBuild は、Lambda 関数が格納されている zip と SAM パッケージを生成し、成果物として S3 へ保存します。CloudFormation は S3 上に保存されている成果物を Serverless 実行環境へ反映します。
  • Mannual Approval は、Lambda 関数のデプロイを開始するかどうか?を、ソースコード管理者へ確認するためのものです(なぜこの確認が必要となるのか?については長くなるので省きます笑)。確認には Slack を用いています。具体的には、次のようなメッセージが Slack へ通知されます。f:id:taisuzuk:20191205113451p:plain
  • Mannual Approval は、CodePipeline の Mannual Approval Action を用いて作りました。この機能は(ざっくりと説明)、承認・拒否のどちらかのボタンをクリックするまで、パイプラインを一時的に停止するためのものです。

所感

「イケてるところ」と「工夫を要する点(a.k.a 微妙な点)」についてまとめてみました。

イケてるところ

認証・認可周りの設定管理が楽

(当たり前ですが)デプロイするまでのパイプラインがAWS上で全て完結します。 サードパーティ製のサービスを選択しなくて良いことによる恩恵は、ユーザーの権限管理、サービスの学習や事前調査(AWSの実行環境との親和性を調べたり)をしなくて良いことです。

自由度が高い

(これも当たり前かもしれませんが)CodePipeline は幅広くカスタマイズ可能です。 サードパーティが提供する CI/CD サービスでできることであれば、大体のことはサポートされているように思います。 パイプラインの中に CodeBuild(CircleCIのようなビルドパイプライン)を挿れることにより、殆どのことは実現可能かと思います。

自由度への制約

1つ上で「自由度が高い」を称賛しましたが、CodePipeline は上手い具合に自由度を制限します。 より正確に言えば、ある特定の目的を達成するために、AWS は唯一の方法を提供してくれます。

論理的に考察した場合、CodePipeline を選択する決め手となりうるのは、この標準化にあるのではないか?と思います。

これがどういうことかを説明するために、少しだけパイプラインの仕組みについて述べます。

CodePipeline とは Stage を繋げたものであり、Stage は1つ以上の Action 繋げたものです。 Action が具体的な処理の最小単位となります。

f:id:taisuzuk:20191202162921j:plain

Action に設定できる処理は、AWS により予め決められています。 要するに、AWS は「XXの目的のときはYYを使うと良い」ということを提案してくれているのです。 例えば、Lambda 環境へのデプロイを実現する方法として、CodeBuild(S3 に SAM をアップロードし、この SAM を適用するコマンドを buildspec.yml に羅列し、ゴニョゴニョゴニョ・・・)を用いることで実現可能です。 しかし、CodePipeline では、Lambda 環境へのデプロイをするためには、CloudFormation を使用した方が良いということを明言します。

工夫を要する点(a.k.a 微妙な点)

ドキュメント

これは私だけかもしれないのですが、率直に言ってドキュメントが読み難い・・・というよりもどこから読んで理解したら良いのか分からない・・・というのがありました。

この理由は、CodePipeline に関わるサービスが幅広いからなのかな、と思っています。 私が参考にした CodePipeline 構築チュートリアルは、CodeCommit、CodeBuild、CodeDeploy、CloudFormation、の3つサービスを組み合わせてパイプラインを作っていました。 私は3つの全てについて無知だったため、最初何が何やら訳の分からない状態で、とりあえず動くものを作っていました笑。

私の場合、まずこのチュートリアルを見ながらとりあえず動くものを作り、その後でドキュメントを熟読しました。 時間がない場合、この方法でも問題ないと思います。 言うまでもなく、CodePipeline を習得するための良い方法は、CodePipeline で使われている各々のサービスを1つずつ理解していくことだと思います。

パイプラインの実行結果のSlack通知を自前で作らなければならない

実行結果を Slack へ通知したい場合、SNS を通して Lambda 関数から Slack へ通知しなければなりません。 つまり、イベントを受けて Slack へポストするための Lambda 関数を自前で作成しなければならないのです。

昨今のサードパーティ製のサービスであれば、Slack 連携は容易にできることが当たり前です。 今後、AWS に改善していただきたい点の1つだと思います。

私が知らないだけで簡単にできる方法あるのかな???

まとめ

「所感」では微妙な点も述べましたが、これらは「慣れ」によって十分に解消可能だと思っています。 実際のところ、私自身は CodePipeline に触れてから3週間以上経ち、当初微妙な点であると感じていたことが気にならなくなっています。 今ではサクサク CD を作れるようになりました。

これから AWS でアプリケーションを構築する方は、選択肢の1つとして検討してみては如何でしょうか? それではありがとうございました。

Appendix