Python multiprocessing vs threading vs asyncio

エンジニアの鈴木(泰)です。

今回は、multiprocessingとthreadingとasyncioの違いとはなんだろう?という問に挑戦してみたいと思います。

この問の答えをグーグル先生に聞いてみると、非常にたくさんの情報がヒットします。しかしながら、どの情報も断片的なものばかりで(本記事もそうなのかもしれません)、色々と本を読んだりネットを漁ったりして、情報を補完しなければなりませんでした。

本記事は、僕が調べた限りの情報を集約し、この問に対する結論を1つの記事にまとめたものとなっています。

前提

本題に入る前に、いつくかの前提について認識を合わせておきます。

マルチプロセスとは

プロセスとは実行中のプログラムです。例えば、Pythonのソースコードを実行すると、ソースコードをインタプリターがバイトコードにコンパイルします。OSはこのバイトコードを実行し、ソースコードに書かれている通りに処理を開始します。この実行中の処理をプロセスと呼びます。

1つのプロセスは、OSから空いているCPUコアが割り当てられることにより、処理を進めることができます。当然、CPUのコアが1つだけである場合、1つのプロセスの処理だけしか進めることができません。しかし、CPUのコアが複数ある場合、それぞれのコアを複数のプロセスに対して同時に割り当てることができるため、複数のプロセスの処理を同時に進めることができます。

マルチプロセスとは、複数のプロセスが同時に処理を進めることを指します。マルチプロセスのメリットは、1つのプログラムの目的を達成するために複数のCPUのコアを利用することで、より速く目的を達成できるという点にあります。

マルチプロセス機構はOS毎に実装が異なります。OS毎の挙動の違いに注意する必要はありますが、プログラミング言語毎の挙動の違いはあまりないです。とはいえ、各プログラミング言語において、プロセスの作成をOSに対して直接に命令することは少なく、各言語毎に用意されているラッパー関数やクラスを通して行います。従って、各言語毎に、これらのラッパーの仕様の違いを知っておく必要はあります。

マルチスレッドとは

スレッドとは、プロセスの中における処理の流れのことです。「処理の流れ」という表現では曖昧でわかりにくいため、具体例で説明します。

以下のPythonのソースコードを実行すると、プロセスが作られます。このプロセスの中では、Helloの出力から始まり、!の出力で終わる処理の流れがあります。この処理の流れがスレッドです。このスレッドをメインスレッドと呼びます。このソースコードでは、プロセスが開始されたから終わるまで、処理の流れはずっとメインスレッド1つだけです。

hello.py

print('Hello')
print('world')
print('!')

以下のPythonのソースコードはthreadingライブラリを利用したマルチスレッドを実行するものです。job.start()関数がスレッドを開始します。このソースコードではprint('Hello')print('world')print('!')、そしてメインスレッドの4つの処理の流れがあります。job.join()関数の実行後はスレッドが完了します。よって、print('done')が実行される時点においては、スレッドはメインスレッドの1つだけです。

hello_threading.py

import threading

jobs = []
jobs.append(threading.Thread(target=lambda : print('Hello')))
jobs.append(threading.Thread(target=lambda : print('world')))
jobs.append(threading.Thread(target=lambda : print('!')))

for job in jobs:
    job.start()
for job in jobs:
    job.join()

print('done')

1つのスレッドは、プログラミング言語毎に実装されている機構(LinuxではPthread、JavaのThreadsライブラリ、Pythonではasyncioやthreadingライブラリ等)を通してCPUコアが割り当てられることにより、処理を進めることができます。プロセスのように、OSから直接CPUコアが割り当てられるのではありません。プロセスの場合と同様に、CPUのコアが複数ある場合、それぞれのコアを複数のスレッドに対して同時に割り当てることができれば、複数のスレッドの処理を同時に進めることができます。

マルチスレッドとは、複数のスレッドが同時に処理を進めることを指します。

一般的には、マルチスレッドのメリットもマルチプロセスのメリットと同様です。が、Pythonにおいては、CPythonがGILであるということに注意する必要があります。

Pythonにおけるマルチスレッド

Pythonにおいて、マルチスレッドなソースコードを書く場合、CPythonがGILがあることを考慮しなければなりません。スクリプト言語のインタプリターは、GILであるものとそうでないものがあります。CPythonはGILであり、JythonやIronPythonはGILではありません。ちなみにCRubyはGILです。

GILであるインタプリターにおいては、マルチスレッドなソースコードを書いたとしても、インタプリターが出力したバイトコードをOS上で実行する段階においてマルチスレッドでは実行されません。たとえば、上で掲載したhello_threading.pyは、OS上で実行される段階においてマルチスレッドでは実行されません。

本題

Pythonにおいて、マルチプロセスやマルチスレッドなソースコードを書く場合、multiprocessing、threading、asyncioのどれを利用すべきなのでしょうか?

マルチプロセス(multiprocessingライブラリ)を利用したほうが良い場合

CPU負荷の高い処理(いわゆるCPU bound)を達成するためのソースコードである場合、multiprocessingを利用し、マルチプロセスに書きましょう(Jython等のGILではないインタプリターを使うのであれば、この限りではありません)。

CPU負荷の高い処理するためにマルチスレッドなソースコードを書いたとしても、パフォーマンスは改善されません。なぜなら、「Pythonにおけるマルチスレッド」で説明した通り、Pythonのソースコードはインタプリターによりコンパイルされた後、OS上でシングルスレッドで実行されるからです。すなわち、利用できるCPUコアは1つだけに限定されます。

実際にやってみると、パフォーマンスの差が顕著に表れます。

検証環境

  • 4 vCPUs, 16 GB memory
  • CentOS, 8, x86_64 built on 20210701
  • Python3.8

cpu_sec.py

CPU負荷の高い処理burden_cpu関数を1つのプロセス、1つのスレッドで処理するプログラムです。

def burden_cpu():
    for i in range(10000):
        for j in range(10000):
            pass

for i in range(4):
   burden_cpu()

実行結果

$ time python3.8 cpu_sec.py

real    0m9.518s
user    0m9.473s
sys 0m0.005s

CPU使用率。CPUのコアが4個あるうち、1つのコアだけを使用しているため、25%となります。

$ mpstat 1
...(省略)
16:08:49     CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
16:08:49     all   18.50    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   81.50
16:08:50     all   25.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.00
16:08:51     all   25.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.00
16:08:52     all   25.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.00
16:08:53     all   24.88    0.00    0.25    0.00    0.25    0.00    0.00    0.00    0.00   74.63
16:08:54     all   24.81    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.19
16:08:55     all   25.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.00
16:08:56     all   24.75    0.00    0.00    0.00    0.50    0.00    0.00    0.00    0.00   74.75
16:08:57     all   25.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.00
...(省略)

cpu_multiprocessing.py

CPU負荷の高い処理burden_cpu関数を4つのプロセス、各プロセス上では1つのスレッドで処理するプログラムです。

import multiprocessing as mp

def burden_cpu(_: any):
    for i in range(10000):
        for j in range(10000):
            pass

pool = mp.Pool(4)
pool.map(burden_cpu, [i for i in range(4)])
pool.close()

実行結果。CPUを効率良く使用できている(下記参照)ため、cpu_sec.pyの実行時間よりも小さくなります。

$ time python3.8 cpu_multiprocessing.py

