Python Clickユニットテスト・レシピ集 - CLIではじめるテスト駆動開発(その1)

JX通信社Advent Calendar 2019」20日目の記事です。 こんにちは。2019年9月からJX通信社のエンジニアとなった鈴木(泰)です。好きな食べ物はオムライスです。

本日は、Python Clickユニットテスト・レシピ集 - CLIではじめるテスト駆動開発(その1)と題して、CLIのユニットテストのスニペットを書いてみたいと思います! "その1"とした理由は、アドカレに間に合わな(小声)・・・じゃなかった・・・この記事だけで全ての備忘録を列挙すると長くなりすぎてしまい、記事が読み難くなると判断したからです。

今後も引き続き少しづつ備忘録を紹介していければと思います。

はじめに

私はCLIをよく書きます。その理由は、バックエンドシステムの運用業務に携わっていることにあります。運用業務では様々な場面でCLIを作成します。私の場合、運用業務における手作業を自動化するため、バッチ処理を書くため、というのが主です。

にも関わらず、私はCLIのユニットテストを、これまであまり書いてきませんでした。これについてはとても反省しています。ごめんなさい!

書かなかった理由(a.k.a 言い訳)は多々あります。

  • 書き捨てスクリプトだから。
  • 外部コンポネントと密結合しているので、ユニットテストが書き難い。
  • テストを書く時間がない。忙しい。
  • テストを書くことは、オーバーワーク。
  • etc ...

ざっくりとひとまとめにすると、要は「書いている時間がない」のです。

書いている時間がない場合どうするか?私がよく使う手はコピー・ペーストです。つまり、CLIを書くときに頻出するパターンを想定した、コピーペースト用のスニペットを用意しておけば良いのです。今回は、CLIを書くときに頻出するパターン毎に、コピーペースト用のスニペットを書いてみることにしました。

私の知らないもっと良い方法を知っている方がいましたら、コメント等でご教示いただけると幸いです!

目次

対象とするCLI

対象とするのは、外部コンポネントとの結合がないCLIのユニットテストです。

外部コンポネントとの結合とは、ファイルシステムやネットワーク通信を介してシステム繋がりのことです。例えば、WebAPIから何らかのデータを取得し、結果をローカルファイルに保存するCLIは、外部コンポネントとの結合があるCLIです。なぜなら、WebAPIとローカルファイルシステムという、2つのコンポネントとつながっているからです。

click

clickというライブラリを用いてCLIを書きます。clickを用いる理由は

といったことがあります。

諸注意

  • 本記事で掲載するソースコードは、MacOS 10.15.2、Python3.7、こちらのclickおよびその依存パッケージのバージョンで動作確認をしております。
  • 私がよく使うスニペットを前提に書いています。私があまり使わないなぁ・・・と思ったら、そのスニペットは書いていません。(・・・が、こんなスニペットも使える!とか、これは必要だろ!とかあればコメントお願いします!)
  • 本記事で使ったソースコードはこちらにあります。

スニペット一覧

外部コンポネントとの結合がない場合

標準出力、標準エラー出力、終了ステータスコードの検証

まずは、必ず検証すべき3つの値(標準出力、標準エラー出力、終了ステータスコード)からです。

CLIのソースコード 全体

import click

@click.command()
def cli():
    click.echo('こんにちは')
    click.echo('世界!', err=True)  # 標準エラー出力
    exit(100)

テストコード 全体

from click.testing import CliRunner
from cli_stdout import cli

def test_cli_output():
    result = CliRunner().invoke(cli)
    assert result.output == 'こんにちは\n世界!\n'

def test_cli_output_separatly():
    # 標準出力と標準エラー出力を分離して出力をテストしたい場合、
    # `CliRunner(mix_stderr=False)`とする。
    result = CliRunner(mix_stderr=False).invoke(cli)
    assert result.stdout == 'こんにちは\n'
    assert result.stderr == '世界!\n'

def test_cli_exit_code():
    result = CliRunner().invoke(cli)
    assert result.exit_code == 100
  • clickを使用する場合click.echo関数を用いて出力することが慣習ですが、print関数で出力した場合でも、上のテストは動きます。

色付き出力の検証

clickではANSI Color codeを用いて、出力される文字列に色を付与できます。

f:id:taisuzuk:20191219114904p:plain

CLIのソースコード 全体

import click

@click.command()
def cli():
    click.echo('こんにちは')
    click.echo(click.style('JX', fg='green'), nl=False)
    click.echo(click.style('通信社!', fg='red'))

テストコード 全体

from click.testing import CliRunner
from cli_color_output import cli

def test_cli_output():
    result = CliRunner().invoke(cli)
    assert result.output == 'こんにちは\nJX通信社!\n'
  • ユニットテストで検証するときは、ANSI Color codeを無視できます。
  • ANSI Color codeを含めた出力の検証をしたい場合、CliRunner(color=True)とすればできます。

例外の検証

例外を投げるCLIの検証です。

CLIのソースコード 全体

import click

@click.command()
def cli():
    raise Exception('Hello world!')

テストコード 全体

from click.testing import CliRunner
from cli_exception import cli

def test_cli_exception():
    result = CliRunner().invoke(cli)
    assert 'Hello world!' == str(result.exception)
    assert Exception == type(result.exception)
  • 例外がない場合、result.exceptionNoneです。

コマンドライン引数やオプションの検証

clickでは、コマンドライン引数やオプション指定を間違えていた場合、デフォルトでは終了ステータス2(より厳密には、staticな変数click.exceptions.UsageError.exit_codeの値)で終了します。

私としては、CLIの引数の検証(例えば、このオプションが必須で・・・この引数は文字列で・・・といったような仕様の検証)は不要であり、終了ステータスだけ検証すれば良いと思います。なぜなら、CLIの引数周りの処理はclickライブラリが担う責務であり、利用者である私たちが検証すべきことではないからです。

CLIのソースコード 全体

import click

@click.command()
@click.argument('src', nargs=-1)
@click.argument('dst', nargs=1)
def cli(src, dst):
    click.echo(src)
    click.echo(dst)

テストコード 全体

from click.testing import CliRunner
from cli_option import cli

def test_cli_option_usage_exception():
    result = CliRunner().invoke(cli, args=[])
    assert 2 == result.exit_code

サブコマンドのテストの書き方

サブコマンドのテストは次のように書きます。

CLIのソースコード 全体

import click

@click.group()
def cli():
    pass

@cli.command()
def sub1():
    click.echo('sub command1')

@cli.command()
def sub2():
    click.echo('sub command2')

テストコード 全体

from click.testing import CliRunner
from cli_sub import

def test_cli_sub1():
    result = CliRunner().invoke(cli, args=['sub1'])
    assert 'sub command1\n' == result.output

def test_cli_sub2():
    result = CliRunner().invoke(cli, args=['sub2'])
    assert 'sub command2\n' == result.output

環境変数の検証

clickでは、CliRunnerのenvという引数に、テストで使用する環境変数を指定できます。

CLIのソースコード 全体

import click

@click.command()
@click.option('--name', type=str, envvar='NAME')
@click.option('--age', type=int)
def cli(name: str, age: int):
    click.echo('{} {}'.format(name, age))

テストコード 全体

from click.testing import CliRunner
from cli_env import 

def test_cli():
    result = CliRunner(env={'NAME': 'hoge'}).invoke(cli, args=['--age=1'])
    assert 'hoge 1\n' == result.output

標準入力

clickには、標準入力からデータを読むためのget_text_stream関数があるのですが、この関数を使った場合のユニットテストの書き方はわかりませんでした。。。じゃあこの記事に掲載するなよ!と思われるかもしれませんが、もしかしたら誰か良い方法を知っているかもしれないということで・・・記事に掲載することとしました。

CLIのソースコード 全体

import click

@click.command()
def cli():
    body: str = click.get_text_stream('stdin', encoding='utf-8').read()
    click.echo(body)

テストコード 全体

# どう書いたら良いかわかりませんでした・・・。

標準入力2

標準入力を読み込むために、clickのget_text_stream関数ではなくinput関数を使用した場合であれば、次のようにしてユニットテストを書くことができます。

CLIのソースコード 全体

import click

@click.command()
def cli():
    body: str = input()
    click.echo(body)

テストコード 全体

from click.testing import CliRunner
from unittest.mock import patch
from cli_stdin2 import cli

def test_cli():
    with patch('builtins.input', return_value='hoge'):
        result = CliRunner().invoke(cli, args=[])
        assert 'hoge\n' == result.output

所感

今回は、外部コンポネントとの結合がない場合のみなので、あまり迷うことはありませんでした。その理由はclick.testing.CliRunnerが用意されていることにあると思います。これのおかげで「こういう場合はこう書けば良い」という方針が明確になっています。

