Goの並行処理について

はじめに

こんにちは。サーバーサイドエンジニアインターンでお世話になっている杉山と申します。 今回はオライリー・ジャパン社より出版されている『Go言語による並行処理』を読み勉強したことについて書いていきたいと思います。

Goにおける並行処理

GoはCPUのコア数が複数だった場合並列処理になります。 もし1コアのマシンで並列処理を行う場合それは並列処理ではなく素早く順に実行しているだけのようです。

並列処理と並行処理の違い

並列処理と並行処理の違いについては『Go言語による並行処理』では 『並列性はコードの性質を指し、並列性は動作しているプログラムの性質を指します』 とありますが分かりづらいので調べてみると

  • 並列処理 複数の命令の流れを同時に実行すること

  • 並行処理 コンピュータの単一の処理装置を複数の命令の流れで共有し、同時に実行状態に置くこと

このような違いがあるみたいです。イメージで説明すると以下のようになります

f:id:sugi1208:20200323174721p:plain

並列処理で気をつけること

デッドロック

  • プロセスなどの処理が互いの処理終了を待ち、どの処理も先に進めなくなってしまうこと

ゴールーチンで同じ変数へのデータ競合

  • 解決方法
    1つの変数には1つのゴールーチンからアクセスする
    ロックをとる
    チャネルを使う

チャネル型とは

チャネルはゴールーチン間でのメッセージを共有するためのもの

  • 特徴
    送受信時にブロックできる
    送信時にチャネルのバッファが一杯だとブロックする
    受信時にチャネル内が空だとブロックする

並列処理を使って大量のデータを書き込んでみる

今回は東京の気温データ(8785件)をMySqlに書き込んでいます。 また今回のコードに関しましては並列処理の例ということでfor文の中にてinsertしております。ご了承ください。

試しに自分で書いてみるコードがこちらです。

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "sync"

    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/mysql"
)

type Weather struct {
    Time    string `json:"time"`
    Temperature string `json:"temperature"`
}

func main() {

    client, _ := gorm.Open("mysql", "root:root@tcp([mysql]:3306)/weather?charset=utf8mb4&parseTime=true")

    client.AutoMigrate(&Weather{})

    ch := make(chan int, 4)
    wg := sync.WaitGroup{}

    account_json, err := ioutil.ReadFile("data.json")
    if err != nil {
        fmt.Println(err.Error())
    }
    var data []Weather
    if err := json.Unmarshal(account_json, &data); err != nil {
        log.Fatal(err)
    }
    for _, name := range data {
        ch <- 1
        wg.Add(1)
        go func(name Weather, client *gorm.DB) {
            defer wg.Done()
            client.Create(&name)
            <-ch
        }(name, client)
    }
    wg.Wait()
    client.Close()

}

変数dataに気象データを入れてfor文を使用し書き込みの処理をしています。

上記のコードの良くないと思う点

  • ただforで回して書き込むより少し早くしただけのコードになって安定したコードではない

  • 上記のコードには該当しないがデッドロックが起きた場合を考慮していない

  • insert処理が途中で止まってしまった場合処理が終了しない

上記の点を改善していきます

本を読んで書いてあったことを試してみる

次に本で読んでみて学んだ事を踏まえてコードを書いてみました。
今回はハートビートと言われる並列処理で用いられる手法を使用します。

ハートビートとは

ハートビートとはその名の通り人間の心拍のような物です。並列処理内部で外部にそのプロセスが生きているかを伝える方法です。

ハートビート使用した場合のメリット

今回の場合ハートビートは一定時間毎に外部にプロセスが生きているか伝えています。 ハートビートを使用することによって並列処理内部にて何しらの処理が止まってしまい外部に一定時間以上外部にプロセスが生きているか伝えられていない場合や、デッドロックしてしまった場合に処理を終了させることなどができます。

今回書いた全体のコードはこちらになります

こちらでは一部抜粋して説明していきます。