real    0m5.351s
user    0m21.062s
sys 0m0.028s

CPU使用率。CPUのコアが4個あるうち、4つプロセスに対して1つずつコアが割り当てられ、同時に4つのコアを使用しているためほぼ100%となります。

$ mpstat 1
...(省略)
16:12:08     all   99.50    0.00    0.00    0.00    0.50    0.00    0.00    0.00    0.00    0.00
16:12:09     all   99.01    0.00    0.00    0.00    0.99    0.00    0.00    0.00    0.00    0.00
16:12:10     all   99.75    0.00    0.00    0.00    0.25    0.00    0.00    0.00    0.00    0.00
16:12:11     all   99.25    0.00    0.00    0.00    0.75    0.00    0.00    0.00    0.00    0.00
...(省略)

cpu_threading.py

CPU負荷の高い処理burden_cpu関数を1つのプロセス、4つのスレッドで処理するプログラムです。

from concurrent.futures import ThreadPoolExecutor

def burden_cpu():
    for i in range(10000):
        for j in range(10000):
            pass

pool = ThreadPoolExecutor(max_workers=4)

for i in range(4):
    pool.submit(burden_cpu)
pool.shutdown()

実行結果。ソースコード上では4つのスレッドが同時に処理を進めていますが、バイトコード上では1つのスレッドだけが処理を実行しているだけの状態(下記参照)であるために、cpu_sec.pyの実行時間とほぼ同じです。

$ time python3.8 cpu_threading.py

real    0m9.812s
user    0m9.820s
sys 0m0.090s

CPU使用率。CPU使用率が25%程度であることから、CPUのコアが4個あるうち1つだけしか利用できていないことがわかります。

$ mpstat 1
...(省略)
16:16:46     all   25.00    0.00    0.25    0.00    0.00    0.25    0.00    0.00    0.00   74.50
16:16:47     all   24.88    0.00    0.25    0.00    0.50    0.00    0.00    0.00    0.00   74.38
16:16:48     all   24.75    0.00    0.00    0.00    0.25    0.00    0.25    0.00    0.00   74.75
16:16:49     all   24.81    0.00    0.25    0.00    0.00    0.25    0.00    0.00    0.00   74.69
16:16:50     all   25.00    0.00    0.25    0.00    0.50    0.00    0.00    0.00    0.00   74.25
16:16:51     all   24.75    0.00    0.00    0.00    0.25    0.00    0.25    0.00    0.00   74.75
16:16:52     all   24.94    0.00    0.25    0.00    0.25    0.00    0.00    0.00    0.00   74.56
16:16:53     all   24.69    0.00    0.25    0.00    0.00    0.00    0.00    0.00    0.00   75.06
16:16:54     all   25.31    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   74.69
...(省略)

cpu_asyncio.py

asyncioを使用した場合です。実行結果は、cpu_threading.pyのものと同じです。

import asyncio

running = 0

async def burden_cpu_async():
    global running
    for i in range(10000):
        for j in range(10000):
            pass
    running-=1

async def main():
    await asyncio.gather(*[
        burden_cpu_async(),
        burden_cpu_async(),
        burden_cpu_async(),
        burden_cpu_async(),
    ])

asyncio.run(main())

実行結果。

$ time python3.8 cpu_asyncio.py 

real    0m9.433s
user    0m9.389s
sys 0m0.007s

CPU使用率。CPU使用率が25%程度であることから、CPUのコアが4個あるうち1つだけしか利用できていないことがわかります。

$ mpstat 1
...(省略)
01:10:20     all   24.94    0.00    0.00    0.00    0.25    0.00    0.00    0.00    0.00   74.81
01:10:21     all   24.81    0.00    0.00    0.00    0.25    0.00    0.00    0.00    0.00   74.94
01:10:22     all   25.00    0.00    0.00    0.00    0.25    0.00    0.00    0.00    0.00   74.75
01:10:23     all   24.81    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.19
01:10:24     all   24.88    0.00    0.00    0.00    0.50    0.00    0.00    0.00    0.00   74.63
01:10:25     all   24.81    0.00    0.25    0.00    0.00    0.00    0.00    0.00    0.00   74.94
01:10:26     all   24.94    0.00    0.00    0.00    0.00    0.00    0.25    0.00    0.00   74.81
01:10:27     all   24.75    0.00    0.00    0.00    0.25    0.00    0.00    0.00    0.00   75.00
...(省略)

ソースコード毎の実行結果まとめ表

ソースコード プロセス スレッド 実行時間(秒) CPU使用率(%)
cpu_sec.py 1 1 9.518 25.0
cpu_multiprocessing.py 4 1 5.351 99.75
cpu_threading.py 1 4 9.812 25.0
cpu_asyncio.py 1 1 9.433 25.0

threadingとasyncioを利用したほうが良い場合

I/O待ち時間が大きいもの(いわゆるI/O bound)を達成するためのソースコードである場合、threading(マルチスレッド)かasyncio(非同期I/O)を利用しましょう。

multiprocessing(マルチプロセス)を利用しない方が良い理由は、プロセスを作る際に発生するコストが大きいからです。プロセスを作るコストよりもスレッドを作るコストの方が小さいので、コストが小さい方を利用した方が良いということです。プロセスを新しく作ると、新しく作られたプロセスの数に比例してファイルディスクリプタ数、OSがCPUを切り替えるためのスイッチング回数が大きくなります。

同様にして、スレッドを作るコストという観点から言えば、スレッドを作るコストよりも非同期I/Oのイベントを発火するコストの方が小さいため、asyncioを利用する方が良いと言えそうです。

果たしてどうなのでしょうか?検証していきたいと思います。

threading vs asyncio

誤解を恐れずにいえば、threadingとasyncioは本質的にはどちらも、Pythonにおける「複数の処理を同時に進めるための仕組み」を提供するライブラリです。どちらにおいても、「Pythonにおけるマルチスレッド」にて述べた通り、インタプリターが出力したバイトコードはOS上で1つのスレッドでのみ実行されます。

両者の差異は次の点にあります。

  • threading
    • 昔からある。Python1.6(2000年)から標準ライブラリにあります。
    • 昔からある、マルチスレッドプログラミングというパラダイムに属する。
      • PythonのthreadingライブラリのAPIは、なんとなくですが、Javaのマルチスレッドに似ています。
    • 複数のスレッドを作り、それぞれの処理を同時に進めることができる。
      • 競合状態(Race Condition)に気をつけなければならない。
  • asyncio
    • 2015年(Python3.4)から導入された。
    • ここ10年ぐらいで広まってきた非同期プログラミングというパラダイムに属する。
    • ある処理がI/O待ちをしている間に他の処理を進めることができる。このことからわかるように、厳密に言えば、複数の処理を同時に進めているわけではなく、「待ち」が発生した時に、他に進めることのできる処理(待ちが発生していない処理)を進めているだけである(非同期I/Oについての詳細な説明は本記事では割愛します。詳しく知りたい方は、グーグル先生に聞いてみてください。)
      • 競合状態(Race Condition)をあまり気にする必要はないが、程よく、非同期I/Oの「待ち」(例asyncio.sleep関数など)が入るようなプログラムを書かなければならない(「待ち」が入らない場合、コンテキストスイッチングが起こらない。)