次回は、外部コンポネントとの結合がある場合のスニペットについて書く予定です。特に、ファイルシステムに読み書きする、HTTP通信する、MySQLに接続する、ロガーを通してアプリケーションログを残す、については頻出パターンなので、スニペットを用意したいと考えています。外部コンポネントとの結合部をどうモックするか?スニペットし易い簡潔なモックをどう書くか?というところが鍵となりそうだと考えています。

それでは次回もご期待ください! ありがとうございました。

DartでCLIツールを作ろう

この記事はJX通信社アドベントカレンダーの19日目です。

sakebookです。最近はServer Side Kotlinをやってますが、Flutterも少し触ってます。

去年はKotlin で CLI のネタを書いたので今年はそれのDart版を書こうと思います。

tech.jxpress.net

全体の流れ

  • Dartとプロジェクトのセットアップ
  • CLIでの動作確認
  • GitHub Actionsで配布

Dartのインストール

Flutterを使っていればbundleでインストールされていますが、standalone版が必要(後述)なのでHomebrewでいれます。

$ brew tap dart-lang/dart
$ brew install dart

筆者の環境は次の通りです

$ dart --version
Dart VM version: 2.7.0 (Fri Dec 6 16:26:51 2019 +0100) on "macos_x64"

CLIツールのテンプレート作成

Stagehandでプロジェクトテンプレートを作成します(関係ないですが画像が好み)。

まずはStagehandを有効化します。

$ pub global activate stagehand

Stagehandにはいくつかのテンプレートが用意されていますが、今回はCLIツールを作りたいので console-full を選択します。

$ mkdir dart-cli-sample
$ cd dart-cli-sample
$ stagehand console-full

作成したテンプレートは次のような構成になっています。pub packageに則した構成になっています。

.
├── CHANGELOG.md
├── README.md
├── analysis_options.yaml
├── bin
│   └── main.dart
├── lib
│   └── dart_cli_sample.dart
├── pubspec.yaml
└── test
    └── dart_cli_sample_test.dart

dartファイルを見ていきます。

  • bin/main.dart
import 'package:dart_cli_sample/dart_cli_sample.dart' as dart_cli_sample;

void main(List<String> arguments) {
  print('Hello world: ${dart_cli_sample.calculate()}!');
}
  • lib/dart_cli_sample.dart
int calculate() {
  return 6 * 7;
}

mainはbinにあり、実装はlibに置くような構成になっています。

テストも作成されます。

  • test/dart_cli_sample_test.dart
import 'package:dart_cli_sample/dart_cli_sample.dart';
import 'package:test/test.dart';

void main() {
  test('calculate', () {
    expect(calculate(), 42);
  });
}

動かしてみる

初回は依存ライブラリのDLが必要です。

$ pub get
$ dart bin/main.dart
Hello world: 42!

この状態だと、Dartコードを実行しただけです。

Dartコードを変換してネイティブコードにします。

ネイティブコードの作成

dart2native コマンドを実行して作成します。このコマンドはFlutterにbundleされているDartには含まれていません。

$ dart2native bin/main.dart -o main
Generated: /YOUR_PATH/dart-cli-sample/main

作成した main を実行してみます。

$ ./main 
Hello world: 42!

無事実行できました。

制約

どこでも実行可能なものができたと思いきや、現状はホストOS用のネイティブコードしかコンパイルされません。なので、macOS, Windows, Linuxとそれぞれでコンパイルしないとダメです。

現状の制約は辛いですが、その辛さを和らげる方法があります。

GitHub Actions

GitHub Actionsでは、マトリクスビルドをサポートしています。これを利用して、OSごとに実行してそれぞれに対応したネイティブコードを作成します。

コードの修正

各OSで動かしていることがわかるように、コードを変更します。

  • bin/main.dart
import 'dart:io';

import 'package:dart_cli_sample/dart_cli_sample.dart' as dart_cli_sample;

void main(List<String> arguments) {
  stdout.writeln('Hello ${dart_cli_sample.system()}!');
  exitCode = 0;
}
  • lib/dart_cli_sample.dart
import 'dart:io';

String system() {
  return Platform.operatingSystem;
}

動作環境のOS名を返すコードです。

Actionを定義

.github/workflows/ にyamlファイルを置きます。

先に完成したものを貼っておきます。

name: Cross compile
on: [push]

jobs:
  build:
    name: Compile
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      # https://help.github.com/ja/actions/automating-your-workflow-with-github-actions/virtual-environments-for-github-hosted-runners#supported-runners-and-hardware-resources
      matrix:
        os: [windows-latest, ubuntu-latest, macos-latest]
        include:
          - os: windows-latest
            file-name: windows.exe
          - os: ubuntu-latest
            file-name: ubuntu
          - os: macos-latest
            file-name: macos
    steps:
      - name: Checkout
        uses: actions/checkout@v1
        # https://dart.dev/get-dart
      - name: Install Dart(windows)
        if: matrix.os == 'windows-latest'
        run: |
          choco install dart-sdk
      - name: Install Dart(ubuntu)
        if: matrix.os == 'ubuntu-latest'
        run: |
          sudo apt-get update
          sudo apt-get install apt-transport-https
          sudo sh -c 'wget -qO- https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add -'
          sudo sh -c 'wget -qO- https://storage.googleapis.com/download.dartlang.org/linux/debian/dart_stable.list > /etc/apt/sources.list.d/dart_stable.list'
          sudo apt-get update
          sudo apt-get install dart
      - name: Install Dart(macos)
        if: matrix.os == 'macos-latest'
        run: |
          brew tap dart-lang/dart
          brew install dart
      - name: Build(windows)
        if: matrix.os == 'windows-latest'
        run: |
          $env:ChocolateyInstall = Convert-Path "$((Get-Command choco).Path)\..\.."
          Import-Module "$env:ChocolateyInstall\helpers\chocolateyProfile.psm1"
          refreshenv
          pub get
          dart2native bin/main.dart -o bin/${{ matrix.file-name }}
      - name: Build(ubuntu)
        if: matrix.os == 'ubuntu-latest'
        run: |
          echo 'export PATH="$PATH:/usr/lib/dart/bin"' >> ~/.profile
          source ~/.profile
          pub get
          dart2native bin/main.dart -o bin/${{ matrix.file-name }}
      - name: Build(macos)
        if: matrix.os == 'macos-latest'
        run: |
          pub get
          dart2native bin/main.dart -o bin/${{ matrix.file-name }}
      - name: Upload artifact
        uses: actions/upload-artifact@v1
        with:
          name: bin
          path: bin
  execute:
    name: Run artifact
    needs: build
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [windows-latest, ubuntu-latest, macos-latest]
        include:
          - os: windows-latest
            file-name: windows.exe
          - os: ubuntu-latest
            file-name: ubuntu
          - os: macos-latest
            file-name: macos
    steps:
      - name: Download artifact
        uses: actions/download-artifact@v1
        with:
          name: bin
      - name: Run(windows)
        if: matrix.os == 'windows-latest'
        run: |
          cd bin
          .\${{ matrix.file-name }}
      - name: Run(ubuntu)
        if: matrix.os == 'ubuntu-latest'
        run: |
          cd bin
          chmod 755 ${{ matrix.file-name }}
          ./${{ matrix.file-name }}
      - name: Run(macos)
        if: matrix.os == 'macos-latest'
        run: |
          cd bin
          chmod 755 ${{ matrix.file-name }}
          ./${{ matrix.file-name }}

Dartをインストール

ホストOSで実行させたいので、JavaScriptアクションを使います。MarketPlaceにDartのJavaScriptアクションが見当たらなかったので、愚直にDartをインストールしました。

コンパイル

インストールしたDartへのPATHを通してコンパイルします。同一フォルダに出力してartifactとしたかったので、ファイル名をincludeでそれぞれ定義しています。

実行

それぞれのOSで生成したネイティブコードをそれぞれのOSで実行してみた結果です。

  • windows

f:id:sakebook:20191218192514p:plain

  • ubuntu

f:id:sakebook:20191218192541p:plain

  • macos

f:id:sakebook:20191218192602p:plain

しっかりOS名が出力されています。

手元でそれぞれ動かしてみたい方はこちらからartifactをDLできます。

各OSでしか実行できないことがわかると思います。

まとめ

テンプレート作成だったり、今回は触れませんでしたが引数をパースするライブラリも提供されており、DartでCLIツールは作りやすいです。

まだ制約もありますが、今回の様なやり方で制約を緩和することができます。

今回動作確認したリポジトリはこちらです。

github.com

参考

Get the Dart SDK | Dart

GitHub - dart-lang/stagehand: Dart project generator - web apps, console apps, servers, and more.

Write command-line apps | Dart

dart2native | Dart

Workflow syntax for GitHub Actions - GitHub Help

installation - How to refresh the environment of a PowerShell session after a Chocolatey install without needing to open a new session - Stack Overflow

Istio VirtualServiceのHost衝突を検知するAdmission Webhookをつくってみる

JX通信社Advent Calendar 2019」15日目の記事です。昨日はペイさんによるNuxt.js + firebaseで「積ん読防止」アプリを作ってみたでした。