func main(){

    ----------省略----------

    done := make(chan interface{})

    defer close(done)

    hertbeat, results, errch := insert(done, data, client)

    <-hertbeat   //最初のハートビートがゴルーチンに入ったことを受け取ります

    i := 0

    for {  // ここはhertbeatの受け取りなどの処理
        select {
        case r, ok := <-results: //ここでは書き込んだ内容と書き込むはずの内容の比較をしています
            if ok == false {
                return
            } else if Weather := data[i]; Weather != r {
                log.Fatal("MismatchingError", Weather)
            }
            i++
        case <-errch:  
            log.Fatal(errch)
        case <-hertbeat:  //一定時間ごとにハートビートを受け取ります。
        case <-time.After(5 * time.Second):    //ここでは上記の処理が一定時間行われない場合にタイムアウトするようにいています。
            log.Fatal("Timeout")
        }
    }

}

func insert(done <-chan interface{}, data []Weather, client *gorm.DB) (<-chan interface{}, <-chan Weather, <-chan error) {
    hertbeat := make(chan interface{}, 1) 
    weatherch := make(chan Weather)
    errch := make(chan error)

    go func() {
        defer close(hertbeat)
        defer close(weatherch)

        beat := time.Tick(time.Second)

    Loop:
        for _, name := range data {
            for {

                select {
                case <-done:
                    return
                case <-beat:
                    select {
                    case hertbeat <- struct{}{}:  //ハートビートを一定時間ごとに送っています  
                default:
                    }
                case weatherch <- name:
                    client := client.Create(&name) //ここではDBに対する書き込みです
                    if client.Error != nil {
                        errch <- client.Error // ここではエラーハンドリングをしています。そのまま表示させてもよかったのですが今回はこのような形で書きました
                    }

                    continue Loop
                }
            }
        }
    }()

    return hertbeat, weatherch, errch    //   hertbeat, weatherch, errchのchanを返しています
}

メインの処理はinsert関数の中ではhertbeat等のチャネルを作成しinsertの処理をgo func() を使用し、main関数の下のforにてheatbeatを受け取る等の処理と並列に実行しています。

またコメントにもありますがheatbeatが一定時間受け取れていない場合selectの最後にタイムアウトするような処理を書くことによって処理を中断させることができます。

並列処理内部にて処理の停止、もしくはデッドロックが起きた場合の処理の比較

ここでは自分が書いたコードとハートビートを使用したコードが並列処理内部にて処理が止まってしまった場合にとのような動きをするかを比較していきます。

ここではデットロックや内部の処理の停止の再現としてtime.Sleep()を使用し60秒程の時間、処理を停止させます。

自分の書いたコードの場合

func main(){

    ----------省略----------
    for _, name := range data {
        ch <- 1
        wg.Add(1)
        go func(name Hoge, client *gorm.DB) {
            defer wg.Done()
            time.Sleep(60 * time.Second)//  この部分を追加し、処理を60秒停止させています
            client.Create(&name)
            <-ch
        }(name, client)
    }
    
    ----------省略----------

今回は60秒停止ということにしてありますがもし完全に停止してしまった場合(デッドロック等)、処理は終わらずにタイムアウトすることもありません。

ハートビートを使用したコードの場合

func main(){    
    ----------省略----------
    case weatherch <- name:
    time.Sleep(60 * time.Second)//  この部分追加し、処理を60秒停止させています
    client := client.Create(&name)
    if client.Error != nil {
        errch <- client.Error  
    }
  ----------省略----------

ハートビートが5秒間外部に送信されない場合以下のようにタイムアウトさせることができます。

2020/03/23 07:57:20 Timeout
exit status 1

このように並列処理内部にて何かしらのトラブルがあり、処理が停止した場合ハートビートを使用することによって処理を中断することができます。

最後に

Goの並列処理は少ししか触れていなかったので今回勉強して並行処理について深く知れました。『Go言語による並行処理』ではGoに限らず並行処理自体の勉強にもなるので難しいですがとても勉強になる本でした。まだ理解が浅い部分もあるので繰り返し読み勉強していきたいと思います。またこの場をお借りして勉強の機会を下さった会社の方々にお礼を申し上げたいと思います。ありがとうございました。