一見するとthreadingよりもasyncioを利用する方が良さそうですが、実際のところどうなのでしょうか?I/O boundな処理をそれぞれのライブラリを利用して書き、比較してみましょう。

検証環境

  • 4 vCPUs, 16 GB memory
  • CentOS, 8, x86_64 built on 20210701
  • Python3.8

比較方法

比較に用いられる検証用プログラムは2つあります。io_threading.pyとio_asyncio.pyです。

io_threading.py、io_asyncio.pyはそれぞれ、I/O boundなタスクを処理する常駐プログラムです。ウェブサーバーのようなプログラムを模倣しています。ウェブサーバーはポートに届いたリクエストを処理します。これを模倣し、検証用プログラムは標準入力に届いたタスクを処理します。ウェブサーバーには、リクエストの処理をスレッドに任せるもの(nginxのような)と、イベントループに任せるもの(node.jsのような)があります。io_threading.pyは標準入力に届いたタスクの処理をスレッドに任せます。一方、io_asyncio.pyはイベントループに任せます。

タスクは検証用プログラムの標準入力に入力されます。入力された文字列は数字でなければなりません。この数字は入力されたタスクの量を表します。max_weight_io_burdenが、検証用プログラムが処理しなければならないタスクの総量です。検証用プログラムが処理したタスクの量の和がタスクの総量を超えると、プログラムは終了します。

タスクはI/O boundなものです。タスクの量はI/O待ちの時間(秒)です。io_burden関数が、I/O boundなタスクを模倣します。

io_threading.pyとio_asyncio.pyは、タスクの総量をどれだけ速く終わらせることができるのか?を競います。

io_threading.py

I/O boundな処理をthreadingを用いて捌く実装です。

import threading
import fileinput
import time
import os

max_weight_io_burden = int(os.getenv('MAX_WEIGHT_IO_BURDEN'))

start = None

# 処理済のタスクの量
processed_weight = 0
processed_weight_lock = threading.Lock()

def io_burden(weight: int):
    # I/O boundな処理を模倣した関数
    # weight引数に指定された秒数だけ待ちを発生させます
    global processed_weight
    global processed_lock
    time.sleep(weight)
    with processed_weight_lock:
        processed_weight += weight
        if processed_weight >= max_weight_io_burden:
            print(time.time() - start, processed_weight)

def get_input():
    global start
    inputs = 0
    for line in fileinput.input():
        # 標準入力のタスクを受け取る
        # weightがタスクの量
        weight = int(line)
        if inputs == 0:
            start = time.time()
        if inputs >= max_weight_io_burden:
            # 処理済みのタスクの量がmax_weight_io_burdenに到達したらループを抜ける
            break
        # スレッドを生成し、タスクを処理するスレッドを開始
        t = threading.Thread(target=io_burden, args=(weight,))
        t.start()
        inputs += weight
    # 処理中のスレッドが全て終わるまで待つ
    while threading.active_count() > 1:
        pass

get_input()

io_asyncio.py

I/O boundな処理をasyncioを用いて捌く実装です。上記のio_threading.pyのasyncio版です。

import threading
import fileinput
import time
import os
import asyncio

max_weight_io_burden = int(os.getenv('MAX_WEIGHT_IO_BURDEN'))

start = None
processed_weight = 0

async def io_burden(weight: int, loop):
    global processed_weight
    await asyncio.sleep(weight)
    processed_weight += weight
    if processed_weight >= max_weight_io_burden:
        loop.stop()
        print(time.time() - start, processed_weight)

def get_input(loop):
    global start
    inputs = 0
    for line in fileinput.input():
        weight = int(line)
        if inputs == 0:
            start = time.time()
        if inputs >= max_weight_io_burden:
            break
        # タスクを処理するコルーチンをイベントループに登録する
        asyncio.run_coroutine_threadsafe(io_burden(weight, loop), loop=loop)
        inputs += weight

loop = asyncio.get_event_loop()

thread_input = threading.Thread(target=get_input, args=(loop,))
thread_input.start()

loop.run_forever()

プログラムの実行方法

このプログラムは2つの端末により実行します。

1つ目の端末では、検証用プログラムを動かします。環境変数MAX_IO_BURDEN_TASKSはプログラムが処理するタスクの総量です。

# 実行例
# プログラムを起動。このプログラムはタスクを1000000だけ処理したら終了する。
$ tail -f a.txt | MAX_IO_BURDEN_TASKS=1000000 python3.8 io_threading.py
# プログラムを起動。このプログラムはタスクを1000だけ処理したら終了する。
$ tail -f a.txt | MAX_IO_BURDEN_TASKS=1000 python3.8 io_asyncio.py

2つ目の端末では、プログラムにタスクを投入します。

# 量1のタスクを投入し続ける
$ while true; do echo "1" >> a.txt; done
# 量10のタスクを投入し続ける
$ while true; do echo "10" >> a.txt; done

実行結果

io_threading.py

プログラム MAX_IO_BURDEN_TASKS 単タスクの量(秒) 処理時間(秒) 備考
io_threading.py 1,000,000 1 188.2251 (1)
io_threading.py 1,000,000 2 98.5299 (1)
io_threading.py 1,000,000 3 67.4113 (1)
io_threading.py 1,000,000 4 54.4028 (1)
io_threading.py 1,000,000 5 - (2)
io_threading.py 1,000,000 30 - (2)
io_threading.py 1,000,000 40 43.4900 (4)
io_threading.py 1,000,000 50 52.6634 (4)
io_threading.py 1,000,000 100 101.3432 (4)

io_asyncio.py

プログラム MAX_IO_BURDEN_TASKS 単タスクの量(秒) 処理時間(秒) 備考
io_asyncio.py 1,000,000 1 127.0902 (1)
io_asyncio.py 1,000,000 2 71.9533 (1)
io_asyncio.py 1,000,000 3 50.1331 (1)
io_asyncio.py 1,000,000 4 36.5489 (1)
io_asyncio.py 1,000,000 5 29.2290 (1)
io_asyncio.py 1,000,000 6 25.2520 (1)
io_asyncio.py 1,000,000 7 22.3903 (1)
io_asyncio.py 1,000,000 8 20.4432 (1)(3)
io_asyncio.py 1,000,000 9 20.1911 (1)(3)
io_asyncio.py 1,000,000 10 19.8268 (3)
io_asyncio.py 1,000,000 20 24.8514 (4)
io_asyncio.py 1,000,000 30 33.0301 (4)
io_asyncio.py 1,000,000 40 42.3853 (4)
io_asyncio.py 1,000,000 50 51.8705 (4)
io_asyncio.py 1,000,000 100 100.7834 (4)

実行結果の考察

(1)過度なタスク分割によるオーバーヘッド増大

threading、asyncio共に、最も処理時間が大きくなっています。これはスレッドやイベントループ、その他諸々のオーバーヘッドの影響が大きくなってしまったことが起因していると考えられます。マルチスレッドのメリットは大きなタスクを小さなタスクに分割し、複数のタスクを複数のスレッドが同時に処理することで、全てのタスクを速く終了させるための手法です。タスクを小さくすればするほどそれぞれのスレッドは速く終了しますが、よりたくさんのスレッドを生成・管理しなければなりません。非同期I/Oでも同様に、タスクを小さくすればするほどそれぞれのタスクは速く終了しますが、よりたくさんのタスクを非同期I/Oのイベントループに登録・管理しなければなりません。また、今回のプログラムの場合、タスクを小さくすればするほどタスクを標準入力から読み込む回数も大きくなります。