こんにちは、SREのたっち(TatchNicolas)です。

はじめに

引き続き、KubernetesのAdmission Webhooksについて書きます。Admission Webhooksとは何か、簡単に作って動かしてみる方法については前回記事も参照してください。

tech.jxpress.net

今回はもう少し踏み込んで、ちょっとだけ役に立ちそうなAdmission Webhookを書いてみたいと思います。

TL; DR

  • Istio VirtualServiceのHostの衝突を防止するValidating Webhookを作った
  • 似たようなロジックでk8s本体のingressなどにも使えるはず

前回までのあらすじ

Kubernetesのリソース操作は、kube-apiserverにリクエストを送ることで行います。その操作がetcdに反映される前にそのリクエスト内容をValidate/MutateできるのがAdmission controllerで、Kubernetesにbuilt-inしなくても自前のValidate/Mutateの処理を足せるのがAdmission Webhooksです。

前回はWebhook作成に必要なもの(Webhookを動かすDeployment、Serviceリソース、証明書など)を作る手順をゼロから実施し、サンプルとして metadata.labels の規約が守られているかチェックしたり、 metadata.name に接頭辞を自動で付けたりする処理を実装しました。

今回やってみたこと

IstioのVirtualServiceを定義するときに、ドキュメントにもあるように、同じHostを複数のVirtualServiceに分割して書くやり方があります。

しかし、内向けのAPIなどの定義にパスではなくホスト名でマイクロサービスを定義している場合にはhostの衝突は避けたいと思います。

たとえば、 api.some-product というホスト名がすでに存在しているときに別のチームで *.some-product のようなホスト名が登録されてしまうと、複数のVirtualServiceリソース間での評価順序は保証されないため、リクエストが吸い取られて意図しない挙動をしてしまうかもしれません。

そこで、マニフェスト適用時に衝突を検出できるようなValidating Webhookを作ってみました。

サンプルコードは以下になります。

github.com

やってみる

Prerequisites

  • Istioが動いているクラスタ*1
  • 公式ドキュメント*2か前回の記事を参考に、証明書などWebhook開発のための準備ができていること
  • Webhookを動かしているPodにkube-apiserverを叩かせるための権限を与えていること*3

Webhookのリソース定義

webhooks.rules 以下にValidation対象のリソース種別を書いていきます。

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: "istio-vsvc-host"
webhooks:
- name: "istio-vsvc-host.hoge.fuga.local"
  failurePolicy: Fail
  rules:
  - apiGroups: ["networking.istio.io"]
    operations: ["CREATE","UPDATE"]
    apiVersions: ["v1alpha3"]
    resources: ["virtualservices"]
    scope: "Namespaced"
  clientConfig:
    caBundle: <server.crtの中身をbase64エンコードして貼る>
    service:
      namespace: default
      name: istio-vsvc-host
      path: /validate
  admissionReviewVersions: ["v1", "v1beta1"]
  timeoutSeconds: 5
  sideEffects: None

Webhookの実装

Webhookもいたってシンプルです。

import fnmatch

from flask import Flask, jsonify, request
from kubernetes import client, config


app = Flask(__name__)

config.load_incluster_config()
# telepresenceではこっちを使う
# config.load_kube_config()

conf=client.Configuration()
api_client=client.ApiClient(configuration=conf)

@app.route('/validate', methods=['POST'])
def validate():
    try:
        # リクエストから必要な情報を抜き出す
        req = request.get_json()
        new_hosts = req['request']['object']['spec']['hosts']
        apiserver_resp = api_client.call_api(
            '/apis/networking.istio.io/v1alpha3/namespaces/default/virtualservices',
            'GET',
            auth_settings=['BearerToken'],
            response_type='object',
        )
        nested_existing_hosts = {
            tuple(item['spec']['hosts']) for item in apiserver_resp[0]['items']
        }
        existing_hosts = {item for sublist in nested_existing_hosts for item in sublist}

        print(f'existing_hosts: {existing_hosts}')
        print(f'new_hosts: {new_hosts}')

        # UPDATEのときは、oldに入っているものは検査対象から除外する
        operation = req['request']['operation']
        if operation == 'UPDATE':
            old_hosts = req['request']['oldObject']['spec']['hosts']
            for host in old_hosts:
                existing_hosts.remove(host)
            print(f'updated existing_hosts: {existing_hosts}')

        # hostsの被りがないかチェックする
        pair = get_collision_pair(new_hosts, existing_hosts)

        if pair:
            allowed = False
            message = f'{pair[0]} collides with {pair[1]} which already exists'
        else:
            allowed = True
            message = f'No collision detected'


        # 結果を返す
        return jsonify({
            'apiVersion': 'admission.k8s.io/v1',
            'kind': 'AdmissionReview',
            'response': {
                'uid': request.get_json()['request']['uid'],
                'allowed': allowed,
                'status': {'message': message}
            }
        }), 200

    except (TypeError, KeyError):
        return jsonify({'message': 'Invalid request'}), 400

def get_collision_pair(new_hosts, existing_hosts):
    for new_host in new_hosts:
        for existing_host in existing_hosts:
            if fnmatch.fnmatch(new_host, existing_host) or fnmatch.fnmatch(existing_host, new_host):
                return new_host, existing_host

動かしてみる

以下のようなVirtualServiceが既に存在する状態で、新たにVirtualServiceのhostを増やしてみます。

$ kubectl get virtualservices
NAME              GATEWAYS   HOSTS                     AGE
existing-vsvc-1              [hoge.tatchnicolas.com]   15s
existing-vsvc-2              [fuga.tatchnicolas.com]   15s

まずはぶつからない場合。

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: new-vsvc
spec:
  hosts:
  - 'new.tatchnicolas.com'
  http:
  - name: "ClusterIP"
    route:
    - destination:
        host: new-serivce.default.svc.cluster.local

これはapplyしても普通に成功します。

$ kubectl apply -f istio-vsvc-host/new_vsvc.yaml
virtualservice.networking.istio.io/new-vsvc created

では、既存のhostと衝突させてみましょう。

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: new-vsvc
spec:
  hosts:
  - 'new.tatchnicolas.com'
  - 'fuga.tatchnicolas.com'  # これがぶつかる
  http:
  - name: "ClusterIP"
    route:
    - destination:
        host: new-serivce.default.svc.cluster.local

衝突を検知し、きちんと拒否していることがわかります。

$ kubectl apply -f istio-vsvc-host/new_vsvc.yaml
(中略)
for: "istio-vsvc-host/new_vsvc.yaml": admission webhook "istio-vsvc-host.hoge.fuga.local" denied the request: fuga.tatchnicolas.com collides with fuga.tatchnicolas.com which already exists

ワイルドカードにも対応できます。

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: new-vsvc
spec:
  hosts:
  - '*.tatchnicolas.com'
  http:
  - name: "ClusterIP"
    route:
    - destination:
        host: new-serivce.default.svc.cluster.local

こちらも拒否できていますね。

$ kubectl apply -f istio-vsvc-host/new_vsvc.yaml
(中略)
for: "istio-vsvc-host/new_vsvc.yaml": admission webhook "istio-vsvc-host.hoge.fuga.local" denied the request: *.tatchnicolas.com collides with fuga.tatchnicolas.com which already exists

さいごに

リクエストを適切に処理できれば実装言語は何でも良いので、前回のコードをベースにそのままPythonで書きました。比較のロジックもかなり簡単に書いたのでパフォーマンス面で改善の余地がありそうですし、実用レベルに持っていくにはNamespace対応したりテストもしっかり書かないといけないでしょう。

それでも、KubernetesやIstioに手を加えずに、ちょっとしたチェック/書き換え機能を追加できるのは非常に便利です。

理想的にはもっと汎用的にValidation条件を渡せるようにして、カスタムリソースとして簡単に適用できるようにするとか応用の幅は広そうです。

プロトタイプとテストをサクッと作れる言語で書いて、あとからリファクタリングしたり別の言語に書き換えることもできるので、ユースケースに合わせて上手に使っていきたいです。

*1:Minikubeのデフォルトではリソースが足りないせいでIstioのコントロールプレーンが起動せず、また手元の環境(i7/16GB RAM)ではIstio推奨のリソースを動かすのは辛かったです

*2:https://kubernetes.io/docs/tasks/tls/managing-tls-in-a-cluster/

*3:https://github.com/TatchNicolas/sample-admission-webhook/blob/master/istio-vsvc-host/rbac.yaml

ライフログを可視化してみたら偏食のようすがわかった - 飯田橋ランチマップ

JX通信社Advent Calendar 2019」11日目の記事です.

昨日は, @shinyoke さんの「PySparkはじめました - 分散処理デビューする前にやったこと」でした。 こんにちは. 同じくJX通信社でデータ基盤エンジニアをしています, @maplerと申します。

はじめに

今回はちょっと美味しい話をします。 昼時間になったらよくある話

「今日昼飯どこにいきますか?」
「わからない。。」

