Datastream による Cloud SQL と BigQuery 同期:直面した課題と対応策

こんにちは、データ基盤担当の @mapler です。今回は DataStream を活用して、Cloud SQL から BigQuery へのデータ同期についてお話しします。

Datastream の設定は基本的に Google Cloud のドキュメント従って行えますので、この記事では、実際の設定時に直面した課題や対策を中心に説明します。

背景:

データを BigQuery に同期することで得られるメリット

JX通信社の「FASTALERT」は、日本国内外の緊急情報をリアルタイムで配信するサービスです。災害情報や事故、事件、気象警報など、幅広い分野の緊急情報を網羅しており、長年にわたって膨大な災害データを蓄積しています。

社内や顧客から、蓄積したデータへのアクセス需要が高まってきました。

BigQuery からデータを利用できるようにすることで、次のような利点があります。

  • 過去の特定の時期や地域で発生した事象を簡単に取得できるようになる
  • BigQuery でデータの推移や統計情報を視覚的に分析しやすくなる
  • 本番 DB にアクセスすることなく、負荷をかけずにデータを取得できる

課題:

これまで、社内のデータ基盤では CloudSQL から BigQuery への同期の仕組みを構築していましたが、リアルタイム同期ではなく、Cloud Composer (Airflow) を利用した Daily または Hourly のバッチタスクを使用していました。スケジュールタスクで Cloud SQL 連携クエリ によりデータを取得し、BigQuery に保存する方式です。しかし、この方法には以下の課題があります。

  • データの品質

    バッチ処理の実行タイミングや取得範囲設定によって、BigQuery と CloudSQL のデータに差分が生じます。

    • 過去の更新分が反映されない:たとえば取得範囲を「7日」と設定した場合、7日以上前のデータに更新があっても、それは BigQuery に反映されません。
    • また、バッチの実行間隔を Daily に設定すると、データの反映に最大1日の遅延が生じる可能性があります。
  • 実装コスト

    テーブルごとに ETL(データの抽出・変換・ロード)と転送パイプライン設定(DAG)を実装する必要があり、設定コストがかさみます。

Datastream

Datastream は、サーバーレスで使いやすい変更データ キャプチャ(CDC)およびレプリケーション サービスです。このサービスを利用することで、データを最小限のレイテンシで確実に同期できます。