asyncioの方がthreadingよりも処理時間が小さいです。これは非同期I/Oのイベントループのタスクの登録・管理にかかる時間の方が、スレッドの生成・管理のそれよりも小さいからであると考えられます。非同期I/Oの方がマルチスレッドよりもコンテキストのスイッチングに関わるオーバーヘッドが小さいという一般論にも合致します。

(2)OSのスレッド数上限値が影響

OS上で稼働しているスレッド数が、実行環境の上限値に引っかかってしまい、エラー終了します。

$ tail -f a.txt | MAX_WEIGHT_IO_BURDEN=1000000 python3.8 io_threading.py 
Traceback (most recent call last):
  File "io_threading.py", line 43, in <module>
    get_input()
  File "io_threading.py", line 37, in get_input
    t.start()
  File "/usr/lib64/python3.8/threading.py", line 852, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
32702
libgcc_s.so.1 must be installed for pthread_cancel to work

実行環境OSはLinuxです。1プロセス毎に作ることのできるスレッド数には上限値があります。CPythonのスレッドはpthreadを用いて実装されているため、この上限値の影響を受けます。

$ cat /proc/sys/kernel/threads-max
126329

(3)パフォーマンス頭打ち

最も処理時間が小さいですが、(4)で述べる理想的なパフォーマンスの向上が頭打ちになっている状態です。(1)で述べたようなオーバーヘッドの影響が出始めてきたものと思われます。

(4)タスク分割数に比例してパフォーマンス向上

単タスクの量と処理時間がほぼ同じです。マルチスレッド、非同期I/O、共に、理想的なパフォーマンスの向上が実現できています。「理想的な」という所以は、大きなタスクを小さく分割した分だけ、処理時間が向上しているからです。

io_threading.pyとio_asyncio.pyの処理時間に差異がほとんどありません。これは(1)で述べたようなオーバーヘッドの影響が無視できるほど小さいからだと思われます。

まとめ

今回の試行錯誤から得られた結論は次です。

  • CPU負荷の高い処理(いわゆるCPU bound)を達成したいのであれば、マルチプロセス(multiprocessing)を利用。
  • I/Oの待ち時間が大きい処理(いわゆるI/O bound)を達成したいのであれば、マルチスレッド(threading)か非同期I/O(asyncio)を利用。
  • 同時に実行しているスレッド数が大きい場合において、非同期I/Oのパフォーマンスの方が良い。ただし、同時に実行しているスレッド数が大きくない場合においては、マルチスレッドと非同期I/Oのパフォーマンスの差異はあまりない。

参考

JX Press Tech Talk #python で「StreamlitとFlaskではじめる爆速プロトタイピングとTV砲対策」というトークをしました

JX通信社シニア・エンジニアかつ, 最近は自社のテックイベント「JX Press Tech Talk」の司会者をやってる@shinyorke(しんよーく)です.

6/23(水)に, 「JX Tech Talk #python Pythonista 達が語る速報サービス開発の舞台裏」というイベントを開催しました.

jxpress.connpass.com

参加いただいた皆さま, ありがとうございました!

私は前述の通り, このイベントの司会をさせていただいたと同時に, 登壇者として「StreamlitとFlaskではじめる爆速プロトタイピングとTV砲対策」というテーマでトークもさせていただきました.

このエントリーでは, 発表後のフィードバック・ご意見等を踏まえた上で,

  • 当日お話したこと
  • ちょっとした補足
  • JX Press Tech Talkについて

というテーマで軽く書きたいと思います.

TL;DR

  • エンジニアとデータサイエンティストが共存するようなプロジェクトの進め方は結構大事
  • Streamlitでプロトタイピングするときに合わせてテストを書こう
  • App EngineでできないことはCloud Runに任せるといい感じになる

おしながき

当日お話したこと

最初に軽く当日の話をふりかえります.

私のトークは,

  • Streamlitでプロトタイピング(プロトタイプ開発)してチームに共有する
  • Flask + GCP(Google App Engine, Cloud Run)でスケーラビリティある構成でシステムを開発・運用

という2本の軸でお話をしました.

Streamlitでプロトタイピング

議論のネタになるプロトタイプの用意がマスト(かつnotebookじゃないほうがいい)

という思いでStreamlitを使いました.

こちらのお話, 実は過去にこのブログにも書いたことでもありました.

tech.jxpress.net

JX Press Tech Talkでプレゼンしたデモのコードも上記エントリーで紹介したものとなります.

github.com

当日のトークでお話をしました,

  • notebookからstreamlitへの移行
  • ngrokを使ってチームに共有

というお話はこのエントリーのダイジェストであり, サンプルコードとして提供しているものでもありました.

当日お越し頂いた方も, このエントリーから知った方もぜひ手元で試してもらって, 「便利そうだな」って思ったら仕事や趣味に活かしてもらえると幸いです.

軽量FWとGCPを使ったプロダクト開発と運用

後半戦の話は,

  • プロトタイプから本プロダクト開発はFlask, FastAPIを使いました
  • 高負荷対策を楽にするためGoogle App Engine + Cloud Runにしました

という話でした.

Flaskの話メインでしたが, こちらについては「プロトタイプからの移植というストーリーから逆算して作るにはどうしたらいいか」というテーマで主にパッケージ構成とテストの話をしました.

f:id:shinyorke:20210629212143j:plain

Pythonは「データサイエンティストの人が作ったモデルをそのまま同じ言語でWebのプロダクトとして開発できる」明確な強みがある一方,

  • データサイエンティストが書くコードと, プロダクトのエンジニア*1が書くコードは(それぞれの領域・メンタルモデルが異なるため), 大切にする価値観・趣が異なる
  • なぜかといえば, データサイエンティストがやることはプロトタイプで, プロダクトのエンジニアは保守運用を目指して開発するから

という問題を抱えやすいため,

データサイエンティストとエンジニア両者の間を取るため, 「パッケージ構成とテストコード」を最後の砦とする

というルールで進めました.

この, 「データサイエンティストとエンジニアが指向するメンタルモデルの違い」は語ると長いので, 気になる方は「仕事ではじめる機械学習第2版」をご覧頂ければと思いますが, 割とありがちな課題だったりするのでAIプロジェクトをやる方はぜひ意識するといいと思います.*2

ちなみにこのプロジェクトでは「データサイエンティスト」「エンジニア」は私一人の役割(兼任)であったため, この問題は発生しませんでした.*3

また, 「TV砲に耐えるための高負荷対策」の件は, こちらのブログの内容そのままだったりします.

tech.jxpress.net

基本的にはこのエントリーのダイジェストという形でお話しました.

「GCPで作ったサービスをいい感じにTV砲対策する」ノウハウをまとめたつもりなので気になる方はぜひ読んで頂ければと思います.

なお, これがAWSや他のクラウドサービスであったとしても考え方は流用できるんじゃないかなと思ってます.

ちょっとした補足 - 当日話さなかったこと

当日および後日頂いたフィードバック・質問に対する補足です.

一部サービスをCloud Runで切り離した理由

最初はApp Engineのみでイケると思ったのですが,

  • SNSシェア用のOGP画像などで独自フォントが必要だった
  • その他, プロダクトの細かい仕様の制約