JX通信社オフィスがある飯田橋周辺美味しい店たくさんありまして、どこでランチを食べればいいのかわからない。

ちょうど2年前、Moves App というライフログアプリを一年半ほど利用してたので、そのデータを利用して自分の飯田橋ランチマップを作ってみようと思います。

やったこと

  • GeoPandas と GeoPy で位置情報の解析
  • Mapbox + Plotly で位置情報の可視化
  • Scikit-Learn で位置データのクラスタリング
  • Google Place API で位置情報から地点を取得する

目次

Moves App とは

スマホのGPS位置情報を記録して、自分がどこに滞在してたか、その間の移動方法(徒歩、走る、自転車、車、電車など)をロギングするアプリです。(ちなみに、2018年7月末にサービス終了)

データを取得

Moves App がサービス終了時とともに、公式サイトから過去のトラックデータをダウンロードすることができます。行動データは JSON で格納されてます。 Moves のデータは四種類があります。

  • activites: 移動中の行動タイプ(電車、歩行、自転車など)、カロリー、距離、時間などの情報。(Line data)
  • places: 行動中の滞在地点、滞在時間などの情報。(Point data)
  • storyline: activites と places を含め、さらに移動ルートの情報もあります。
  • summary: 一日単位の行動、距離、カロリーなどのまとめ情報

今回はランチの店だけ絞りたいので、places 情報のみ扱うことにします。

places データ

まずどんなデータかを見ましょう:

// ./json/yearly/places/places_2017.json
[
  ...
  {
    "date": "20170327",
    "segments": [
      {
        "type": "place",
        "startTime": "20170326T201246+0900",
        "endTime": "20170327T095023+0900",
        "place": {
          "id": 123456,
          "name": "飯田橋駅",
          "type": "home",
          "location": {  // 飯田橋駅
            "lat": 35.702084,
            "lon": 139.745023
          }
        },
        "lastUpdate": "20170327T014641Z"
      }
  ...
    ],
    "lastUpdate": "20170328T020321Z"
  },
  {
    "date": "20170328",
    "segments": [
      ...
    ]
    ...
  }
    ...
]

日別に segments という項目に時間順でその日行った場所が格納されている。

segment の構造

  • type: activity データでは値は place or move 2種類ありますが、places データには place のみになります
  • startTime: その地点に着いた時間
  • endTime: その地点から離れた時間
  • place: 地点の情報
    • id: 地点 id
    • name: 地点名。アプリが自動判定してくれますが、手動で地点名修正ができます。自動判定と手動修正してない場合、不明で null になる
    • type: アプリに定義したカテゴリ?
      • home: 家
      • work: 仕事場
      • facebook: 地点名推測情報源?
      • facebookPlaceId: facebook の地点 Id
      • user: 手動で地点名を修正した場合
    • location: 経度緯度の Geo 情報
  • lastUpdate: segment の最後更新時間

太字でマークした項目は今回扱うデータになります

データをロード

Pandas でデータをロード