— 公式ドキュメントにより (https://cloud.google.com/datastream/docs/overview?hl=ja)

同期元(今回は Cloud SQL for MySQL)のバイナリログを利用してデータの変更履歴を読み取り、ストリーミングの形で BigQuery へ同期を行います。

Datastream の構築方法については、こちらの公式ドキュメントを参照することで簡単に設定できますが、実際設定してみたとき、ドキュメントが不十分でわかりにくかった部分や、環境に依存して発生した課題について紹介します。

課題1 ネットワーク構成

これは、Datastream を利用する際によく直面する問題の一つです。公式ドキュメントや他の多くのブログでも、この課題について議論されています。

Datastream では、同期元のデータベースがパブリック IP アドレスからの接続を受け入れるように構成されている必要があります。しかし、FASTALERT の DB(Cloud SQL)はプライベートネットワーク内にあり、Datastream から直接読み取ることができません。

そのため、プライベートネットワーク内でリバースプロキシサーバを構成する必要がありました。このリバースプロキシを利用することで、Datastream がプライベートネットワーク内の Cloud SQL にアクセスできるようにしました。

ネットワーク構成:

引用元:https://cloud.google.com/datastream/docs/private-connectivity?hl=ja

Compute Engine で n1-standard-1 の VM インスタンスを立ち上げ、Cloud SQL へのアクセスが可能になるように、ファイアウォールのネットワーク設定も行いました。

さらに、VM の起動スクリプトとして、以下のようにフォワード設定を行いました。

#! /bin/bash

export DB_ADDR=[IP]
export DB_PORT=[PORT]

export ETH_NAME=$(ip -o link show | awk -F': ' '{print $2}' | grep -v lo)

export LOCAL_IP_ADDR=$(ip -4 addr show $ETH_NAME | grep -Po 'inet \K[\d.]+')

echo 1 > /proc/sys/net/ipv4/ip_forward
iptables -t nat -A PREROUTING -p tcp -m tcp --dport $DB_PORT -j DNAT \
--to-destination $DB_ADDR:$DB_PORT
iptables -t nat -A POSTROUTING -j SNAT --to-source $LOCAL_IP_ADDR

これにより、Datastream のネットワークが正常に通信できることを確認でき、プライベートネットワーク内の Cloud SQL へのアクセスが確立されました。

課題2 転送費用の読みが甘かった

Datastream では、Cloud SQL のスキーマを読み取り、転送したいテーブルを選択することができます。

テーブル名はマーキングさせていただきます

なるべく低コストで運用したいため、事前に行数が多いテーブルを転送対象から外しましたが、実際に運用してみると、想定よりも多い CDC(Change Data Capture)データの処理費用が発生していることがわかりました。

Datastream の OBJECTS 画面で転送実績を確認したところ、レコード数が少ないにも関わらず転送量が大きいテーブルがあることが判明しました。

このテーブルは変更が非常に頻繁で、大量の変更データが発生していたことが原因でした。

該当テーブルを転送対象から外した結果、費用は想定範囲内に抑えることができました。

課題3 同期テーブルのパーティション分割

BigQuery はテーブルのスキャンした分で課金されるため、テーブルを時間(日付)でパーティション分割するのは一般的です。時間(日付)パーティションを指定して、必要なデータだけをクエリすることで、クエリの効率が向上し、費用面でも節約が可能になります。

これまで、FASTALERT の災害記事テーブルは daily 単位で同期しており、日付でパーティション分割を行っていました。

しかし、今回利用する Datastream はテーブルのパーティション分割をサポートしていません。

こちらのドキュメント https://cloud.google.com/datastream/docs/best-practices-partitioned-tables?hl=ja のオプション1 に従ってパーティションを設定してみました。

手順としては、以下の通りです:

  1. Datastream に該当テーブルの同期を止め(転送処理を完了するまで待つ)
  2. Datastream から出力した BigQuery テーブルを複製し、パーティション分割テーブルとして作り直す
  3. Datastream の同期を再開

ただし、このドキュメントに記載されている方法には、漏れがありました。

Primary Key を設定すること

ドキュメントには Primary Key に関する記載がなかった(最新のドキュメントのオプション2には関連する記載があったが、オプション1の手順には記載なし)

BigQuery の CDC は、Primary Key が前提条件となっています。

上記の Step 3 で Datastream を再開したあと、転送先のテーブルに Primary Key が設定されていないとエラーが発生します。

CREATE TABLE dataset.partitioned_table (
'id' INT64,
'name' STRING
'update_date' DATETIME,
'datastream_metadata' STRUCT<'uuid' STRING, 'source_timestamp' INT64>,
PRIMARY KEY ('id') NOT ENFORCED
)
PARTITION BY TIMESTAMP(update_date)

上記の例のクエリのように Primary Key を定義するため、テーブルをもう一度作り直すことになりました。

max_staleness を設定すること

Primary Key と同様に、Datastream が自動生成したテーブルでは max_staleness が設定されていますが、手動で作り直したテーブルでは自前で設定しなければなりません。

max_staleness が設定されていない場合、デフォルト値の 0 が設定されます。この状態では、クエリを実行するたびに BigQuery は最新の結果を返すため、目標のパーティションだけでなく、Stream Buffer にあるデータも含めてスキャンされてしまいます。

これにより、パーティションが設定されていても、予想より多くのクエリスキャン量が発生します。

この設定に気づくまで、半日 max_staleness が 0 の状態で Datastream を走らせていたため、かなり無駄な費用が発生してしまいました。

まとめ

今回は、Google Cloudのデータ同期ツール「Datastream」を使用して、Cloud SQLからBigQueryへのデータ同期において実際に直面した課題を記述しました。

Datastreamは非常に便利であり、導入することで、従来はデータベースにアクセスする必要があったデータにBigQueryから簡単にアクセスできるようになりました。特に、リアルタイムでデータを同期できる点は大きな利点であり、これによりデータの可用性と分析効率が大幅に向上しました。

参考:

https://cloud.google.com/datastream/docs/private-connectivity?hl=ja https://cloud.google.com/bigquery/docs/change-data-capture?hl=ja#prerequisites https://cloud.google.com/bigquery/docs/change-data-capture?hl=ja#manage_table_staleness https://www.googlecloudcommunity.com/gc/Data-Analytics/Problem-with-partitioned-table-in-BigQuery-and-streaming-buffer/m-p/712332

copilot-instructions.mdは使えるぞ!実戦投入レポート

こんにちは、JX通信社でシニアエンジニアをしているSirosuzumeです。

先日、GitHub Copilotの新機能として、リポジトリのルートから.github/copilot-instructions.mdを読み込み、対話型UIの出力に反映する機能が追加されました。 この機能を使ってみて、どのような効果があるのか、どのように活用するのが良いのか、実際に新しく立ち上がったプロジェクトで試してみた結果を共有したいと思います。

copilot-instructions.mdの効果の検証

まずは実例を見てみましょう。

src/feature/entry-horse/presentational/list-item.tsxというファイルがあったとします。 この中にはEntryHorseListItemという名前のコンポーネントがあり、PropsはEntryHorseListItemPropsという名前で定義されているとします。 今回、このコンポーネントのUnitテストを作りたいため、src/feature/entry-horse/presentational/list-item.test.tsxというファイルを作成しました。

copilot-instructions.mdがない場合

楽をしようと考えて、GitHub Copilotの対話型UIを使用し「テストを作って」とお願いしてみたところ、以下のコードが生成されました。

import { render, screen } from '@testing-library/react';
import { describe, it, expect } from 'vitest';
import { EntryHorseListItem } from './list-item';

describe('EntryHorseListItem', () => {
  it('renders the name prop correctly', () => {
    render(<EntryHorseListItem name="Thunderbolt" />);
    expect(screen.getByText('Thunderbolt')).toBeInTheDocument();
  });

  it('renders another name prop correctly', () => {
    render(<EntryHorseListItem name="Lightning" />);
    expect(screen.getByText('Lightning')).toBeInTheDocument();
  });
});

しかし、私はこのコードを使う気にはなれませんでした。

  • 後だしになって申し訳ないのですが…実はこのプロジェクトはvitestではなくjestを使っています。
  • それに開発者は全員日本語話者のため、テストケースは日本語で書かれていてほしいのです。
  • あとtoBeInTheDocumenttoBeVisibleに変えてほしいです。

といった具合で、GitHub Copilotが生成したコードは、そのまま使えるということは殆どありません。

そもそもの話ですが、GitHub CopilotはあくまでAIであり、プロジェクトの背景情報を知りません。 開発者が全員日本人であるとか、jestを使っているとか、そんな情報をGitHub Copilotは持っていないのです。 それを教えるための手段として、copilot-instructions.mdを読むという機能が追加されました。

では次にcopilot-instructions.mdに、上の情報を追加して出力してもらいましょう。

copilot-instructions.mdがある場合

copilot-instructions.mdには以下の内容を記載します。

# コード規約

このプロジェクトはtest runnerに`jest`を使っています。
開発者は日本語話者のため、テストケースは日本語で書いてください。
テストはdescribeでグループ化し、itでテストケースを書いてください。
`toBeInTheDocument`より`toBeVisible`を優先して使ってください。

そして、GitHub Copilotに「テストを作って」とお願いしてみたところ、以下のようなコードが生成されました。

import React from 'react';
import { render, screen } from '@testing-library/react';
import { EntryHorseListItem } from './list-item';

describe('EntryHorseListItemコンポーネント', () => {
  it('名前が表示されることを確認する', () => {
    render(<EntryHorseListItem name="サンプルホース" />);
    const nameElement = screen.getByText('サンプルホース');
    expect(nameElement).toBeVisible();
  });

  it('空の名前が表示されることを確認する', () => {
    render(<EntryHorseListItem name="" />);
    const nameElement = screen.getByText('');
    expect(nameElement).toBeVisible();
  });
});

きちんとjestを使っていて、テストケースも日本語で書かれていて、toBeInTheDocumenttoBeVisibleに変更されています。 細かい点ではもっとこうしてほしい、というところはありますが、都度copilot-instructions.mdに追記していくことで、かなりプロジェクトに合わせたコードを生成してくれるようになります。

2024-11時点での課題

copilot-instructions.mdを100行以上書き込み、人間が読んでも役に立つくらいのノウハウ集にすると、対話型UIが出力してくれるコードはかなり精度が高くなります。

しかし、この機能にはまだ大きな弱点があります。サジェストの出力には未対応だということです。

おそらく、コードを書くのに慣れている人ほど、対話型のUIは使わず、サジェストをちょっと賢い予測変換機能として使っていることが多いのではないでしょうか。 私もどちらかといえばそのタイプで、これまでほとんど対話型UIは使用していません。 ものは試しにと、一日ほど対話型UIをつかってコーディング作業をしてみましたが、やはりサジェストを活用したほうが早くコーディングできるという結論に至ってしまいました。

ただ、使用しているうちに、対話型UIがサジェストよりも勝っていると感じられた点を見つけました。

それはゼロからコードを出力する場合です。

サジェスト機能は、現在開いているファイルと、他に開いているタブの内容を元に、カーソル位置に入るコードを推測し出力しています。 質の高いサジェストを出力させるには、importするパッケージやファイルを先に指定したり、お手本となるファイルを別のタブに開いたりするなど、小ワザを活用する必要もあります。

一方、copilot-instructions.mdを使用して出力したコードは、importするパッケージを正確に推測することができたり、現在のフォルダと命名規則から思った通りの関数名を出すことができます。

この特性は、HygenTurborepのコードジェネレーション機能のテンプレートを凝って保守するよりも、楽でかつ柔軟な運用ができると感じました。

コードジェネレーターとしてcopilot-instructions.mdを運用する

既存のコードジェネレーターに抱えていた課題

私は普段、Reactのコンポーネントや、それに付随するテストファイルを作成するときに、Hygenを活用することが多いです。 言語を問わずに使えることや、テンプレートのカスタマイズ性の高さが魅力ですが、テンプレートの保守に課題を感じていました。 生成されたコードがエラーをなるべく吐かないようにしたり、命名規則を一定にしたり、条件によって分岐したり。 出力に使用する.ejs.tファイルも、可読性が良いとは言えず、IDEの支援も受けにくいです。 便利にしようとするほど、入力しなければいけない項目も増えていくため、メンテしている本人以外は使いこなすのが難しいという問題も出てきます。

copilot-instructions.mdをコードジェネレーターとみなす場合

copilot-instructions.mdは自然言語で書くことができるため、可読性の面では問題ありません。 条件分岐もAI側にある程度お任せすることができます。

コードジェネレーターとしてCopilotを活用する場合、この3つの要素が活用しやすいと考えます。

  • 現在のフォルダ、ファイル名
    • 生成するコードの役割を推測させるのに役に立つ
  • copilot-instructions.md
    • コード規約やノウハウ集に近いものになる
  • 生成時にユーザーが入力する文章
    • 細かいオーダーがある場合に使用する。なるべくこの要素の比率を少なくする

出力するファイル、使用する機能に応じてセクショニングする

copilot-instructions.mdはMarkdownの形式で記載することができます。 全ての内容を並列に記載するのではなく、セクションごとに分けて記載することで、Copilotがどの部分に対してどのようなコードを生成すべきかを判断しやすくなります。

  • fetch関連のコードのルール
  • コンポーネントのテストコードのルール
  • モックデータの生成関数のルール

といった具合に、こういうコードを書くときはここを参考にしてくださいと例示します。

プロジェクトのフォルダ構成から関数名、ファイル名を推測できるようにする

フォルダのやファイル、関数の命名規則を統一し、その内容をcopilot-instructions.mdに記載しておくと、Copilotが精度の高い関数名やimportを考えてくれるようになります。

例えばEntryHorseというドメインがある場合

## フォルダ構成

あるドメインに所属するコードは`src/features/{{ ドメイン名(ケバブケース )}}`の下に格納します。
Mockデータの生成関数は`src/feature/entry-horse/mock.ts`に格納されており、`generateMock`というprefixで始まるMockデータ生成関数が存在しています。
コンポーネントは`src/feature/entry-horse/components`に格納されています。
コンポーネントは基本的に`{{ ドメイン名 }}{{ Role }}`の形式で命名されており、Propsは`{{ ドメイン名 }}{{ Role }}Props`という形式で命名されています。

のような情報をcopilot-instructions.mdに記載した上でsrc/feature/entry-horse/components/list.tsx内で「コードを書いてください」と指示すると 以下のようなコードが出力されます。

import type React from "react";

type EntryHorseListProps = {};

const EntryHorseList: React.FC<EntryHorseListProps> = (props) => {
  return <div>{/* Render your component here */}</div>;
};

export default EntryHorseList;

上手く生成されたときの例を記載する

プロンプトエンジニアリングなどでもある手法ですが、具体的な例をMarkdownに提示してあげると、より精度の高いコードを生成します。

細かくフィードバックをして更新する

生成されたファイルが期待通りではない場合、随時copilot-instructions.mdに追記していくことで、Copilotが生成するコードの精度を向上させることができます。

ファイルやフォルダ自体の作成には、既存のコードジェネレーターを利用する

GitHub Copilotはファイルやフォルダ自体を生成することはできません。 決められたフォルダ構成やファイル名でファイルを生成するためには、既存のコードジェネレーターを使うことが有効です。 併用することで、コードの初期開発のスピードが上がります。

生成の命令時に指示を追加して使う

この点が既存のコードジェネレーターには特にないメリットだと感じています。 copilot-instructions.mdに書ききれない、少し例外的な関数が必要だとしても、生成時に注意点としてその旨を追記しておくことで柔軟な対応が可能です。

例えばこのコンポーネントはforwardRefを使う必要があるという場合、「forwardRefを使ってください」という指示にするだけで、対応が可能になります。

まとめ

サジェストの出力に未対応である点が解決されれば、この機能の価値は相当に高まるように思えます。 またcopilot-instructions.mdを育てていくことは、人間の開発者にとっても副次的な効果があるのではないかと予想しています。 自然言語で書くことができ、積極的に更新していくモチベーションにもなるため、開発者間にとっても有益なノウハウ集や、実効性のあるコード規約集として活用できる可能性があるのではないかと感じています。

AWSとGoogle Cloudのコスト最適化の道 〜データドリブンな取り組みの紹介〜

CTO の小笠原(@yamitzky)です。今日は、CTO として推進している「サーバー費削減プロジェクト」の取り組みについてご紹介します。

本稿では「リザーブドインスタンスを購入する」や「入札型のインスタンスに移行する」といった一般的な削減テクニックについては扱いません。プロジェクトとしてどう分析、進行し、成果を出しているか、という話を中心に、取り組みをまとめています。

背景

JX通信社では、Amazon Web Services(以下、AWS) や Google Cloud などのクラウドサービスを活用しています。これらのクラウドサービスは通常、ドルで費用が決まっており、日本円で支払います。そのため、為替の影響を受けてしまいます。

ちょうど最近は円高の恩恵を受けていますが、つい3年前の2021年は1ドル103円だったところ、2024年のピーク時には160円まで進行しています。つまり原価が1.5倍近く上がってしまっていることになります。

Google Finance のドル円チャート

サーバーコスト削減のための開発は、直接的な売上増や、顧客へ提供する価値の向上には繋がらないものです。ついつい後回しになりがちではありましたが、為替などの背景もあり、2022年ごろから大規模なサーバーコスト削減に断続的に取り組む形になりました。

サーバーコスト削減施策に取り組むための、3つの基本

サーバーコスト削減を成功に導くコツとしては、3つあると考えています。これらを順を追ってご紹介します。

  1. プロジェクトとして立ち上げる
  2. 「行動につなげやすいコミュニケーション」を意識する
  3. データドリブンなアプローチを取る

1. プロジェクトとして立ち上げる

企業においてなんらかの取り組みを成功させるには「プロジェクト」を立ち上げるのが良いと思います。プロジェクトの要素として、次の点を抑えると良いです。

  • 「プロジェクト名」を決める
  • 「時期」「ゴール」「リソース」を決める
  • どう実現するか、施策の優先順位の方針を決める
  • モニタリングと振り返りを行う

2022年に取り組んだ最初の「サーバー費削減プロジェクト」は 「2023年3月までに費用を30%削る」というゴールを設定 し、と銘打ったりしました。

プロジェクト用のNotionページ

そして 「月に10万円以上の削減効果があるものを優先する」「削除するだけで終わるものを優先する」 といった方針を設けたり、「一ヶ月以上工数がかかるものはやらない」「放っておけば減りそうなものはやらない」といった優先度付けをしたり、「機能削減だけでできるものを優先し、施策の責任者と調整する」「数日でできるものはプロダクトバックログに入れてもらう」といった交渉などを行いました。

2. 「行動につなげやすいコミュニケーション」を意識する

例えば 「Amazon S3 のデータが高いのでなんとかしてください!」 と伝えても、「どれくらいの重要度なのか」「どれくらいの大変さなのか」「なぜそれをやらないといけないのか」などはわからず、納得感のあるコミュニケーションにならないですし、行動につなげることもできません。

そこで、次のようなコミュニケーションを意識・徹底しています。

  • コストや削減幅を伝えるときは、単位を「一ヶ月あたり◯万円」に揃える *1
  • 「何にかかっているコストなのか」「どんな施策や売上に紐づいたコストなのか」などを調べ、伝える
  • 削減の難易度についての考えを述べる

例えば、冒頭の例を言い換えると、 「開発版のS3バケットに、月20万円もかかっています。開発版なので、3ヶ月以上古いデータを自動削除する設定をするだけで、月2万円程度までコストが下げられるはずです」 などという具合に伝えます。そうすると「開発版だから確かにもったいないな」「開発版だから古いの消すのは合理的だな」「消すだけなら簡単だな」と、関係者が納得感を持って理解し、行動しやすくなります

詳細はほぼお見せできないのですが、施策や削減手段ごとにかかっているコストなどをまとめて管理しています

3. データドリブンなアプローチ

サーバーコスト削減の成果を出すために、 定量的に分析してなるべく効果の高いものを見つけ、その結果を日次でモニタリング するようにしています。分析とモニタリングにわけてご説明します。

分析フェーズ

まず、AWS や Google Cloud のすべてのコストを、BigQuery に転送しています。そのデータを、Connected Sheets を使って Google Sheets に連携しています。 さらに、一個一個の細目に対して、「何の機能にかかっているコストなのか」を目視でアノテーションしています *2

Connected Sheet の例。一個一個の細目に対して、プロダクトの機能や、コストの目的をアノテーションしています。

そして Google Sheets 上に集約したものを、以下のような分析軸でピボットテーブルにかけます。

  • クラウドのアカウント・プロジェクトIDごと
  • クラウドの製品ごと (Lambda, DynamoDB, Cloud Run, etc...)
  • 利用タイプ・SKUごと (Lambda の「GB-Second-ARM」、Cloud Run の「CPU Allocation Time」といった単位。このとき、リージョンは分かれないようにまとめます)
  • 機能・施策ごと (自社プロダクトにおける「◯◯機能」や、「セキュリティ監査のため」などの用途)
  • 事業ごと

このように分析を進めると、 削減幅の大きい対象や、ムダに感じられる費用、費用対効果の見合わないプロダクトの機能、社内システムetc...などが浮かび上がってきます。「ムダな費用かもしれなくて確認が必要だが、削減幅の大きくないもの」の優先度を落とすこともできます。

また、AWS の Cost Explorer を使った分析をされている方も多いと思います。私も、簡易的な用途としてはよく利用しますが、クラウド横断での分析ができないこと、分析の集計軸(ピボットテーブルできる区分)が限定的で意味のある集計になりづらいこと、定期的なモニタリングがしづらいことなどから、BigQuery や Google Sheets をベースにした分析をおすすめしたいです。

モニタリングフェーズ

BigQuery に集約したクラウドのコストを、Redash で定時集計し、毎朝 Slack に投稿するようにしています。Redash への投稿は主に私の作った bot を使っています*3

毎日だと変動が大きく削減できたかわかりづらいこともあるので、週次集計や、月次予測での過去の◯月比、といった比較も定期的に行っています。また、AWS、Google Cloud 以外については、稟議申請のタイミングでの費用チェック等も地道にやっています。

全体像。構築時期がかなり古いため冗長ですが、S3→GCS→BigQueryの転送などはもっとシンプルにできます。AWS のコストデータはクラスメソッドの仕組みで保存されています。

まとめ

今回は「サーバー費のコスト削減」というテーマについて、具体的なテクニックではなく、データドリブンな取り組みやプロジェクト管理にフォーカスを当ててご紹介しました。削減テクニックとしては、AWSの公式ブログ やその他の技術ブログも参考にしましたが、削減幅が大きくないためにJX通信社ではやっていない施策も多々あります。定量的に分析をしてから取り組む、というのが大事ではないでしょうか。

また、サーバー費削減が進んでいるのは、ひとえに社内関係者のご協力があってのことです。この場を借りて、御礼を申し上げます。

*1:「月◯万円」という単位で目標や売上、あるいは自分の給料を見ることが多いので、このような単位にしています

*2:アノテーションしていない費用は、全体の1%程度です。金額が大きいものは厳密に確認しつつ、ある程度ルールベースでのアノテーションもして、えいやで付与しています

*3:hakobera さんの素晴らしいアイデアをフォークしていますが、コードはほぼ書き換わっています

気象庁XMLを正しく扱いたい!

テーマの紹介

JX通信社エンジニアのr_uematsuです。
弊社は、日本テレビ放送網株式会社と共同で「日テレ気象・防災サイト」を開発しています。気象警報、地震・津波情報、噴火情報など、防災に関わる情報をまとめて閲覧できるサイトです。 bosai.news.ntv.co.jp

情報源には気象庁から配信されるXML(電文)を使用しています。
気象庁XMLは気象情報や地震情報など様々な情報を配信しており、日テレ防災サイト以外にも社内プロジェクトでも広く利用されています。
今回は気象庁XMLの紹介と正しく扱うためには、どんなことに気を付けるべきかを地震津波関連のXMLを例に掘り下げてみたいと思います。

これから気象庁XMLを使ってみたい方に雰囲気が伝わると幸いです! また掘り下げる内容は、自分自身が気象庁の地震津波関連のXMLに初めて触れた時に、把握が難しかった仕様や重要なポイントなど取り上げてみました。地震津波関連のXMLを既に使ってる方の助けになればと思います。

気象庁防災情報XMLについて

気象庁防災情報XMLとは、気象庁が発表する気象警報や地震津波情報、火山情報などをITサービスに取り入れたい時に便利なデータです。公式情報がXML形式で配信されていてPULL型で取得することができます。
例えば気象警報について以下のようなXMLが配信されます。

<Report xmlns="http://xml.kishou.go.jp/jmaxml1/" xmlns:jmx="http://xml.kishou.go.jp/jmaxml1/" xmlns:jmx_add="http://xml.kishou.go.jp/jmaxml1/addition1/">
<Control>
<Title>気象特別警報・警報・注意報</Title>
<DateTime>2024-07-22T16:11:07Z</DateTime>
<Status>通常</Status>
<EditorialOffice>気象庁本庁</EditorialOffice>
<PublishingOffice>気象庁</PublishingOffice>
</Control>
<Head xmlns="http://xml.kishou.go.jp/jmaxml1/informationBasis1/">
<Title>東京都気象警報・注意報</Title>
<ReportDateTime>2024-07-23T01:11:00+09:00</ReportDateTime>
<TargetDateTime>2024-07-23T01:11:00+09:00</TargetDateTime>
<EventID/>
<InfoType>発表</InfoType>
<Serial/>
<InfoKind>気象警報・注意報</InfoKind>
<InfoKindVersion>1.1_2</InfoKindVersion>
<Headline>
<Text>小笠原諸島では、23日夕方まで急な強い雨や落雷に注意してください。</Text>
<Information type="気象警報・注意報(府県予報区等)">
<Item>
<Kind>
<Name>雷注意報</Name>
<Code>14</Code>
</Kind>
<Areas codeType="気象情報/府県予報区・細分区域等">
<Area>
<Name>東京都</Name>
<Code>130000</Code>
</Area>
</Areas>
</Item>
</Information>
~~~~省略~~~~

<Item>
<Kind>
<Name>雷注意報</Name>
<Code>14</Code>
</Kind>
<Areas codeType="気象・地震・火山情報/市町村等">
<Area>
<Name>千代田区</Name>
<Code>1310100</Code>
</Area>
</Areas>
</Item>
<Item>

~~~~省略~~~~
</Body>
</Report>

配信される情報は数十種類にも及び、それぞれにXMLフォーマットと仕様が存在します。

いざ気象庁XMLを導入しよう!と開発を進めると、このフォーマットと仕様の把握がとても大変でした。。。

地震津波関連を例にXMLの仕様を覗いてみる

XMLの仕様は例えばどんなものかというのを弊社でよく扱う地震津波関連のXMLを例に覗いてみたいと思います。

気象庁が配信する地震津波関連のXMLだけでも種類はこんなにあります。

  • 津波警報・注意報・予報
  • 津波情報
  • 沖合の津波観測に関する情報
  • 緊急地震速報
  • 震度速報
  • 地震情報(震源に関する情報)
  • 地震情報(震源・震度に関する情報)
  • 地震情報(地震の活動状況等に関する情報)
  • 地震情報(地震回数に関する情報)
  • 地震情報(顕著な地震の震源要素更新のお知らせ)
  • 長周期地震動に関する観測情報
  • 南海トラフ地震に関連する情報
  • 地震・津波に関するお知らせ

それぞれに個別の仕様とXMLのフォーマットが存在します。さらに発表条件と順番があります。
参考:地震情報について
参考:津波警報・注意報、津波情報、津波予報について

発令とEventIDについて

気象庁のWebページ地震情報についてによると1回の地震が発生した場合に複数のXMLが配信される可能性があることがわかります。 その地震が震度3以上なのか、津波に関する情報はあるのかなどの条件によりそれぞれのXMLの配信の有無が決まります。

よく使用する種別を簡単に紹介します。

  • 津波警報・注意報・予報
    津波に関する警報の発令の有無に関する情報が載ってます。

  • 津波情報
    津波の到達予想時刻や波の高さなどの情報が載ってます。

  • 震度速報
    震度3以上の揺れを観測した場合に全国各地の地震の揺れを速報として配信されます。速報のため震度観測区域は「東京都23区」のように荒めになります。

  • 震源に関する情報
    津波警報または注意報が出ていない場合に配信されます。地震の発生場所(震源)やその規模(マグニチュード)の情報が載ってます。

  • 震源・震度に関する情報
    震源に関する情報の内容に加えて、震度速報に比べてさらに細かい区域の「東京千代田区」のような単位での観測震度の情報が載ってます。
    参考:緊急地震速報や震度情報で用いる区域の名称

EventIDに関して

地震津波関連XMLでは、ある特定の地震を識別するために地震識別番号(14 桁の数字例:20240101210208)がXMLの<EventID>で与えられます。

<Report xmlns="http://xml.kishou.go.jp/jmaxml1/" xmlns:jmx="http://xml.kishou.go.jp/jmaxml1/">
<Control>
<Title>震度速報</Title>
<DateTime>2024-01-01T07:07:40Z</DateTime>
<Status>通常</Status>
<EditorialOffice>気象庁本庁</EditorialOffice>
<PublishingOffice>気象庁</PublishingOffice>
</Control>
<Head xmlns="http://xml.kishou.go.jp/jmaxml1/informationBasis1/">
<Title>震度速報</Title>
<ReportDateTime>2024-01-01T16:07:00+09:00</ReportDateTime>
<TargetDateTime>2024-01-01T16:06:00+09:00</TargetDateTime>
<EventID>20240101160608</EventID>  <---こちら
<InfoType>発表</InfoType>
<Serial/>
<InfoKind>震度速報</InfoKind>
<InfoKindVersion>1.0_1</InfoKindVersion>
<Headline>
<Text> 1日16時06分ころ、地震による強い揺れを感じました。震度3以上が観測された地域をお知らせします。</Text>
<Information type="震度速報">
~~~~省略~~~~
</Report>

地震には前震、本震、余震とありますが、一般的に震源地や発生時刻が異なるため別々の識別番号(EventID)が与えられます。異なる種別のXMLでEventIDが同じ場合は同一の地震に関するXMLと解読することができます。

XML種別 EventID 説明
震源・震度に関する情報 20240101xxxxx1 前震
震度速報 20240101xxxxx2 本震
震源・震度に関する情報 20240101xxxxx2 本震
震源・震度に関する情報 20240101xxxxx3 余震
津波警報・注意報・予報 20240101xxxxx2 本震によって発令
津波情報 20240101xxxxx2,
20240101xxxxx3
本震,余震によって起きた津波の情報

具体的に以上のようにXMLが配信された場合、以下のように解読できます。

  • 前震、本震、余震があった。
  • 本震では震度速報が配信され震度3以上である。
  • 本震の揺れにより津波警報・注意報・予報が発令された。
  • 本震、余震によって引き起こされた津波がありそう。

取消報について

地震が発生すると気象庁からの公式情報が次々と流れてきますが、ごく稀に誤った情報が配信される場合があります。そのような場合、取消電文というものが配信されます。実際に、2024/01/01に石川県能登で震度7を観測した内容のXMLが誤って配信されました。TVニュースなどでもそのまま発表されて後に訂正されていた記憶があります。まさにあの時、取消報が配信されていました。

実際に配信された取消電文

<Report xmlns="http://xml.kishou.go.jp/jmaxml1/" xmlns:jmx="http://xml.kishou.go.jp/jmaxml1/">
<Control>
<Title>震度速報</Title>
<DateTime>2024-01-01T14:13:46Z</DateTime>
<Status>通常</Status>
<EditorialOffice>気象庁本庁</EditorialOffice>
<PublishingOffice>気象庁</PublishingOffice>
</Control>
<Head xmlns="http://xml.kishou.go.jp/jmaxml1/informationBasis1/">
<Title>震度速報</Title>
<ReportDateTime>2024-01-01T23:13:00+09:00</ReportDateTime>
<TargetDateTime>2024-01-01T23:03:00+09:00</TargetDateTime>
<EventID>20240101230402</EventID>
<InfoType>取消</InfoType>
<Serial/>
<InfoKind>震度速報</InfoKind>
<InfoKindVersion>1.0_1</InfoKindVersion>
<Headline>
<Text>震度速報を取り消します。</Text>
</Headline>
</Head>
<Body xmlns="http://xml.kishou.go.jp/jmaxml1/body/seismology1/" xmlns:jmx_eb="http://xml.kishou.go.jp/jmaxml1/elementBasis1/">
<Text>先ほどの、震度速報を取り消します。</Text>
</Body>
</Report>

この電文は種別「震度速報」のEventIDが「20240101230402」の電文を撤回することを意味します。 DBなどに地震情報を保存していたりする場合何かしらのロールバック処理が必要になると思います。(場合によっては結構厄介ですね。。。)
地震津波関連のXMLを扱うシステムは取消電文を受け取る可能性があることも考慮しておきたいですね。

終わりに

最後までお読みいただき、ありがとうございます。気象庁XMLにはどんな仕様があるかを地震津波関連のXMLを掘り下げてみました。また今回は紹介していない気象、火山、台風などでも地震津波のように固有事情、仕様が存在します。
気象庁から公式情報が配信されてますが、正しく扱うには仕様の深い理解が必要です。防災関連のシステムで利用した場合、重要な場面で想定外の挙動を起こさないよう安定に動作するように心掛けたいですね。 
今回掘り下げた地震津波関連では発令順やEventID、取消報以外にも気を付けるべき点がいくつかあり、さらに気象や火山のXMLを扱う場合はそれぞれの仕様の把握が必要です。弊社は、気象庁XMLをより扱いやすいフォーマットに加工、整理して返却するAPIを開発と提供をしています。災害情報を活用する機会がありましたらぜひお問い合わせください! jxpress.net

Playwrightでメール配信のテスト自動化にチャレンジ!

こんにちは、JX通信社でシニアエンジニアをしているSirosuzumeです。

JX通信社の「FASTALERT」には、ユーザーが事前に設定した地域で発生した災害情報を、メールで受信する機能があります。

しかしテストする手順も複雑で、配信条件も多様化していったこともあって、手動でのテストを行うことに限界を感じていました。

設定画面の挙動確認など、ブラウザ上で完結するテストであればPlaywrightを使って自動化することもできていたのですが、実際にメールを受信するところのテストを自動化する方法についてのノウハウ不足が課題でした。

そこで、Amazon SESの機能を改めて確認していたところ、特定のメールアドレスで受信したメールをS3に保存する機能があることを知り、E2Eテスト内からS3にアクセスすることでメールの受信テストまで自動化でカバーできるのではないか、と考えたことが、今回のチャレンジのきっかけです。

この記事ではAmazonSES + S3を経由して、Playwright上からメール受信テストを行なう方法を解説します。

概要

今回作成したPlaywrightのテストは、以下のシーケンス図のような流れで動作させました。

E2Eテストシーケンス図

Amazon SESでを使用して、メールを受信するための設定方法は公式ドキュメントを参照してください。

以降のドキュメントはルール設定時にアクションの追加で、Amazon S3バケットにメールを保存するようにしていることが前提になります。

S3経由でメールの受信を検知する方法(Node環境の場合)

S3にアクセスする必要があるため、AWS SDKを使用します。

S3に保存されたメールを取得するだけならば、ListObjectsV2Commandを使ってファイル一覧を取得し、GetObjectCommandを使ってファイルの内容を取得し、解析することで可能です。

特定のアクションをトリガーとして新しいメールが来るのを検知したい場合は、ポーリングを行って新しいファイルが追加されるのを検知する必要があります。

また、メール配信のトリガーとなるアクション自体も時間がかかる場合があるため、以下のような順番で新しいメールが来るのを待つ処理を実装してみました。

  1. 受信前のリストを取得する
  2. メール配信のトリガーとなるアクションを実行する
  3. ポーリングを一定時間行ない、受信前のリストと比較して、新しいファイルが追加されたらそのファイルのオブジェクトキーを返却する
  4. オブジェクトキーを使ってファイル(メール)をダウンロードする

以下はポーリングを行なう関数、S3からメールを取得する関数の実装例です。

import { setInterval } from "node:timers/promises";
import {
  DeleteObjectsCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  S3Client,
} from "@aws-sdk/client-s3";

const client = new S3Client({
  region: process.env.E2E_MAIL_S3_BUCKET_REGION,
});

export async function readNewEmailFromS3(): Promise<string[]> {
  // WARN: S3に1000件以上のメールが溜まっている場合、1000件を超えるメールが来た場合に対応できない
  const command = new ListObjectsV2Command({
    Bucket: process.env.E2E_MAIL_S3_BUCKET_NAME,
  });
  const response = await client.send(command);
  const fileNames =
    response.Contents?.map((content) => content.Key ?? "") ?? [];
  return fileNames;
}

/**
 * 一定時間メールが来なかった場合に発生するエラー。逆にメールが来ないことを検知する際は、このエラーが発生することを期待する
 */
export class WatchTimeoutError extends Error {
  constructor() {
    super("Timeout");
  }
}

export async function watchNewEmailsUntilTimeout(
  actionPromise: Promise<void>,
  timeout: number,
  interval = 500,
): Promise<string[]> {
  const lastEmails = await readNewEmailFromS3();
  await actionPromise;
  // Promise完了からtimeoutまでの間、新しいメールが来るのを待つ
  for await (const startTime of setInterval(interval, Date.now())) {
    if (Date.now() - startTime > timeout) {
      break;
    }
    const emails = await readNewEmailFromS3();
    if (emails.length > lastEmails.length) {
      return emails.filter((email) => !lastEmails?.includes(email));
    }
  }
  throw new WatchTimeoutError();
}
/**
 * オブジェクトキーを指定してメールを文字列の形式で取得する
 */
export async function fetchEmailByKey(key: string): Promise<string> {
  const command = new GetObjectCommand({
    Bucket: process.env.E2E_MAIL_S3_BUCKET_NAME ?? "",
    Key: key,
  });
  const response = await client.send(command);
  if (!response.Body) {
    throw new Error("No body");
  }
  return response.Body.transformToString();
}

メールの内容を取得し、解析できる形式に変換する

S3に保存されたEmailはBase64でエンコードされたRawデータが保存されているため、一度デコードしてメールの内容を取得する必要がありました。

Node環境でメールをデコードする場合、mailparserを使うと、メールの内容を解析してオブジェクトとして取得できます。

以下はmailparserを使って、1件のメール受信を待つ関数の実装例です。

import { ParsedMail, simpleParser } from "mailparser";

export async function waitNewMail(
  actionPromise: Promise<any>,
  timeout: number,
): Promise<ParsedMail> {
  const newEmailKeys = await watchNewEmailsUntilTimeout(actionPromise, timeout);
  if (!newEmailKeys.length) {
    throw new Error("No new email");
  }
  if (newEmailKeys.length > 1) {
    throw new Error("Too many new emails");
  }
  const newEmail = await fetchEmailByKey(newEmailKeys[0] as string);
  const parsedEmail = await simpleParser(newEmail);
  return parsedEmail;
}

テスト例

上記の関数を使って、メールが来ることを検知するテストと、メールが来ないことを検知するテストを実装してみました。

メールが来ることをテストする場合は、一定時間内にメールが来ることを検知する関数を使って、メールの内容を検証します。 メールが来ないことをテストする場合は、逆に一定時間内にメールが送信されず、WatchTimeoutErrorが発生することを期待します。

import { expect, test } from "@playwright/test";

test.beforeEach(async ({ page }) => {
  // メールの受信設定を行なう
  await page.goto("https://example.com/mail-setting");
  // SESで設定したメールアドレスを入力する
  await page.getByRole("textbox", { name: "メールアドレス" }).type(process.env.E2E_MAIL_ADDRESS);
  await page.click("button", { text: "設定" });
});
test("メールが10秒以内に来ることを検知する", async ({ page }) => {
  // メールが来ることを検知する
  const mail = await waitNewMail(actionToTriggerMail(), 10000);
  await expect(mail.subject).toBe("メールの件名");
  // page.setContentを使うと、HTMLメールをブラウザ上で表示しているかのようにテストできる。ただしメーラーによる差分までは検知できない
  await page.setContent(mail.html);
  await expect(page.getByText("メールの内容")).toBeTruthy();
});
test("メールが来ないことを検知する", async () => {
  // メールが来ないことを検知する
  await expect(
    waitNewMail(actionToTriggerMailButNoMail(), 10000),
  ).rejects.toThrowError(WatchTimeoutError);
});

HTMLメールのメーラーごとの差分を検知するのはPlaywrightでは困難ですが、コーディング時にReact Emailを使用するとHTMLメールの開発を楽にすることができます。

詳しくは、実践 React Emailを使ったHTMLメールの開発・運用という記事を書いたので、もしよろしければご参考にしてください。

まとめ

メールの受信テストを自動化したことで、新機能開発、リファクタリング時のリグレッションテストが大幅に効率化することができました。 手動では1時間以上かかってしまうテストが数分で完了し、リグレッションテストも毎回網羅的に行えるようになりました。

テスト用のコードそのものが複雑になりがちではありますが、それを補ってあまりある効果があったと感じています。

今後、FASTALERTではメールによる各種情報の配信を増やしていく予定のため、より多くのテストを自動化し、品質を担保していきたいと考えています。