という理由で画像の生成のみCloud Runで切り離しました.

tech.jxpress.net

こちらも細かい話はブログに記載していますので気になる方はぜひチェックしてみてください.

当日話さなかった答えはすべてここにあります.*4

FastAPIからFlaskへの書き換え

プロトタイプの段階でFastAPIを使った簡易的なRESTful APIを用意していたのですが, このときはなぜかApp Engineで動かず, 調査する時間もさほどなかった為, Flaskに書き換えました.

ちなみに後日, 同僚から「App Engine, FastAPIでも動くやで」と聞いた&gunicorn使ってuvicorn動かせばよかったのねと気がついたのがJX Press Tech Talkの準備をしていた今月の話でもありました.

ちょっと見れば書き換えいらなかったかも...という後悔を覚えつつも, 極力Framework依存を減らした構成をとっていたので傷口はかなり浅く済んだのではと満足しています.

JX Press Tech Talkについて&結び

JX Press Tech Talk #python では, 私のトークの他,

  • @kimihiro_nさんによる, 「新しいメンバーにMake debutしてもらいやすくするための開発体制 with Python」
  • サーバーサイドエンジニアの鈴木さんによる「Python on Google Cloud Functionsで作るバッチ処理」

といった, JX通信社の開発チームで実際あった話・ノウハウの話がありました.

どちらも現場発の情報で参考になるんじゃないかなと思います.

また, イベント参加者のフィードバックにつきましても,

  • チャレンジできる環境があるのは素晴らしい
  • 多くの学びがあった

など, アンケート含めて好意的なご意見・今後の学びになるご意見を多数いただきました.

個人的には, 「司会が聞きやすかった」「進行が上手」というフィードバック嬉しかったです苦笑*5

次回は未定ですが, またお会いできる日を楽しみにしております!

なお, 最後に大事な話をしますが,

サーバーサイドエンジニアをはじめ, 絶賛募集中です!

jxpress.net

最後までご覧いただきありがとうございました.

*1:あえて「プロダクトのエンジニア」と書いたのは, 同じエンジニアでもデータサイエンティスト寄りでプロトタイピングががメインの方もいるので狭義の意味で縛る意味で「プロダクトの」という枕詞を付けました

*2:仕事ではじめる機械学習第2版の6章に詳しい話があります, 結構面白い話なのでオススメです.

*3:が, 今後はチームでやるとか普通にあり得るので一人の段階でも最初から考えてやりました.

*4:一時期, 当日の話でもやろうかなと考えていましたが, 尺が15分のトークで3つのテーマを話すのはキツイなという理由で画像生成の話はブログで先行して書いてリリースし, StreamlitとGCPの話をメインにするという決断をしました.

*5:毎回, 塩梅とか進行に苦心しているのでホント嬉しかったです, ありがとうございます&今後もがんばります

Slackアプリ開発の社内勉強会を開催しました

サーバサイド開発やインフラ周りをいじっているたっち(TatchNicolas)です。

JX通信社の日々の運用では、Slack workflowやbotが大活躍しています。

かなり作り込まれた高機能なBotもあり欠かせないものになっていますが、開発者メンバーのなかには普段そのリポジトリを触らない人・すでにあるものに機能追加・改修はするがゼロから立ち上げたことはない人などもいます。ハードルをグッとさげることで自分たちの斧を研ぎやすくできないか?と考えました。

そこで毎月開催している社内勉強会にて、今回はSlackアプリ開発をテーマにしましたのでその様子について紹介します。

内容

初めて触る人でも開発をすぐに始められるように、社内でよく使われる言語でテンプレになるリポジトリを用意しました。

また、Permissionの設定などは最初はとっつきにくいため、Tandem*1で複数人の画面共有をしながらお手伝いしつつ進めました。 その後基本的なポイントや概念の説明をして、みんなでワイワイしながら開発していく形式で会を進行しました。

最初に知っておくと良い概念

初めてSlackアプリケーションを開発する人にもわかりやすいように、前述の雛形や初期設定の他に知っておくと入りやすい概念について簡単に説明をしました。

Socket Mode

f:id:TatchNicolas:20210618214336p:plain

普通にSlackアプリケーションを開発すると、Slackからのイベントを受け取るのにpublicにhttpでリクエストを受けられるURLが必要です。ngrokなどのツールを使って用意しても良いですが、より手軽にSlackアプリケーションを開発できる方法として Socket Mode があります。

Socket ModeではpublicなURLを持つ場所へデプロイする必要がなく、ささっと手元ですぐにSlackアプリケーションを動かすことが可能です。

今回の勉強会には十数人が参加して、その分だけデプロイ先の環境を用意するのも大変ですし、勉強会のあとでお片付けも必要です。デバッグの容易さも含めて気軽さを優先するために上記のサンプルリポジトリではSocket Modeでテンプレートを作りました。

Event Subscription

f:id:TatchNicolas:20210618214405p:plain

Botがメンションを受け取ったり、誰かがチャンネルに入ったりなどSlack上の出来事のうち、どのイベント種別を受け取るかを設定するのがEvent Subscriptionです。

api.slack.com

たとえばPythonではデコレータの形で指定して、受け取ったイベントに対して処理を行う関数を書いていくことになります。

from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler

app = App(token=os.environ["SLACK_BOT_TOKEN"])

@app.event("app_mention")
def print_mention_event(event):
    """
    メンションが来たら発火する関数
    """
    print(event)

作品紹介

成果発表タイムで共有された作品を紹介します。限られた時間でしたが、なかなか面白いBot達が色々あって楽しい時間となりました。

画像認識(YOLO)Bot

f:id:TatchNicolas:20210618220132p:plain

形態素解析Bot

f:id:TatchNicolas:20210618220752p:plain

gou

f:id:TatchNicolas:20210618220816p:plain

f:id:TatchNicolas:20210618220824p:plain

社内で「いいこと」をした人をSlackで讃えたり ++ とインクリメントを送るとポイントとして記録するカルマボットをGoで実装したもの。*2

github.com

占いBot

f:id:TatchNicolas:20210619075529p:plain

なぜか極端にてんびん座に厳しくて笑いました。*3

まとめ

複数の言語で雛形となるリポジトリを用意し、初期設定の説明を画面共有しながら一緒にすることで、初めての方達にもすぐに手を動かして楽しんでもらうことができました。

クスッとくるようなBotから、普段の業務を生かした画像認識や形態素解析をBot化した作品もあり、JXらしさのある楽しい勉強会になりました。

Pythonトークイベントの告知

2021/06/23(Wed) 19:30から Pythonにまつわる色々な話をするイベントを予定しています。ぜひ参加してみてください。

jxpress.connpass.com

*1:最近JX通信社で使われているチームやプロジェクトごとの部屋に出入りして使う「バーチャルオフィス」なツールです https://tandem.chat/

*2:Goで業...

*3:漢字の星座名で条件を引っ掛けているので、ひらがなだとデフォルトで適当に返答するように作ったそうです(笑)。

サーバーサイドで動的にOGP画像をシュッと作る方法 - FastAPIとCairoSVGで作る画像生成API

JX通信社シニア・エンジニアの@shinyorke(しんよーく)です.

最近は色んなエンジニアリングをしつつ, イベントの司会業をしています(詳細は最後の方を見てね).