import pandas as pd
place_df = pd.read_json('./moves/moves_export/json/yearly/places/places_2017.json', orient="records")
place_df = pd.concat([place_df, pd.read_json('./moves/moves_export/json/yearly/places/places_2018.json', orient="records")])
place_df.head()
date segments lastUpdate
0 20170110 [{'type': 'place', 'startTime': '20170110T0104... 20170117T055308Z
1 20170111 [{'type': 'place', 'startTime': '20170110T2136... 20170112T022810Z
2 20170112 [{'type': 'place', 'startTime': '20170111T2035... 20170113T163025Z
3 20170113 [{'type': 'place', 'startTime': '20170112T2107... 20170114T000927Z
4 20170114 [{'type': 'place', 'startTime': '20170113T1939... 20170114T093316Z

直接 json ロードだと segments がうまく展開できない。

json_normalize を使ってリストのカラムを展開

json_normalize という便利な関数でカラムを展開 (flat) します。

import json
with open('./moves/moves_export/json/yearly/places/places_2017.json', 'r') as f:
  d_2017 = json.load(f)
  place_segment_df_2017 = json_normalize(d_2017, record_path='segments')
with open('./moves/moves_export/json/yearly/places/places_2018.json', 'r') as f:
  d_2018 = json.load(f)
  place_segment_df_2018 = json_normalize(d_2018, record_path='segments')
# concat two DataFrame
place_segment_df = pd.concat([place_segment_df_2017, place_segment_df_2018])

record_path を segments カラムを指定して展開した結果

place_segment_df.head()
type startTime endTime lastUpdate place.id place.name place.type place.location.lat place.location.lon place.facebookPlaceId
0 place 20170110T010439+0900 20170110T094051+0900 20170110T013658Z 396353386 home ******* ******* NaN
1 place 20170110T123747+0900 20170110T125939+0900 20170117T055308Z 396514412 イタリアン酒場P facebook 35.699011 139.748671 218228871551007
2 place 20170110T130209+0900 20170110T195846+0900 20170110T113633Z 396471967 JX通信社 work 35.700228 139.747905 NaN
3 place 20170110T202923+0900 20170110T203906+0900 20170110T130729Z 396353386 home ******* ******* NaN
4 place 20170110T204418+0900 20170110T212233+0900 20170110T160517Z 396651896 エニタイムズ user 35.704518 139.728673

※ 家の GEO情報を非表示させていただきます

データの全体像

>> place_segment_df.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 4542 entries, 0 to 1658
Data columns (total 10 columns):
type                     4542 non-null object
startTime                4542 non-null object
endTime                  4542 non-null object
lastUpdate               4542 non-null object
place.id                 4542 non-null int64
place.name               2255 non-null object
place.type               4542 non-null object
place.location.lat       4542 non-null float64
place.location.lon       4542 non-null float64
place.facebookPlaceId    477 non-null object
dtypes: float64(2), int64(1), object(7)
memory usage: 390.3+ KB

こうして 1.5 年分ぐらいの私が行った場所が一つの DataFrame にまとめます

データを絞る (前処理)

まずはこの 4542 行のデータの中から 平日 仕事場ランチ を絞りたい

前処理

不要なカラムを消す

place_segment_df = place_segment_df[['startTime', 'endTime', 'place.name', 'place.type', 'place.location.lat', 'place.location.lon']]

時間カラムを datetime object に変換

place_segment_df.startTime = pd.to_datetime(place_segment_df.startTime)
place_segment_df.endTime = pd.to_datetime(place_segment_df.endTime)

平日を絞る

weekday_place_df = place_segment_df[place_segment_df.apply(lambda row: row.startTime.weekday() < 5 and row.endTime.weekday() < 5, axis=1)]

ランチ時間を絞る

時間を絞る
# filter lunch time
from datetime import timedelta

LUNCH_START_FROM = 11  # earliest lunch start time
LUNCH_END_FROM = 15  # latest lunch end time
MAX_LUNCH_DURATION = 3 * 60  # max lunch time 3 hours
MIN_LUNCH_DURATION = 5  # max lunch time  5 minutes

def is_lunch_time(start_dt, end_dt):
    if end_dt - start_dt > timedelta(minutes=MAX_LUNCH_DURATION):
        # can not more than max lunch time
        return False
    elif end_dt - start_dt > timedelta(minutes=MAX_LUNCH_DURATION):
        # can not less than min lunch time
        return False
    else:
        # check startTime and endTime are all between lunch time
        return start_dt.hour > LUNCH_START_FROM and start_dt.hour < LUNCH_END_FROM and end_dt.hour > LUNCH_START_FROM and end_dt.hour < LUNCH_END_FROM

lunch_time_place_df = weekday_place_df[weekday_place_df.apply(lambda row: is_lunch_time(row.startTime, row.endTime), axis=1)]
時間を可視化
# group by hour
hour_group = pd.DataFrame({"hour": place_segment_df.startTime.apply(lambda x: x.hour)}).groupby('hour')['hour'].count()
lunch_hour_group = pd.DataFrame({"hour": lunch_time_place_df.startTime.apply(lambda x: x.hour)}).groupby('hour')['hour'].count()

# use plot.ly
from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = make_subplots(rows=1, cols=2, subplot_titles=('full time', 'lunch time'))
fig.add_trace(go.Bar(x=hour_group.index, y=hour_group),row=1, col=1)
fig.add_trace(go.Bar(x=lunch_hour_group.index, y=lunch_hour_group), row=1, col=2)
fig.show(showlegend=False)

f:id:maplerme:20191211035357p:plain

主に 13 時前後にお昼ごはんを食べていくことが多いようです。

仕事場を絞る

位置情報扱いやすいため geopygeopandas を使い

geopandas で DataFrame を GeoDataFrame に変換

place.location.latplace.location.lon を代入して GeoDataFrame に変換

import geopandas
gdf = geopandas.GeoDataFrame(lunch_time_place_df, geometry=geopandas.points_from_xy(lunch_time_place_df['place.location.lon'], lunch_time_place_df['place.location.lat']))

geometry というカラムが作られます

gdf.head()
startTime endTime place.name place.type place.location.lat place.location.lon geometry
1 2017-01-10 12:37:47+09:00 2017-01-10 12:59:39+09:00 イタリアン酒場P facebook 35.699011 139.748671 POINT (139.74867 35.69901)
10 2017-01-11 13:25:15+09:00 2017-01-11 13:45:35+09:00 つけ麺 つじ田 facebook 35.701441 139.746628 POINT (139.74663 35.70144)
17 2017-01-12 12:35:05+09:00 2017-01-12 13:11:11+09:00 かつ村 facebook 35.698636 139.753765 POINT (139.75376 35.69864)
24 2017-01-13 13:09:11+09:00 2017-01-13 13:26:29+09:00 ヤミツキカリー facebook 35.699180 139.744961 POINT (139.74496 35.69918)

↑ ちなみにランチ時間を絞りでいい感じにランチの地点を絞ってくれました

geopy で飯田橋周辺 3km を絞ります

geopy の distance モジュールに geodesic という測地学的な距離を計算関数があります。

from shapely.geometry import Point
from geopy.distance import geodesic

def _distance(point_a_y, point_a_x, point_b_y, point_b_x):
  return geodesic((point_a_y, point_a_x), (point_b_y, point_b_x)).meters

def distance_for_point(point_a, point_b):
  """
  geodesic distance of shapely.geometry.Point object
  """
  return geodesic((point_a.y, point_a.x), (point_b.y, point_b.x)).meters

# 3km from iidabashi station
DIAMETER = 3 * 1000
IIDABASHI_STATION_POINT = Point(139.745023, 35.702084)

def near_iidabashi(point):
  return distance_for_point(point, IIDABASHI_STATION_POINT) < DIAMETER

gdf['is_near_iidabashi'] = gdf.geometry.apply(near_iidabashi)

プライベート情報を絞る(小声 🤫)

家がオフィスに近いので仕事場絞っても家は範囲内に入ってます、たまにランチ帰宅することもあります、ここ place.type を利用して非表示させていただきます。

>> (gdf['place.name'] == 'home').value_counts()
False    501
True       4
Name: place.name, dtype: int64
>> gdf = gdf[gdf['place.type'] != 'home']

(4回帰宅してたようです🏠)

結果を可視化

今回もっともやりたかったことです!

可視化前に

便利上、滞在時間(分)の duration カラムを追加
# add duration (unit: minute) column
weekday_place_df['duration'] = weekday_place_df.apply(lambda row: (row.endTime - row.startTime).seconds//60, axis=1)
場所名を coding した name_id カラムを追加
gdf['name_id'] = gdf['place.name'].astype('category').cat.codes

Mapbox と Plotly を使って地図上可視化

Mapbox を利用するため、Mapbox の開発アカウントを登録した上、token を取得する必要があります。

詳細は Mapbox の公式ドキュメント を参照してください。

※ Mapbox は従量課金制で、Web 上の地図ロードは毎月 50,000 回まで無料です。詳細はこちら

店名(name_id)を色付けにして地図上に plot

import plotly.graph_objects as go

token = 'your mapbox token here'

fig = go.Figure(go.Scattermapbox(
    mode = "markers+text",
    lat=gdf.geometry.y, 
    lon=gdf.geometry.x, 
    text=gdf['place.name'].apply(lambda x: x if isinstance(x, str) else ''),
    textposition = "bottom center",
    hoverinfo='name+text',
    hovertext=gdf.apply(lambda row: f"{row['place.name']} ({row['duration']}分)", axis=1),
    marker=go.scattermapbox.Marker(
        color=gdf['name_id'], colorscale='RdBu',
        size=10, sizemode='area'),
))

fig.update_layout(
    hovermode='closest',
    mapbox=go.layout.Mapbox(
        accesstoken=token, bearing=0,
        center=go.layout.mapbox.Center(
            lat=IIDABASHI_STATION_POINT.y,
            lon=IIDABASHI_STATION_POINT.x
            ), pitch=0, zoom=15
        ),
      margin=dict(l=10, r=10, t=10, b=10)
)

fig.show()

f:id:maplerme:20191211165458p:plain

ランチマップがいい感じに出てきました。

(みずほ銀行も混在してたが、とりあえず無視)

ちなみに当時名前付けてなかった店がまだ多い

>> gdf['place.name'].isnull().value_counts()
False    279
True     146
Name: place.name, dtype: int64

無名地点は倍ぐらいあります。

無名地点の名前つけ

Geo 情報から Google Place API で名前付けを考えてますが、精度を上げるため、同じ場所多数の地点をまとめてから実行しようと思います。

Scikit-learn で Clustering

Scikit-learn に Clustering のアルゴリズムがたくさん用意してますが、今回はシンプルで距離で Clustering したいので、DBSCAN を利用します。

DBSCAN

DBSCAN (Density-based spatial clustering of applications with noise ) は、1996 年に Martin Ester, Hans-Peter Kriegel, Jörg Sander および Xiaowei Xu によって提案されたデータクラスタリングアルゴリズムである。[1]これは密度準拠クラスタリング(英語版)アルゴリズムである。ある空間に点集合が与えられたとき、互いに密接にきっちり詰まっている点をグループにまとめ(多くの隣接点を持つ点、en:Fixed-radius_near_neighbors)、低密度領域にある点(その最近接点が遠すぎる点)を外れ値とする。

from Wikipedia - DBSCAN

高密度なクラスタリングとていう点は今回の要件と合います。

dbscan

※ DBSCAN の例図(出典

距離関数を指定して cluster を作成

上で定義した geodesic 距離関数を DBSCAN の metric 関数 として利用

5メートル範囲内に cluster を作成

from sklearn.cluster import DBSCAN

def distance_for_matrix(point_a, point_b):
  """
  geodesic distance of matrix
  """
  return _distance(point_a[0], point_a[1], point_b[0], point_b[1])

coords = gdf.as_matrix(columns=['place.location.lat', 'place.location.lon'])  # transform lat, lon to array
MIN_DISTANCE = 5  # cluster min distance as 5 meters
db = DBSCAN(
    eps=MIN_DISTANCE,  # min distance
    min_samples=1,  # min cluster samples, cluster will be -1 if under min_samples
    metric=distance_for_matrix
).fit(coords)

GeoDataFrame に代入

>> gdf['cluster'] = db.labels_
>> gdf['cluster'].value_counts()
6     50
4     29
24    27
5     27
12    25
      ..
40     1
38     1
37     1
36     1
39     1
Name: cluster, Length: 80, dtype: int64

39 クラスタ(地点)をまとまりました!

Cluster の情報を group by でまとめる
place_group = gdf.groupby('cluster')
grouped_df = pd.DataFrame({
  'place_names': place_group['place.name'].apply(lambda x: list(set([x for x in x.tolist() if isinstance(x, str)]))),  # list of place name
  'lat_mean': place_group['place.location.lat'].mean(),  # mean of latitude
  'lon_mean': place_group['place.location.lon'].mean(),  # mean of longitude
  'duration': place_group['duration'].mean(),  # mean of duration
  'times': place_group['cluster'].count(),  # visit times
})
  • place_name: アプリで判定した地点名を set unique してリスト化
  • lat_mean: 緯度の平均値
  • lon_mean: 経度の平均値
  • duration: 滞在時間の平均値
  • times: 訪問回数
テーブル整形
# reshape dataframe
grouped_df.reset_index(inplace = True)

もう一回地図上で plot

group を色付けして

fig = go.Figure(go.Scattermapbox(
    mode = "markers+text",
    lat=gdf["place.location.lat"], 
    lon=gdf["place.location.lon"], 
    text=gdf['place.name'].apply(lambda x: x if isinstance(x, str) else ''),
    textposition = "bottom center",
    hoverinfo='name+text',
    hovertext=gdf.apply(lambda row: f"{row['place.name']} ({row['duration']}分, {row['cluster']})", axis=1),
    marker=go.scattermapbox.Marker(
        color=gdf['cluster'], colorscale='RdBu',  # colored by cluster
        size=10, sizemode='area')
    )
)

f:id:maplerme:20191211165653p:plain

各地点毎にいい感じに色付けしました。

Google Place API で名前つけ

クラスタリングできので、未知の地点は Google Place API を利用して、まとめて取ります。

Google MAP の Python SDK を使います。Mapbox と同じ、Google Cloud Platform の開発アカウントを登録した上、 API KEY を取得する必要があります。

詳細は Google Cloud の公式ドキュメント を参照してください。

※ Place API も同じ従量課金制(Pay-As-You-Go Pricingで、毎月 100,000 回 request まで無料です。詳細はこちら

import googlemaps

gmaps = googlemaps.Client(key='your google api key')

def get_nearby_restaurants(lat, lon, count=5):
  """
  return first `count` of name of place near lat/lon
  """
  ret = gmaps.places_nearby(
    (lat, lon),
    radius=30,
    language='ja', 
    type='restaurant',  # search only restaurant 
    rank_by='prominence'  # sorted by google rank
  )
  return [r['name'] for r in ret['results'] if r['types'][0] == 'restaurant'][:count]

places_nearby method を使って、30メートル範囲(GPS誤差を考える)内のレストランを検索して、結果の中の name をリスト化して返す。

上の未知の地点を試してみる

>> get_nearby_restaurants(35.700658, 139.741140)
['Enoteca Vita', 'アズーリ 神楽坂', '竹子', 'てんたけ', '吾']

Enoteca Vita に行ったことがないが、確かにアズーリ はめっちゃ行ってました。

GPS の誤差があるのでしょうがないことです。

Cluster の DataFrame に応用
grouped_df['predict_locations'] = grouped_df.apply(lambda row: get_nearby_restaurants(row.lat_mean, row.lon_mean), axis=1)
店名をまとめる

もともとアプリであった、place_names と Google Place API で予測した predict_locations をまとめて name という項目にまとめます。

grouped_df['name'] = grouped_df.apply(
  lambda row: '/'.join(row['place_names'][:2]) if row['place_names'] else '/'.join(row['predict_locations'][:2]), axis=1
)

おれおれの飯田橋ランチマップ

データを全部揃ってきたので、いよいよ自分のランチはどんなっているのかを掲示していきます。

まずひとつの DataFrame にまとめます。

my_lunch_map_df = grouped_df[['name', 'lat_mean', 'lon_mean', 'duration', 'times', 'predict_locations']]

可視化したランチマップ最終形態

訪問回数を点の半径 size にして、店名を付けるようにしました

fig = go.Figure(go.Scattermapbox(
    mode = "markers+text",
    lat=my_lunch_map_df["lat_mean"], 
    lon=my_lunch_map_df["lon_mean"], 
    text=my_lunch_map_df['name'],
    textposition = "bottom center",
    hoverinfo='name+text',
    hovertext=my_lunch_map_df.apply(lambda row: f"{row['name']} ({row['duration']}分)" if row['times'] > 1 else '', axis=1),
    marker=go.scattermapbox.Marker(
        color=my_lunch_map_df.index, 
        size=my_lunch_map_df['times'],
        sizeref=0.2,
        sizemode='area',
        colorscale='RdBu',
        ),
    )
)

f:id:maplerme:20191211165728p:plain

※ (こちらはスクリーンショットだけですが、実際 Notebook 上は Mapbox で拡大で、かぶるところの店名も表示されます)

1. 回数をソートしてわかること - 偏食?

# sort by times
my_lunch_map_df.sort_values('times', ascending=False).head(10)
name lat_mean lon_mean duration times predict_locations
0 刀削麺 火鍋 XI'AN 35.698675 139.745555 26.720000 50 [芊品香 別館, うなぎ川勢, X'IAN飯田橋, 居酒屋・秋刀魚, 鳴門鯛焼本舗 飯田橋駅前店]
1 ヤミツキカリー 35.699180 139.744961 17.793103 29 [魂心家 飯田橋, 東京餃子 あかり 飯田橋, 鳥貴族 飯田橋西口店, Yamitukiカリ...
2 JX通信社 35.700228 139.747905 30.555556 27 [大阪王将 飯田橋店, 蕎庵 卯のや, BAR de Cava]
3 たい料理 ロッディー 35.700607 139.745326 21.703704 27 [太郎坊, ロッディー]
4 Enoteca Vita/アズーリ 神楽坂 35.700658 139.741140 39.600000 25 [Enoteca Vita, アズーリ 神楽坂, 竹子, てんたけ, 吾]
5 イタリアン酒場P/エニタイムズ 35.699011 139.748671 27.208333 24 [イタリア料理 LUCE ルーチェ, イタリア酒場P, やきとり あそび邸 飯田橋, 美食処...
6 天こう餃子房 35.701035 139.746305 25.136364 22 [築地食堂源ちゃん 飯田橋店, テング酒場 飯田橋東口店, 土間土間 飯田橋東口店, 牛角 ...
7 鳥酎 飯田橋 35.700951 139.746536 26.000000 21 [テング酒場 飯田橋東口店, 土間土間 飯田橋東口店, ル・ジャングレ, 牛角 飯田橋東口店...
8 牧の家/旬味福でん 35.699635 139.746270 19.250000 20 [牧の家, 旬味福でん, 島]
9 アジアンダイニング puja 35.699137 139.748516 25.111111 18 [イタリア料理 LUCE ルーチェ, 時代寿司, イタリア酒場P, やきとり あそび邸 飯田...
  1. 刀削麺めっちゃ食べてる?!

    • predict_locations を見ればわかるが、この地点では「芊品香 別館」が混在しているので、両方を合わせて 50 回ぐらい行った感じです。どっちも本場中華で、毎週絶対一回行きます。

    • XI`AN の麻辣刀削麺芊品香の地獄辛麻婆豆腐どっちが強いのかな :thinking:

    • ちなみに会社の Slack に 芊品香 のスタンプもあります
  2. 2位であるタイ風カレーの ヤミツキカリー もある時期毎日行ってた記憶があります。

  3. JX通信社オフィスが3位!ある時期自炊して弁当を作ることがあったので、会社で食べることがわりと多かった(間違えってまとめてた可能性もあります)

  4. 8 位 predict_locations にあるという沖縄料理にあたると思います

    偏食諸説?

    総合でみると、中華 (刀削麺 + 餃子房) 72 回、タイ + インド 74 回というのが年中半分以上の昼飯はこの2種類になってますので、偏食している自分を見つかったかもしれない?

2. 滞在時間をソートしわかること - シャッフルランチ制度

滞在時間長いが、ランチではない場所も出てきたので、一回レストラン名が取れなかったものをフィルタリングします

# sort by duration
my_lunch_map_df[my_lunch_map_df['name'].apply(lambda x: bool(x))].sort_values('duration', ascending=False).head(10)
name lat_mean lon_mean duration times predict_locations
46 別亭 鳥茶屋/メゾン・ド・ラ・ブルゴーニュ 35.700753 139.74074 73.000000 1 [別亭 鳥茶屋, メゾン・ド・ラ・ブルゴーニュ, アズーリ 神楽坂, 個室割烹 神楽坂 梅助...
65 神楽坂イタリアン400 クワトロチェント/天空のイタリアン Casa Valeria(カーサ... 35.701537 139.74016 48.000000 1 [神楽坂イタリアン400 クワトロチェント, 天空のイタリアン Casa Valeria(カ...
44 九頭龍蕎麦 はなれ/リストランテ クロディーノ 神楽坂 35.701844 139.73892 48.000000 1 [九頭龍蕎麦 はなれ, リストランテ クロディーノ 神楽坂, 上海ピーマン, 神楽坂 新泉,...
53 浅野屋/Sガスト 神田神保町店 35.695391 139.75954 45.000000 1 [浅野屋, Sガスト 神田神保町店, 本格四川料理 刀削麺 川府 神保町店, 築地食堂源ちゃ...
21 神楽坂魚金/神楽坂 芝蘭 35.702051 139.74132 40.666667 3 [神楽坂魚金, 神楽坂 芝蘭, 天孝, 翔山亭 黒毛和牛贅沢重専門店 神楽坂本店]
4 Enoteca Vita/アズーリ 神楽坂 35.700658 139.74114 39.600000 25 [Enoteca Vita, アズーリ 神楽坂, 竹子, てんたけ, 吾, ますだや, 다케...
32 卸)神保町食肉センター 本店/神保町 kururi 35.697251 139.75759 39.500000 2 [卸)神保町食肉センター 本店, 神保町 kururi]
67 筋肉食堂 水道橋店/札幌らーめん 品川甚作本店 35.701428 139.75370 39.000000 1 [筋肉食堂 水道橋店, 札幌らーめん 品川甚作本店, 立ち呑み海鮮 魚升, 御麺屋 水道橋店...
11 芊品香/和彩 かくや 35.699369 139.75014 38.500000 14 [芊品香, 和彩 かくや]
45 BISTROマルニ/源来酒家 35.695853 139.75458 37.000000 1 [BISTROマルニ, 源来酒家, CHICKEN CREW チキンクルー 神保町, 十勝ハ...

上位に神楽坂の料亭が多く並んているようです。

ここは JX通信社のシャッフルランチという制度があります。1人あたり1,000円を会社から支給で、オフィス周辺の美味しい店でゆっくりランチを食べれます。

www.wantedly.com

神楽坂4年住まいの自分もこの機会しか神楽坂の高級レストランに行けなかった。

ごちそうさまでした!

終わりに

今回は位置情報データをいろいろ変換と可視化をやってみました。 2年ほど寝かしたデータですが、可視化してみると意外と面白かった。 本当に偏食ではないですが、頭の中に思ってたアイディア具体化できるのがいいなと感じました。

明日の「JX通信社Advent Calendar 2019」は, @andmohikoさんです.

参考文献

PySparkはじめました - 分散処理デビューする前にやったこと

JX通信社Advent Calendar 2019」10日目の記事です.

昨日は, @rychhrさんの「Pure WebSocketsをサポートしたAWS AppSyncでWebとiOS間のリアルタイムチャットを作ってみた(1)」でした.

改めまして, こんにちは. JX通信社でシニア・エンジニア&データ基盤エンジニアをしています, @shinyorke(しんよーく)と申します.

JX通信社では, データ駆動での意思決定および施策実施をより円滑に進めるため, データ基盤の構築・運用を進めながらトライアル的に様々なFrameworkやツールの検証を行っています.*1

このエントリーでは,

私がシュッとPySparkで分散処理をする...前に, 手元で試したときの感想とその知見

のお話を残していきたいと思います.

なお, 分散処理そのものの知見・ノウハウではなく, する前にPySparkに慣れておこう!っていう話です.

※分散処理に関する記述はSparkの説明で少しだけやります :bow:

TL;DR

  • PandasとSQLを使えればPySparkは使えそう&書いてて良い感じがする.
  • 環境構築と動くまでが鬼門なので, 自前ホスティングはやめた方が良い, ベスプラは「Cloud系サービス使う」こと(AWS Glue, GCP Cloud Dataprocなど).
  • 利用シーンを明確にした上で使ったほうが幸せ. Pandas他で済むパターンも有る.

この記事を読み終える頃にはきっとPySparkでシュッと何かを試したくなるハズです.

スタメン

やりたかったこと・やったこと

絵に書くとこういう感じです.

f:id:shinyorke:20191203204740p:plain
実践に向けてのキャッチアップでした

そもそも最初の構想としては,

  • プロダクトが出力するログを, 収集 -> クレンジング&クラスタリング -> BigQueryとかのDWHで使えるように出力するETLを作りたい
  • 収集からクラスタリングまでAWS Glue使えるのでは?
  • AWS Glue使うんだったらSpark理解しよう

という所からはじまりました(左側の絵).

AWS Glueのチュートリアルをやりつつも,

ワイ「そもそもGlue・Sparkわからん!」

となったので,

  • ひとまず手元の環境(Macbook Pro)にSparkクラスタをシュッと立てて
  • 手慣れたデータセットを使ってSparkになれてみよう
  • この構成ならSparkとPythonあればイケるのでは!?

と思いつき、エイヤッとやってみました(ってのが右側の絵です).

PySpark #とは

そもそもSparkって何?という方もいると思うので雑に説明すると,

  • 一言でいうと, 大量のデータを分散処理するためのFramework
  • 内部的にはクラスタリングされた「RDD(Resilient Distributed Dataset)」というデータセットで分散処理を実現
  • プログラマブルにデータを触ったりイジイジするためのDataFrameやSpark SQLといったインターフェースを使う*2
  • ETLな処理を実現するためのWorkflow, 機械学習を実装するためのライブラリ(MLlib)が実装されている*3

もので, 今どきの分散処理では割と有力な選択肢となっています.

Spark本体はScala(Java)で開発されており, これらの言語から使うこともできますが,

Pythonベースのラッパーである, PySparkを使って扱うのがベスト・プラクティスとなっています*4*5.

手元で試しに使ってみる

というわけで早速お試ししました.

実際は業務上のデータでお試ししたのですが, 流石にブログでお披露目できるようなものではない(察し)ので,

  • オープンデータで皆が手に入れることができて
  • ある程度きれいなフォーマットのデータで
  • すでにPythonや他の言語で分析・解析した実績がある

ようなデータセットとお題でサンプルをご用意いたしました.

そう, 個人的に一番得意な「野球データ」です.*6

shinyorke.hatenablog.com

上記のエントリー(私の個人ワークです)で作成した,

  • とある歴史上の選手の成長モデルを線形回帰でいい感じにやってみる*7
  • 線形回帰モデルは, 目的変数をOPS(On Base Plus Slugging), 説明変数を年齢になにかしたもの*8
  • 上記をPandas + scikit-learn + bokehで再現*9

を, 今度はPySparkで作り変えました(書き換えまでの所要時間:3時間34分*10).

PySpark版のコードと, Python版コードをぜひ比較しながらこの先お楽しみください.

環境を作る・動かす

検証環境自体は(Sparkの環境作りがどれほど大変かを試す意味も含めて)自分のMacbook Pro上でやりました.

brewでSparkをインストールした後,

 $ brew install apache-spark

pythonのvenvで仮想環境を設定(プロジェクト直下の.venvに作成), direnvで

source .venv/bin/activate
source .env

を設定後, プロジェクト配下の.envに

export PYSPARK_PYTHON=/Users/shinyorke/sample/.venv/bin/python
export PYSPARK_DRIVER_PYTHON=/Users/shinyorke/sample/.venv/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="'notebook' pyspark"

これで準備を行い,

 $ pysprk

コマンドで起動するようになりました.

なお, コードをcloneして自前でビルドしてやる方法もありますが,

  • ある程度JavaとMavenの経験と知識が必要*11
  • 何よりも時間がメッチャかかる*12

事もあり, 積極的にはオススメしません.*13

もしお試ししたい時は,

といった形で課金して人類の叡智に乗っかりましょう.

PySparkとPythonで比較してみた

というわけで実際に比較してみました.

同じようなことをやってるコード同士を抜粋して比較します.

データの読み込み

普通のPython版ではPandasのDataFrameを, PySpark版ではSparkのDataFrameを使いました.

なお, どちらも行列データを扱う・扱うためのメソッドがたくさんついてくる感じですが, PySparkは内部的に分散してデータ*14を保持していること, SQLライクに操れる(後ほど紹介)という違いがあります.

というわけで, CSVからDataFrameを取ってくる処理で比較してみましょう.

Pandas版(抜粋)

お馴染みすぎる処理なのであんまり説明がいらないかも笑.

import csv
import pandas as pd
pd.options.display.max_columns = 100

# CSVからDataFrame
df_batting = pd.read_csv("/Users/shinyorke/github/baseballdatabank/core/Batting.csv")

# nanはすべて0埋め
df_batting.fillna(0.0, inplace=True)

PySpark版(抜粋)

PySparkで同じことをやると

# データ
from pyspark.sql.types import *

# 後ほどDataFrameのデータ型を指定するため使う
STATS_COLUMNS = ['G', 'AB', 'R', 'H', '2B', '3B', 'HR', 'RBI', 'SB', 'CS', 'BB', 'SO', 'IBB', 'HBP', 'SH', 'SF', 'GIDP']

# 打撃成績Schemaのデータ型
schema_batting = StructType(
    [
        StructField('playerID', StringType(), False),
        StructField('yearID', IntegerType(), False),
        StructField('stint', IntegerType(), False),
        StructField('teamID', StringType(), False),
        StructField('lgID', StringType(), False),
        StructField('G', IntegerType(), True),
        StructField('AB', IntegerType(), True),
        StructField('R', IntegerType(), True),
        StructField('H', IntegerType(), True),
        StructField('2B', IntegerType(), True),
        StructField('3B', IntegerType(), True),
        StructField('HR', IntegerType(), True),
        StructField('RBI', IntegerType(), True),
        StructField('SB', IntegerType(), True),
        StructField('CS', IntegerType(), True),
        StructField('BB', IntegerType(), True),
        StructField('SO', IntegerType(), True),
        StructField('IBB', IntegerType(), True),
        StructField('HBP', IntegerType(), True),
        StructField('SH', IntegerType(), True),
        StructField('SF', IntegerType(), True),
        StructField('GIDP', IntegerType(), True),
    ]
)

# RDDをCSVから作る(引数は並列数っぽい)
rdd_batting = sc.textFile('../../../baseballdatabank/core/Batting.csv', 4)


# RDDをDataFrameに変換
batting = spark.read.option("header","true").format("csv").schema(schema_batting).csv(rdd_batting)
batting = batting.fillna(0, subset=STATS_COLUMNS)

Schema作るあたりからJavaな風味が漂ってきますね(小並感).

SparkはRDDがすべての基本なので, RDDを作った後にDataFrameを作ることになります.

Pandasの場合と異なり, 暗黙的に型を指定する動きはしないため,

  • RDD -> DataFrame変換時にフォーマット(CSV)を指定
  • RDDはスキーマを持たないデータなので, DataFrameのSchemaを別に指定

という順序を踏むことになります.

加工(前処理としてOPSを計算)

次に, 取得したデータからOPSを算出します.

OPSは「出塁率(OBP)に長打率(SLG)を足したもの」なので,

OBP = (H + BB + HBP) / (AB + BB + HBP + SF)*15

SLG = (HR * 4 + 3B * 3 + 2B * 2 + Single) / AB*16

OPS = OBP + SLG

という順でいい感じに計算します.

Pandas版(抜粋)

AB, Hなど複数のSeriesを使うので,

  • 計算用の関数を作る. 計算そのものは野球統計ライブラリ「sabr」を使う.*17
  • DataFrameに直接関数をapply

というアプローチでやりました.

# 打率, 出塁率, 長打率. 指標値計算にSABRを使う

import sabr

from sabr.stats import Stats

def obp(row: pd.Series) -> float:
    try:
        return Stats.obp(ab=row.AB, h=row.H, bb=row.BB, sf=row.SF, hbp=row.HBP)
    except ZeroDivisionError as e:
        return 0.0

def slg(row: pd.Series) -> float:
    try:
        return Stats.slg(ab=row.AB, tb=row.TB)
    except ZeroDivisionError as e:
        return 0.0

df_batting['OBP'] = df_batting.apply(obp, axis=1)
df_batting['SLG'] = df_batting.apply(slg, axis=1)
df_batting['OPS'] = df_batting['OBP'] + df_batting['SLG']

この辺はPandas使いの方はお馴染みの方法かな...*18

PySpark版

PySparkの場合はいくつかやり方があります.

  • Python(Pandas)式と同じく, DataFrame上から直接計算
  • Spark SQLで直接計算. 具体的にはselect文の中で直接四則演算する

ひとまずDataFrame式でやるとこうなります.

# udfでユーザー定義関数を定義し, カラムとしてはめていく
from pyspark.sql.functions import udf, col

# sabrは野球指標を計算するためのライブラリ
from sabr.stats import Stats

@udf(DoubleType())
def obp(ab, h, bb, sf, hbp):
    # 出塁率
    try:
        return Stats.obp(ab=ab, h=h, bb=bb, sf=sf, hbp=hbp)
    except ZeroDivisionError as e:
        return 0.0

@udf(DoubleType())
def slg(ab, h, _2b, _3b, hr):
    # 長打率
    try:
        _1b = h - (_2b + _3b + hr)
        tb = _1b * 1 + _2b * 2 + _3b * 3 + hr * 4
        return Stats.slg(ab=ab, tb=tb)
    except ZeroDivisionError as e:
        return 0.0

# DataFrameに打率, 出塁率, 長打率, OPS(出塁率 + 長打率)を追加
batting = batting.withColumn('OBP', obp('AB', 'H', 'BB', 'SF', 'HBP'))
batting = batting.withColumn('SLG', slg('AB', 'H', '2B', '3B', 'HR'))
batting = batting.withColumn('OPS', col('OBP') + col('SLG'))

やってる順序はPandas版と変わりませんが,

  • @udfデコレータで戻り値の型を指定(DoubleType()ってやつですね)
  • DataFrameのwithColumnメソッドでユーザー定義関数と使うカラムを指定

と, これも雰囲気にJava感出てきました.

なお, 参考までにSpark SQLでやる場合は,

# 出塁率と長打率, OPSを自前で計算
query = '''
    select 
    (b.H + b.BB + b.HBP) / (b.AB + b.BB + b.HBP + b.sf) as obp,
    (b.HR * 4 + b.3B * 3 + b.2B * 2 + (b.H - b.HR - b.3B - b.2B) * 1) / b.AB as slg,
    obp + slg as ops
    from batting as b join player as p on b.playerID = p.playerID 
    where p.nameFirst="{firstname}" and p.nameLast="{lastname}"
    order by b.yearID asc
'''
spark.sql(query.format(firstname='Mickey', lastname='Mantle'))

という感じに, SQLっぽくやることはできますが, プログラマブルっぽくないのであんまりオススメじゃないです.*19

学習・可視化

本当はSparkのMLlibを使ってやるつもりでした...が!

途中で挫折したため, 今回はどちらもscikit-learnでやりました :bow:

なお, SparkのDataFrameからPandasのDataFrameは一行で取得できます.

# 年齢, OPSおよび予測に必要なデータを返す関数
def batting_stats(firstname, lastname):
    query = '''
    select 
    b.yearID - p.birthYear - 1 as AGE,
    b.OPS as OPS,
    (b.yearID - p.birthYear - 1) -30 as age_30,
    pow((b.yearID - p.birthYear - 1) -30, 2) as age_30_2
    from batting as b join player as p on b.playerID = p.playerID 
    where p.nameFirst="{firstname}" and p.nameLast="{lastname}"
    order by b.yearID asc
    '''
    return spark.sql(query.format(firstname=firstname, lastname=lastname))

mantle = batting_stats("Mickey", "Mantle")

# scikit-learnで線形回帰するため、pandas Dataframeに変換
df_mantle = mantle.toPandas()

というわけで, こちらはPython版と変わらずですが, 次の機会にMLlibでやろうと思います.

なお, 可視化は結局Pandas DataFrameもしくはndarrayでやるっぽいので, Bokehなど好みのライブラリを使うと良さそうです.

【参考】Python版の可視化

というわけで, 完成したグラフはこちらです.

f:id:shinyorke:20191205225913p:plain
データ元はSparkです(他は変わらない)

結び

今回はデータを前処理するところまで, PySparkとPandasで比較を行いました.

それぞれに良いところ・しっくり行かないところがありますが, あえて言うなら,

  • 数GB以上のデータをそれなりの性能要件(例えばリアルタイムとストリームっぽくなにかやる)で処理する必要がある場合はSpark合ってるっぽい.
  • 通常の分析・解析業務, 特にアドホックに済むものはやっぱPandas強い.
  • とはいえ, PySparkはSparkの仕組みと分散処理のノウハウを把握してたらこれはこれで強い道具.

といったところかなと思います&野球はやっぱお試しのデータセットとして面白い*20.

次の機会がありましたら, タイムオーバーで試すことができなかったMLlibを続きでやりたいなと思います.

ここまでお読みいただきありがとうございました.

明日の「JX通信社Advent Calendar 2019」は, @maplerさんです.

【Appendix】参考資料

当エントリーの執筆および, Sparkのお試しでは以下の文献を参考にしました.

Pythonで大量データ処理!PySparkを用いたデータ処理と分析のきほん*21

入門PySpark

www.oreilly.co.jp

【Appendix】PySparkのサンプルコード(全体)

PySpark検証サンプル

*1:この件のみならず, スプリントの中で時間を区切って検証をしやすい環境ではありますし, これで技術選定・開発も進むので実にやりやすいです.

*2:ここでPandas風にDataFrameだったり, SQLで操れたりします. また, 元々がHadoopの後継的な立ち位置なのでRDDを使ってMap/Reduceという方法でも同じことができます.

*3:後で触れますが今回は間に合わずだったので触れていません. いつかやりたい.

*4:ただし, Sparkの機能をすべてPySparkから使えるわけではないです. 必要十分すぎる感はありますが.

*5:他にもRの実装もあるみたいです, 公式で.

*6:メジャーリーグだと複数のオープンデータセットがあり, Kaggleにも題材がある程度に揃っているかつ, データサイズが数百MB(GBいかない)ぐらいなのでこの手の検証には最適です, 本筋と関係ないですが笑.

*7:要するに時系列で分析しています, 内容知りたい方は引用したブログの方をご覧ください.

*8:OPSは簡単に言うと選手の得点能力を「量」として捉える数字で野球の世界では結構メジャーです, 成長モデルは雑に言うと「若いうちは伸びるけどある一定の年を取ったら衰えるはず」なので, 年齢が説明変数となります.

*9:ちなみに元のコードはRでした, RからPython乗り換えはここでは触れないので引用先をご覧ください.

*10:なんでや!は(ry ...はさておき, 実際のところ1日ちょいで置き換えできました.

*11:Javaで開発したことある人なら多分楽勝です, 自分もそれがあったのでイケましたが「Pythonしかやったことない」レベルだと苦痛だと思います.

*12:スペックによりますが, ビルドで2,3時間近くかかるはず.

*13:過去に個人の興味・趣味でやった際は, こちらのオライリー本の付録を元にやった&これで十分動きましたがまあ...ねえ.

*14:Sparkが分散処理Frameworkなので当然っちゃ当然かなと. 余談ですがPandasの分散処理はDaskほかいくつか存在します.

*15:H:安打数, BB:四球, HBP:死球, AB:打数, SF:犠牲フライ

*16:HR:本塁打, 3B:三塁打, 2B:二塁打, Single:単打, AB:打数. 本塁打から単打までを「塁打」として計算して打数で割ることにより算出します.

*17:ちなみに拙作です.

*18:今回ぐらいのデータ量ならapplyでやるのがベストですが, もっと大きいデータを扱う時はSeries間の変換コストがかなりかかるので, いい感じにmapで処理するなどした方が良いときもあります.

*19:とはいえ, AWS GlueやDatabricksみたいなサービスでアドホックにやるときはこれで良いかもです.

*20:真面目な話, 初めて触るもの・体験するものに備えて「馴れた」データセットを用意しておくのは色々小回り効くし何より対象物のキャッチアップに集中できるのでかなりオススメです.

*21:余談も余談ですが, 林田さんに勧められてSparkとPySparkを認知し, 使いはじめました(この発表された当時同僚でした)