開発しているサービス・プロダクトの要件で,

  • TwitterやLINE, FacebookでシェアするOGP*1コンテンツ(タイトル・本文・画像)が欲しい
  • コンテンツはユーザーさんの操作で動的に変わる
  • テキストだけじゃなくて, 画像も変えたい←これ

なんて事は非常によくある話だと思います.

私はちょっと前に開発したAIワクチン接種予測でそれがありました.

f:id:shinyorke:20210521183040j:plain
こういうやつです

例えば上記画像のテキスト(地域・年齢・接種可能時期)は予測の結果を動的に画像テンプレートに入れて都度作っています.

上記のOGPを生成するために必要なことはこういう感じだろうなー, と以下の絵の通り整理し,

f:id:shinyorke:20210528180851j:plain
やったこと

結果的に, OGPを生成するためのサービスを200行にも満たない小さいAPIサーバーとして実現しました.

このエントリーでは,

  • CairoSVGを使った画像生成
  • FastAPI + CairoSVG + GCSを使ったAPI構築
  • Cloud Runでの運用

で上記のサービスを作った事例を紹介します.

なおこのエントリーは,

  • PythonでWebアプリを作ったことがある
  • FastAPIおよびJinja2を使える/使ったことがある
  • ラスタ画像とベクター画像の意味がわかる

程度の前提知識を読者の皆さまが持っている, と想定して執筆しています(Pythonアプリ開発初級みたいなノリです).

TL;DR

  • FastAPIとCairoSVGで画像の動的生成はいい感じにできちゃいます
  • テンプレートエンジンは迷ったらJinja2
  • フォント指定はハマるので気をつけよう

おしながき

ベクター素材から画像を作る🐍

前述の通り, やりたいことは

  • 地域・年齢・接種可能時期といったテキストをAIさんが準備し
  • 予め用意したテンプレートのベクター素材にテキストを埋め込み
  • pngなどのラスタ形式に変換(いわゆるラスタライズ)

です.

自分で調べたり, 社内で色々聞き回った結果,

  • SVGで素材を準備して(デザイナーさんにお願いして)それを使ってサーバーサイドでラスタライズする
  • SVGからのラスタライズはPythonのライブラリでできるっぽい

というのがわかったので早速やりました.

CairoSVGで画像を作る

SVGをラスタライズする手段は意外とあっさりで, CairoSVGというライブラリで解決しました.

cairosvg.org

github.com

公式サイトのサンプルを見る限り, やりたいことは出来そうとわかりました.

# pip install cairosvg で普通に入ります
import cairosvg

# SVGからPDF
cairosvg.svg2pdf(url='sample.svg', write_to='sample.pdf')


# SVGからpng(これが本命でやりたかったこと)
cairosvg.svg2png(url='sample.svg', write_to='sample.png')

コードもすごく短いですし, とても良さそうです👍

SVG素材をJinja2テンプレにする

やりたいことはCairoSVGでできそうとわかったのですが, 前述の通り今回やりたいのは

  • 地域・年齢・接種可能時期といったテキストをAIさんが準備(できている)
  • 予め用意したテンプレートのベクター素材にテキストを埋め込み
  • pngなどのラスタ形式に変換(CairoSVGでやれる)

で, 肝心の「テンプレートのベクター素材にテキストを埋め込み」が解決していません.

ちなみにやりたいことをコードに起こすとこんな感じです.

# 実際のコードとは異なります(あくまでサンプルです)

import cairosvg

def convert_ogp(area: str, age: int, period: str):
    # 何かしらの方法でSVGフォーマットなテキストを入手
   ogp_context = 'TODO: ここにSVGの中身が入る'
   # TemporaryFileとして書き出して後byteでもらう
    with NamedTemporaryFile('w') as f:
        f.write(ogp_context)
        f.flush()
        image_bytes = svg2png(url=f.name, write_to=None)
    return image_bytes

image = convert_ogp('東京都', 40, '10月下旬〜2月上旬')

一番単純な方法はヒアドキュメントとしてSVGテキストを用意して置換することなのでしょうが, デザイン素材を(文字列とはいえ)Pythonコードに書くのは若干気が引けた*2ので,

  • SVG素材をJinja2テンプレとして書き直す
  • 上記Jinja2テンプレを元に画像生成する

という方法でいい感じにやりました.

# 実際のコードとは異なります(あくまでサンプルです)

import cairosvg
from jinja2 import Environment, FileSystemLoader


def convert_ogp(area: str, age: int, period: str, template_file: str):
    # テンプレファイルと引数からテキストを生成
    env = Environment(loader=FileSystemLoader(os.path.dirname(template_file)))
    ogp_template = env.get_template(os.path.basename(template_file))
    ogp_context = ogp_template.render(area=area, age=age, period=period)

   # TemporaryFileとして書き出して後byteでもらう
    with NamedTemporaryFile('w') as f:
        f.write(ogp_context)
        f.flush()
        image_bytes = svg2png(url=f.name, write_to=None)
    return image_bytes

# area, age, periodという変数を持ったjinja2テンプレを事前に準備
image = convert_ogp('東京都', 40, '10月下旬〜2月上旬', 'templates/ogp.svg.j2')

テキスト(ベクター)として扱う時はまずJinja2ってくらいよく使ってるのでこれはあっさり思いついてすぐ実現できました.

何かしらのフォーマット(HTMLでもXMLでもYAMLでもJSONでも)で動的にコンテンツを生産したい時はJinja2便利です.

流石Pythonを代表するテンプレートエンジンなだけあります*3.

APIサーバーにする

画像生成の仕組みはこれで目処が付いたので, API化します.

FastAPIサーバーとして用意する

やることは,

  • 予測結果を受け取って画像をGoogle Cloud StorageにuploadするAPIを作る
  • 予測結果はPOSTパラメーターとして受け取る

です.

APIはFlask, bottle, FastAPIと比較的軽めなFrameworkなら何でも大丈夫*4なのですが今回は(社内でよく使ってる)FastAPIで実現しました.

# 実際のAPIはもっとちゃんと作ってます(イメージを掴むためのサンプルです)
from fastapi import FastAPI
from pydantic import BaseModel

# さっきのOGP生成関数
from sample_image import convert_ogp


class OgpContext(BaseModel):
    """
    OGPで使う項目
    """
    age: int
    area: str
    period: str

# docは公開する必要ない(しちゃ🙅)なので使いません
app = FastAPI(docs_url=None, redoc_url=None)


@app.post('/creage')
def create(form: OgpContext):
    image_byte = convert_ogp(form.area, form.age, form.period, 'templates/ogp.svg.j2')    
    # 取得したimageを何かしらの方法でアップロード
    upload(image_byte)
    return {'status': 'ok'}


if __name__ == '__main__':
    import os
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=os.getenv('PORT', 8080))

画像作ってアップロードするだけなら(上記のコードでは端折ってる画像アップロード関数含めて)ほんの100〜200行で終わります.

Cloud Runでホスティング

実サービスのホスティングはCloud Runで行いました.

cloud.google.com

Dockerコンテナ一つで動く薄い画像生成APIとして作っていたので,

  • gcloudコマンド2回でデプロイまでいける
    • Cloud Buildでimage作ってレジストリ(GCR)にpush
    • Cloud Runにデプロイ
  • トラフィックに応じてインスタンスを増やす(減らす)がGUIコンソールをポチポチするだけでいける

という利便性を重視してCloud Runにしました.

ちなみに「コマンド2回でデプロイまで」はこんな感じです

# Docker buildとimage push
gcloud builds submit --tag asia.gcr.io/example-test-prj/ogp-generator

# 上記imageをdeploy
gcloud run deploy --image asia.gcr.io/example-test-prj/ogp-generator --platform managed

たったこれだけで終わるの最高です.

ハマったこと

ちなみに開発中ハマったこととして, 文字化けがありました.

  • 開発環境(自分のMac)では文字化けしない
  • Docker imageから作った環境だと文字化けする

原因は明確で, 「Macには存在するフォントだけどDocker image(Debian)には存在しないフォント」でやったのでこの差がでました.

FROM python:3.9

# install
COPY poetry.lock pyproject.toml ./
RUN pip install poetry
RUN poetry config virtualenvs.create false \
  && poetry install --no-dev

# app
COPY app.py ./
ADD templates templates

# templates/fonts配下のフォントを然るべき所にコピー
COPY templates/fonts /usr/share/fonts/truetype/dejavu

CMD  python app.py

SVGフォーマット上で使ってるフォントを洗い出し, 見つけて追加して無事解決しました.

運用の結果&結び

というわけで, このエントリーではサーバーサイドで動的に画像を作る方法について紹介しました.

AIワクチン接種予測がTVに出たときもこの仕組みが動いていたのですが, 大きなトラブルもなくトラフィックに耐えきったのでやり方として正解だった様に思えます.

なお, このエントリーは(このブログを書く前に)社内のWin Sessionでネタを披露したところ, ウケが良かったので今回ブログとして公開することにしました.

これで似たような課題を抱えている皆さまの手助けになると幸いです.

JX Press Tech Talk でPythonの話やります!

最後にちょこっと宣伝です.

jxpress.connpass.com

好評だった前回に続きまして, JX Press Tech Talk第二弾を6/23(水)にやることになりました!

私は前回に続いて司会をやると同時に「StreamlitとFlaskでPoCから本番運用までやりきったやで📺」的なお話をさせていただきます.

二刀流で頑張りたいと思います, お時間ある方ぜひいらしてください!

*1:Open Graph Protocolのことで, かんたんに言うとSNSシェアする時に出てくる文章や画像を定義するための仕様・お決まりのことです. ちなみにTwitterはTwitter Cardという独自のお決まりがあります(やろうとしてることはOGPと一緒)

*2:「明日にもリリースしなきゃ」という状況だったらこの方法は全然アリです. 今回は(タイトなスケジュールだったとはいえ)そこまでじゃなかったのでテンプレとコードはちゃんと分離しました.

*3:Flaskのデフォルトテンプレートエンジンだったりしますし, Djangoでも使ったりします. また, AnsibleのplaybookはJinja2がベースだったりします.

*4:もちろんDjango(厳密にはDjango REST framework)も候補になると思います. が今回はホントに薄いラッパーなので軽量FWを中心に考えました.

GitLabとKubernetesで作る、自動で起動・停止できるブランチ別環境

SREのたっち(@TatchNicolas)です 本記事は、先日の弊社主催のTechトークイベントで発表した内容について、もうすこし詳細に書いてみます。

jxpress.connpass.com

TL;DR;

  • GitLabのenvironmentとHelmのreleaseを対応させることで動的な環境の作成・削除を実現した
  • CIOpsとGitOpsを使い分けて、「ちょうどいい」使用感を目指した
  • 環境の順番待ちがなくなった!

背景

JX通信社のサーバサイド開発では、ECSによるコンテナベースでの開発・デプロイが主流です。*1

環境としては

  • ローカル環境: docker-composeやgo run yarn run などで起動する、文字通り手元のマシンのローカル上で動かす環境
  • 開発版環境: SQSやFirestoreなどのマネージドサービスや、ローカルで立てるのが大変な別のAPI*2との繋ぎ込み等に利用する環境
  • ステージング環境: 本番デプロイ前のチェックに使う環境
  • 本番環境: エンドユーザが実際に利用する環境

という構成になっています。

しかし、開発チームの規模が大きくなってくると開発版環境を使った繋ぎ込みの確認のために順番待ちが発生するようになり、Slack上でも「@here どなたか開発版環境つかってます?借りていいですか?」のようなやりとりが散見されるようになりました。結果、思うように開発スピードが上がらなくなってしまいました。

そこで、開発が進行している複数のトピックブランチごとに独立してコンテナのアプリケーションをデプロイできると、開発版環境の順番待ち問題を解決 できるのではないか?と考えました。

どうやって実現しているか

この問題を解決するには、以下の要件が満たされている必要があります。

  1. 環境が自動で作成され、独立したURLが動的に払い出される
  2. Merge requestが閉じられたら、その環境も閉じられる

そのために、最近社内でも導入の進んでいるKubernetesをベースにすれば、比較的簡単に実現可能と考え、以下の技術選定をしました

  1. URLの払い出し: Istio
  2. 自動的な環境作成・削除: GitLab CI & Helm

構成について

全体の構成を図に表すと以下のようになります。登場人物についてもう少し詳しく解説します。

f:id:TatchNicolas:20210521212304p:plain

Ingress

トラフィックの制御はIstioに任せるため、全てistio-ingressgatewayに流すようにしています。 そのため、図からは省略しました。

「クライアントからのリクエストのTLSを終端して、k8sクラスタの中へリクエストを連れて行く」ことを担当します。

GKEの場合は、GCPから発行される証明書がワイルドカードに対応していないため、cert-managerで発行したものをIngressで指定してLoadBalancerに持たせます。

EKSの場合は、AWS Load Balancer Controllerを利用しました。

Istio

「k8sクラスタの中に入ってきたリクエストを、Hostヘッダをみて環境ごとのk8s Serviceに振り分ける」を担当します。Ingressのルールでも同じことは可能ですが、Istioを使うことで以下のメリットを享受できます。

  • ホスト名やパスなどによるリクエスト振り分けをVirtualServiceで設定することで、Ingressの中央集権ではなく個々のアプリケーション単位で行えるようになる*3
  • 認証周りをアプリケーションから剥がすことができる

GitLab CI

「トピックブランチごとの環境の作成・削除」を担当します。

GitLabにもKubernetesクラスタと連携する機能はあるのですが、GitLabのお作法に従う必要が出てくるためあまり使っていません。

JX通信社では、プロダクト単位でGitLabのグループを分けており、その単位でクラスターを作成しているので、開発版環境のクラスタの情報を登録して、GitLab CIからkubectlhelm のコマンドを叩けるようにしました。*4

ArgoCD

「各クラスタに対するメインブランチの反映」を担当します。詳しくは後述

実際のワークフロー

(1) 開発者が、ある機能の実装のためにfeatureブランチを切ってpush

各アプリケーションに対応するmanifestsリポジトリを用意して、そこでブランチを切ります。

(2) GitLab CIがfeatureブランチに対応する環境を作成する

ここで、下記のCI Jobが発火します。

setup_mr_env:
  stage: mr_env
  image: docker-image-with-helm
  script:
    - helm upgrade -i -n fastalert-api ${CI_COMMIT_REF_SLUG} ./chart --set url=${CI_ENVIRONMENT_URL}
  environment:
    name: ${CI_COMMIT_REF_SLUG}
    url: https://${CI_COMMIT_REF_SLUG}.your-domain.com
    on_stop: teardown_mr_env
  only:
    - branches
  except:
    - main
    - tags

利用するHelmテンプレートは以下のようになっています。*5

chart
├── Chart.yaml
├── values.yaml
└── templates
    ├── NOTES.txt
    ├── _helpers.tpl
    └── main.yaml

chart/values.yamlhelm upgrade -i ... --set xxx=yyy で値を渡していきます。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: {{  .Release.Name  }}
  template:
    metadata:
      labels:
        app: {{  .Release.Name  }}
    spec:
      serviceAccountName: your-app-serviceaccount
      containers:
      - name: api
        image: {{ .Values.image }}
        ports:
        - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: {{ .Release.Name }}
spec:
  type: ClusterIP
  selector:
    app: {{ .Release.Name }}
  ports:
    - name: http
      protocol: TCP
      targetPort: 8000
      port: 80
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {{ .Release.Name }}
spec:
  gateways:
  - istio-system/gateway
  hosts:
    - {{ (urlParse .Values.url).host }}
  http:
  - route:
    - destination:
        host: {{  .Release.Name  }}.your-app-ns.svc.cluster.local

(3) レビュー担当者は、コードを確認しつつ2.でできた環境を触って動作確認したりする

前述のマニフェストが反映されたら、 https://${CI_COMMIT_REF_SLUG}.your-domain.com というURLでアプリケーションにアクセスできるようになります。

また、正式なレビューに至る前のちょこっと書き換えて挙動を確認して、のようなことをするなら逐一CI越しのデプロイをせずにTelepresenceを使います。

(4) LGTMをもらってMergeすると、トピックブランチ用の環境は自動で削除される

GitLab上で、

  • Merge Requestに表示される停止ボタンをクリックする
  • Pipeline上で表示されるmanualなJobを実行する
  • Merge Requestが閉じられる

のいずれかの操作を行うと、下記のCI Jobが発火します。これにより、2. で作成されたhelm releaseがアンインストールされます。*6

teardown_mr_env:
  stage: mr_env
  variables:
    GIT_STRATEGY: none
  image: docker-image-with-helm
  script:
    - helm uninstall -n fastalert-api ${CI_COMMIT_REF_SLUG}
  when: manual
  environment:
    name: ${CI_COMMIT_REF_SLUG}
    action: stop
  except:
    - main
    - tags

(5) (6) ArgoCDがステージング・本番用ブランチの内容を同期する

ここだけ、GitOpsな方針になっています。詳しくは後述します。

もう少し細かい話

リポジトリ戦略について

JX通信社では、1つのアプリケーション(≒マイクロサービス)を1つのGitLabプロジェクト(=リポジトリ)に対応させ、その中でソースコードとデプロイの両方を管理しているケースがほとんどでした。

今回、Kubernetesを導入するにあたって、アプリケーションとデプロイの関係を疎にするために

  • アプリケーションリポジトリ:
    • CIではlint、テスト、コンテナイメージの作成までを担当する
  • マニフェストリポジトリ:
    • 開発版クラスタのトピックブランチ別環境へのデプロイのデプロイ
    • ArgoCDがこのリポジトリのmainブランチをクラスタに同期する

という構成にしました。マニフェストリポジトリのほうはモノレポではなく、デプロイしたい単位に分けています。するとアプリケーションとマニフェストを一対一に対応させたり、複数のAPIのセットをまとめて一つのデプロイの単位としても扱えたり、柔軟な運用が可能になりました。

CIOps v. GitOps

CIOpsとGitOpsの特徴などについては、すでに良い記事が世の中にたくさんあるので、ここでは説明を割愛しますが、「CIとCD」。

  • プロダクト開発チームとしては、どんどんコードを書いてデプロイしたい
  • でも本番環境はバージョン管理もセキュリティもしっかりしたい

そこで、JX通信社では

  • 開発版クラスタの各トピックブランチに対応する環境にGitLab CIからデプロイ(=CIOps)
  • 開発版クラスタおよび本番クラスタのmainブランチは、ArgoCDでデプロイ(=GitOps)

のように、環境に応じてデプロイ方式を変える方式を採用しました。すると、

  • 開発版トピックブランチへの作業はpushするたびに随時反映されていき、グイグイと開発を進められる
  • 本番環境クラスタに対する権限をGitLab CIに渡していないので、よりセキュアに保てるようになる

という環境ごとの目的にマッチした利用が可能になりました。

Namespace・ServiceAccountについて

EKS/GKE上で動くアプリケーションは、マネージドサービスを利用するためにWorkload Identity/IRSAを利用します。そのためには、AWS/GCPのIAMがKSAのあるNamespaceとServiceAccountを知っている必要があります。

そのため、動的に追加/削除されるHelmのtemplateからNamespaceとServiceAccountは除外して、クラスタ自体を管理してるリポジトリで、新しいアプリケーションの追加時にArgoCD Applicationの定義と一緒にNamespaceを定義・作成しています。つまり、各トピックブランチの環境はIAMを共有しています。ここは権限の変更も含めてトピックブランチ環境間で独立させられると理想的なのですが、複雑になってしまうため今回の仕組みからは外しました。

apiVersion: v1
kind: Namespace
metadata:
  name: your-app-ns
  labels:
    istio-injection: enabled
---
apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::1234567890:role/your-app-role
  name: your-app-serviceaccount
  namespace: your-app-ns
---
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: your-app
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://your-gitlab.com/your-group/your-project.git
    path: path/to/chart/
    targetRevision: HEAD
    helm:
      releaseName: main
  destination:
    server: https://kubernetes.default.svc
    namespace: your-app-ns

まとめ

  • 環境の順番待ちがなくなって、複数の施策を並行して進めやすくなった
  • CIOpsとGitOpsのいいとこ取りをして、「ちょうどいい」ワークフローを実現できた
  • それぞれの環境はあくまで helm install / helm uninstall しかしてないので、中でどんなKubernetesリソースを利用しているのかは関係なくなり、CI/CDの仕組みが標準化できるようになった

以上、先日行ったTechトークの詳細をブログにしてみました。どなたかの参考になれば幸いです。

JX通信社では、2021/06/23(Wed) に二回目のTechトークイベントを予定しています。次回はPythonにまつわる色々な話をする予定ですので、ぜひ参加してみてください。

jxpress.connpass.com

*1:Serverless FrameworkやSAMを使ったLambdaへのデプロイもよく使われます Serverless Framework+mangum+FastAPIで、より快適なPython API開発環境を作る - JX通信社エンジニアブログ

*2:たとえば、VPCに閉じていて外部から触れない内部向けのAPIなどです

*3:AWS Load Balancer Controllerでは振り分けルールをバラして書くことができますが、GKEでもできるだけ近い方法でやりたかったのと、他にも重み付けやリトライ制御などServiceMeshの機能として使いたいことがあったのでL7ルーティングもIstioに任せました

*4:https://docs.gitlab.com/ee/user/group/clusters/

*5:実際には、IstioのAuthorizationPolicyの設定なども含めていますが省略しています

*6: GIT_STRATEGY: noneを指定しないと、GitLabでマージと同時に元ブランチを削除する設定にしていたときに失敗してしまいます。