browser-use を使って情シス業務を自動化するための実践的テクニック

こんにちは、CTOの小笠原(@yamitzky)です。

最近、LLMを使ってブラウザ操作を自動化する、browser-use が流行っていますね! 今回は、毎月実施している情シスタスクの一つをテーマに、browser-use で業務自動化できるかを検証してみました。

browser-use とは

browser-use は、AIエージェントを使ってブラウザ操作を自動化するツールです。ブラウザ操作には内部的には playwright を使い、AI 部分は langchain を使っています。また、タスクを遂行するためのエージェントとしての仕組み(タスクを分解して、ネクストアクションを決め、ゴールが達成できているかを評価する)も備わっています。

そのため、ざっくりと「◯◯◯のサイトを開いて、◯◯◯の予約をして」というと、ブラウザを動かしてゴールを達成してくれます。

browser-use.com

ただし、「業務を自動化する」となるといくつかハイコンテキストな部分があり、雑に指示するだけでは難しいな、という印象です。

自動化したい情シスタスクの概要

情シス部門では毎月、「アカウントの棚卸し」という業務を行っています。これは、各種SaaSサービスにログインし、アカウント一覧をCSVなどでエクスポートし、退職済みの方がいないか、使われていないライセンスがないか、などを確認する業務です。セキュリティインシデントの防止や、コスト削減のために、単純ではあるものの重要な業務です。これを解決する SaaS などもありますが、それほど時間がかかっていないというのもあり、現在は手動で行っています。

今回は、Zoomを対象に「アカウント一覧をCSVなどでエクスポート」までを自動化してみます。

Step 1:公式ドキュメントに従って browser-use を使ってみる

import asyncio

from browser_use import Agent, Controller
from browser_use.browser.browser import Browser, BrowserConfig
from langchain_openai import ChatOpenAI

browser = Browser(
    config=BrowserConfig(
        headless=False,
        disable_security=False,
    )
)
controller = Controller()
model = ChatOpenAI(model="gpt-4o")


async def main():
    task = "Zoom のユーザー一覧ページを開き、エクスポート → テーブル内のすべてのユーザー からダウンロードをしてください"
    agent = Agent(
        task=task, llm=model, use_vision=True, controller=controller, browser=browser
    )
    await agent.run()
    await browser.close()


if __name__ == "__main__":
    asyncio.run(main())

公式ドキュメントに従えば、上記のようなコードで動きそう... ですが、実際にはうまくいきません。業務知識として「ZoomのID/PASSWORD」が必要なので、エージェントにはログインできないのです。放置していると、下記スクリーンショットのように、存在しないアカウントでログインを試みます。

Zoomのログインに失敗する様子(3倍速)

Step 2:ログインを実装する

さすがに LLM の API に ID/PASSWORD を送りたくないので、人間がログインをするようにしましょう。また、具体的にどの URL を開くのか指定した方が、結果が安定します。

# ...略...

browser = Browser(
    config=BrowserConfig(
        chrome_instance_path="/Applications/Google Chrome Canary.app/Contents/MacOS/Google Chrome Canary",
    )
)

# ...略...

@controller.action("ユーザーにログインを依頼する")
def ask_login():
    # Macユーザー限定
    subprocess.run(["say", "-v", "Kyoko", "ログインしてください"])
    input("ログインしてください。ログインが完了したら、Enterキーを押してください")
    return ActionResult(
        extracted_content="ログインが完了しました", include_in_memory=True
    )


async def main():
    async with await browser.new_context() as context:
        task = "https://zoom.us/profile を開き、ログイン済みかを確かめてください。ログインが完了していない場合は、ユーザーにログインを依頼(ask_login)してください。"
        agent = Agent(
            task=task,
            llm=model,
            use_vision=True,
            controller=controller,
            browser_context=context,
        )
        await agent.run()

        task = "https://zoom.us/account/user#/ を開き、エクスポート → テーブル内のすべてのユーザー からダウンロードをしてください"
        agent = Agent(
            task=task,
            llm=model,
            use_vision=True,
            controller=controller,
            browser_context=context,
        )
        await agent.run()

    await browser.close()

一部を省略しましたが、 @controller.action("ユーザーにログインを依頼する") で tool を登録してあげると、人間に操作を依頼できるようになります。tool 名はプロンプト内で具体的に指定した方が、期待通りに動作をしてくれます。

加えて、次の3つのテクニックを追加しました。こちらは必須というよりは好みの問題です。

  • 複数の Agent に分割している
    • AI になるべく単純なタスクを与えるため。実際には、単一のAgent、単一のプロンプトでもうまくいくことが多いです。
  • Playwright の Headless Chrome ではなく、実際のブラウザを使う(上記例ではChrome Canaryを指定)
    • ログイン情報(セッション)が保存されたり、Chrome 拡張をインストールしておくことがやりやすいです。
  • say コマンドで音声を読み上げる(Macユーザー限定)
    • バックグラウンドで AI に操作させておいて、人間の対応が必要なときに気づけるようになります。

上記の対応で、ログインまではうまくいくようになります...が、実際にはエクスポートしたファイルのダウンロードができていません。

Step3:ダウンロードに対応

Step2では一見するとダウンロードが成功したように見えるのですが、Playwright の仕様でダウンロード済みのファイルにアクセスすることができません。関連した Issue も立っており、回避策が提案されています。

https://github.com/browser-use/browser-use/issues/91

事前に download イベントに対応する処理を記載する必要があります。

# ...略...

def get_or_create_download_dir() -> Path:
    exe_dir = Path(os.path.dirname(os.path.abspath(__file__)))
    download_dir = exe_dir / "downloads"
    download_dir.mkdir(exist_ok=True, parents=True)
    return download_dir

async def handle_download(download: Download):
    original_path = await download.path()
    # ファイル名からダウンロード元サイトがわかるよう、example_com 形式でドメインを追加
    domain_prefix = download.url.split("/")[2].replace(".", "_")
    new_filename = f"{domain_prefix}_{download.suggested_filename}"
    new_path = get_or_create_download_dir() / new_filename
    os.rename(original_path, new_path)

async def handle_new_page(page: PlaywrightPage):
    page.on("download", handle_download)

async def setup_download(playwright_browser: PlaywrightBrowser):
    while len(playwright_browser.contexts) < 1:
        await asyncio.sleep(1)
        print("waiting for contexts to be created")
    for context in playwright_browser.contexts:
        context.on("page", handle_new_page)
        for page in context.pages:
            page.on("download", handle_download)

async def main():
    async with await browser.new_context() as context:
        pw_browser = await context.browser.get_playwright_browser()
        asyncio.create_task(setup_download(pw_browser))

        # ...略...

これで、基本的には問題なく動くと思います!

browser-useでエクスポートが成功している例(3倍速)

Step4:AI に処理の完了をちゃんと確認させる(おまけ)

今回の Zoom のエクスポートはかなり単純な例でした。他の SaaS では、エクスポートが非同期で行われることも多いです。複雑な処理の場合、Agent は誤って「ダウンロードが完了した」と誤解してしまうことがあります。

そこで、Agent にダウンロード済みファイルの確認をさせます。

# ...略...

@controller.action("ダウンロード済みファイル一覧を確認する")
def get_downloaded_files():
    download_dir = get_or_create_download_dir()
    files = list(download_dir.glob("*"))
    if not files:
        return ActionResult(
            extracted_content="ダウンロードフォルダに現在ファイルはありません。",
            include_in_memory=True,
        )

    file_list = "\n".join([f"- {file.name}" for file in files])
    return ActionResult(
        extracted_content=f"ダウンロードフォルダの内容:\n{file_list}",
        include_in_memory=True,
    )


async def main():
    async with await browser.new_context() as context:
        pw_browser = await context.browser.get_playwright_browser()
        asyncio.create_task(setup_download(pw_browser))

        task = "https://zoom.us/profile を開き、ログイン済みかを確かめてください。ログインが完了していない場合は、ユーザーにログインを依頼(ask_login)してください。"
        agent = Agent(
            task=task,
            llm=model,
            use_vision=True,
            controller=controller,
            browser_context=context,
        )
        await agent.run()

        task = "https://zoom.us/account/user#/ を開き、エクスポート → テーブル内のすべてのユーザー からダウンロードをしてください。作業を完了する前に、ダウンロードフォルダのファイル一覧(get_downloaded_files)を確認し、ダウンロードができているかを確認してください。ファイル名にはダウンロード元サイトのドメインが含まれています。"
        agent = Agent(
            task=task,
            llm=model,
            use_vision=True,
            controller=controller,
            browser_context=context,
        )
        await agent.run()

# ...略...

「ダウンロード済みファイル一覧を確認する」という tool を作成し、それによって作業の完了を確認させるようにしました。

実際、意地悪をして get_downloaded_files 関数が常に「ダウンロードフォルダに現在ファイルはありません。」を返すようにすると、延々とダウンロード処理を繰り返します。

振り返り

今回は「Zoom のユーザー一覧をエクスポートする」という作業をテーマに、browser-use での自動化をしてみました。

実際には他にも情シス業務の自動化を検証してみていますが、ハードルはやや高いかなという印象です。例えば、Google Sheets (スプレッドシート) のような複雑な UI の操作はあまりうまくいかない印象で、tool を用意してあげたり、タスクを分解してあげる、人間のレビューを挟むなど、「AI に期待しすぎない」というのがコツかなと思いました。

一方で、「AIでもわかるようにドキュメントや手順を簡略化する」という検討も進むので、良い機会にはなりました!

ぜひ皆さんも業務自動化をチャレンジしてみてください!

MSWを活用したフロントエンドIntegrationテストのノウハウ

こんにちは!JX通信社のシニアエンジニアのSirosuzumeです。

通信を含むコンポーネントのテストや、Storybookの動作を確認する際、皆さんはどんなアプローチをしていますか? 私はMSWを使用して、通信処理をMockしてテストを行っています。 MSWを導入する以前は、通信をMockするためにコンポーネントのPropsでfetcherを渡すという方法をとっていました。 実際、この方法は特別なライブラリを必要とせず、シンプルでわかりやすい方法ですが、コンポーネントの親子関係が複雑になると、Propsのバケツリレーが発生しがちです。 MSWはnode環境、あるいはbrowser環境で通信をMockするためのライブラリであり、上記のようなPropsによる制御が不要になります。 JestやVitestといったテスト環境だけではなく、Storybookや開発環境でも使用することができるため、実際に動かしてUXを確認する際にも便利です。

この記事では、MSWの活用方法のうち、Node環境で実行するIntegrationテストの書き方について紹介します。

準備

テスト環境におけるMSWのライフサイクルを把握する

Node環境下でMSWを動かす場合、msw/browserではなくmsw/nodeを使い、setupServer関数を使用して通信のMock設定を行う必要があります。

基本の流れとしては以下のようになります。

  1. setupServer関数を使ってMockサーバーのインスタンスを作成する
  2. listenメソッドを使ってMockサーバーを有効化する(基本的にbeforeAllで呼び出す)
  3. 必要に応じてuseメソッドを使用して、ハンドラーを追加し、テストを実行する
  4. afterEachでresetHandlersメソッドを呼び出して、3で追加したハンドラーを無効化する
  5. afterAllでcloseメソッドを呼び出してMockサーバーを無効化する

公式ドキュメントではjest.setup.tsなどでグローバルのタイミングでセットアップを行うことを推奨しています。 しかし、私達のプロジェクトではMSWを使わないUnitテストのファイルも多いこと、Mock箇所は必要最小限に抑えたいという方針であるため、MSWの使用はテストファイルごとに行う方針でテストを記述しています。 この方針でMSWを使用する場合、以下のようなスニペットをリポジトリに追加しておくと便利です。

const server = setupMockServer(/* TODO: デフォルトのハンドラーを追加 */);
beforeAll(() => {
  server.listen();
});
afterEach(()=> {
  server.resetHandlers();
});
afterAll(() => {
  server.close();
});

予想外のリクエストをキャッチするように設定する

MSWのhandlerは第一引数に指定された文字列、正規表現およびメソッドにマッチしたリクエストをMockし、マッチしなかったものは通常通りの通信を行います。 これはブラウザでの確認時、まだデプロイされていないエンドポイントの一部だけをMockするといった用途には適した動作なのですが、テスト実行時に外部に向けて予期せぬリクエストが飛んでしまう可能性があります。 私達のプロジェクトでは、下記の例のようにsetupServerをラップした関数を用意し、キャッチできなかった全てのリクエストを404にしてしまうハンドラーを末尾に追加しています。(GraphQLのリクエストもPostリクエストであるため、このハンドラーでキャッチできます) テスト時はmsw/nodeから直接setupServerをimportせず、この関数(setupMockServer)を使うようにすることで安全性を高めることができます。

import { type SetupServerApi, setupServer } from "msw/node";

export function setupMockServer(
  ...handlers: Array<RequestHandler>
): SetupServerApi {
  return setupServer(
    ...handlers,
    http.all(
      "*",
      () =>
        new HttpResponse(null, {
          status: 404,
          statusText: "not found",
        }),
    ),
  );
}
// 自分の環境だけかもしれないが、setupServerを自動でimportしようとすると
// msw/nodeではなくmsw/lib/nodeが優先してimportされて、テストが失敗してしまうことがある。
// 副作用的だが、その問題への対処にもなっている

Integrationテストの書き方

フロントエンドでのテストでは、よくTesting Trophyという考え方が取り上げられます。大雑把に言うと、Integrationテストを最も厚くし、UnitテストやE2Eテストの数は、Integrationテストよりも少なくなっているのが理想的という指針です。 この指針は意識的にUnitテストを減らしたり、E2Eテストを減らすといったやり方で行うべきではありません。 MSWを活用してIntegrationテストを書いていくと、自然とこの指針に従ったテストができあがっていきます。

サーバーからのレスポンスにより変更した要素を確かめる

通信を含むコンポーネントのIntegrationテストで、最も基本となる形は「レスポンスに応じて要素が変化することの検証」になります。 コンポーネントがレンダリングされた後、非同期の処理を挟んで要素が変化する場合、要素の変化を待機する必要があります。 testing-libraryではこうした非同期処理や、useEffectによる要素の変更に対応するために、waitFor、findBy〜といったメソッドが用意されています。 findByは内部的にwaitForを利用しているUtilityな立ち位置の関数で、記述もこちらのほうが直感的でわかりやすいので、要素の変更が発生する場合は、なるべくfindByを使うようにしましょう。

例として、検索欄に入力された情報を元にユーザー一覧を表示するコンポーネントのテストを考えてみます。

it("検索したユーザーが表示される", async () => {
  const user = userEvent.setup();
  const handler = http.get("/users", (req) => {
    return HttpResponse.json({ items: [generateMockUser({ name: "JX太郎" }] });
  });
  server.use(handler);
  render(<UserList />);
  const searchInput = screen.getByRole("textbox", { name: "検索欄" });
  await user.type("JX太郎");
  const button = screen.getByRole("button", { name: "送信" });
  await user.click(button);
  const userName = await screen.findByText("JX太郎");
  expect(userName).toBeVisible();
});

上記の例は、まさしくハッピーパスといった感じのテストケースですが、通信を伴うコンポーネントは、多くの場合以下のような状態を持っています。

  • 初期状態
  • ローディング中
  • データが存在する
  • データが存在しない
  • エラー発生時

これらのテストケースをカバーすると、Integrationテストは必然的にテストコード全体の中でも多くの割合を占めることになり、Testing Trophyの指針に従ったテストが書きやすくなります。 特に通信エラー時の表示などの異常系のテストは、手動やE2Eなどのブラウザー環境で実行するには何かと特殊な操作やMockが必要となるため難しくスキップされ、結果的に「想定通りに動いていなかった」といった事態が起こりやすいです。 MSWをつかったIntegrationテストでは、これらのテストをハッピーパスのテストと同程度の難易度で書くことがきます。

// 例 UserListコンポーネントのテスト
it("検索欄が空", () => {
  render(<UserList />);
  const searchInput = screen.getByRole("textbox", { name: "検索欄" });
  expect(searchInput).toHaveValue("");
});

async function setupWithSearch(handler: RequestHandler) {
    const user = userEvent.setup();
  server.use(handler);
  render(<UserList />);
  const searchInput = screen.getByRole("textbox", { name: "検索欄" });
  const button = screen.getByRole("button", { name: "送信" });
  await user.type("JX太郎");
  await user.click(button);
}

it("ローディング中の表示がされる", async () => {
  const handler = http.get("/users", (req) => {
    // 無限にローディング中の状態を続ける
    await delay('infinite')
    return HttpResponse.json({ items: [generateMockUser({ name: "JX太郎" }] });
  });
  await setup(handler);
  const loading = await screen.findByText("now loading...", { exact: true });
  expect(loading).toBeVisible();
});

it("データが存在する", () => {
  // 省略
})
it("検索結果が0件"), () => {
  // 省略
})
it("通信エラー時", () => {
  // 省略
})

サーバーに送信したリクエストを検査する

コンポーネントのアウトプットというと、第一にHTML要素が考えられますが、サーバーに送信されるリクエストもアウトプットの一種とみなすことができます。 フロントエンジニアであれば誰もが「入力内容に対して、期待通りのAPIリクエストが送信されておらず、バグが発生していた」と言ったバグを生み出した経験があると思います。 MSWのハンドラー内にコールバックの関数を設定しておくと、サーバー側に送信されたリクエストを検証することができます。

it("検索欄に入力した内容がsearchParamsに反映される", async () => {
  const user = userEvent.setup();
  const handleSearchParams = jest.fn()
  const handler = http.get("/users", (req) => {
    // レスポンスを返す前にコールバックを呼ぶ
    handleSearchParams(new URL(req.request.url).searchParams);
    return HttpResponse.json({ items: [generateMockUser({ name: "JX太郎" }] });
  });
  server.use(handler);
  render(<UserList />);
  const searchInput = screen.getByRole("textbox", { name: "検索欄" });
  const button = screen.getByRole("button", { name: "送信" });
  await user.type("JX太郎");
  await user.click(button);
  await waitFor(() => expect(handleSearchParams).toBeCalledTimes(1));
  expect(handleSearchParams).toBeCalledWith({ name: "JX太郎" });
});

MSWを使ったテストを書きやすくする環境作り

Integrationテストの数が多くなるぶん、テストコード自体の書きやすさや可読性、保守性等も重要になります。 MSWを使ったIntegrationテストでは、リクエストハンドラーの生成、レスポンスデータの生成が頻繁に発生します。

Mock用のデータ生成関数を用意する

MSWを使ったテストに限った話ではありませんが、サーバーからのレスポンスデータのMockを生成する関数を作成しておくと、テストコードの作成に取り組むときに非常に便利です。 私達のチームではAPI、レスポンスの型がきまったとき、モックデータの生成関数の作成も必須のタスクとしています。 以下は、ユーザー情報を生成する関数の例です。

// ファイル名はuser.mock.tsなど、通常のコードと区別できるファイル名にすると、lintの設定等で、mock.tsのimportを禁止するなどの対策を行うことができます
import type { User } from "./user"

// generateMock{モデル名}など、Mockデータ生成関数の命名規則は統一する
export function generateMockUser(override: Partial<User> = {}): User {
  return {
    id: "1",
    name: "JX太郎",
    age: 20,
    ...override,
  };
}

ハンドラーの生成を簡単にする

通信を伴うコンポーネントのIntegrationテストを行う際、以下のようなテストケースが頻出することが多いです。

  • 正常系
    • レスポンスに対して想定通りの要素が表示される
    • コンポーネントを操作したとき、サーバーに想定通りのリクエストが送信される
      • SearchParamsが想定通りか
      • PathParamsが想定通りか
      • RequestBodyが想定通りか
  • 異常系
    • サーバーから既知のエラーが返ってきたとき、コンポーネントが想定通りの動作をする
    • サーバーから未知のエラーが返ってきたとき、コンポーネントが想定通りの動作をする

fetchやaxiosなどを使った通信処理をコーディングする場合、たいていエンドポイントやメソッドを関数内に隠蔽した、関数を作成するのではないかと思います。 MSWハンドラーを作成するときも同じです。 エンドポイントとメソッドは固定で設定し、レスポンスだけを差し替えたハンドラーを作るのが便利です。

type Props = {
  // Propsの型パズルを頑張れば、invalidなときだけ任意の型を設定するなどできる
  response: JsonBodyType;
  status?: number;
  statusText?: string;
  onPathParams?: (params: unknown) => void;
  onRequestBody?: (body: unknown) => void;
  onRequestSearchParams?: (searchParams: URLSearchParams) => void;
}

function buildMockGetUsersMswHandler(props: Props) {
  return http.get("*/users", ({ req }) => {
      props.onRequestSearchParams?.(new URL(req.request.url).searchParams);
      props.onPathParams?.(req.params);
      props.onRequestBody?.(await req.request.json());
      return HttpResponse.json(props.response, {
        status: props.status ?? 200,
        statusText: props.statusText ?? "ok",
      });
  })
}

RESTApiであれば、ほとんどのテストケースで使用されるハンドラーは、上述の例のうち「エンドポイント」と「メソッド」だけの違いになります。もう一段階カリー化すれば、毎回ハンドラーを作成する手間が省けそうです。 以下の例は、エンドポイントとメソッドを引数に取り、REST API用のMSWハンドラーを返す関数を作成する例です。

type MswHttpHandlerBuilderProps = {
  response: JsonBodyType;
  status?: number;
  statusText?: string;
  onPathParams?: (params: unknown) => void;
  onRequestBody?: (body: unknown) => void;
  onRequestSearchParams?: (searchParams: URLSearchParams) => void;
};

type BuildMswHttpHandlerBuilderProps = {
  path: Path;
  method: keyof typeof http;
};
export function buildMswHttpHandlerBuilder({
  path,
  method,
}: BuildMswHttpHandlerBuilderProps) {
  return (props: MswHttpHandlerBuilderProps): HttpHandler =>
    http[method](path, async (req) => {
      props.onRequestSearchParams?.(new URL(req.request.url).searchParams);
      props.onPathParams?.(req.params);
      props.onRequestBody?.(await req.request.json());
      return HttpResponse.json(props.response, {
        status: props.status ?? 200,
        statusText: props.statusText ?? "ok",
      });
    });
}

// ハンドラー生成用関数を作成
export const buildGetUsersMswHandler =
  buildMswHttpHandlerBuilder({
    path: "*/users",
    method: "get",
  });

// テスト時に以下のようなハンドラー随時作成する
const handler = buildGetUsersMswHandler({
  response: { items: [generateMockUser({ name: "JX太郎" })] },
});

まとめ

人間が使いやすいUI/UXを実現しようとするほど、コンポーネントは複雑化していまう運命にあると感じています。 複雑なコンポーネントをテストするには、相応に複雑なMockやStubが必要になり、コンポーネントをただ書くことよりも難易度が高くなります。 そのためIntegrationテストはE2EテストやUnitテストよりも難易度が高く、どうしても省略したい、回避したい、という感情が生まれてしまいがちです。 昨年、新しいメンバーがプロジェクトに加わった際、Integrationテストのノウハウを、言語化して伝える機会があり、自分でも「ああ、こう書けばいいんだ!」という発見が多くありました。 この記事が参考になった!MSWを活用してIntegrationテストを書き始めた!という人が一人でもいれば幸いです。

Datastream による Cloud SQL と BigQuery 同期:直面した課題と対応策

こんにちは、データ基盤担当の @mapler です。今回は DataStream を活用して、Cloud SQL から BigQuery へのデータ同期についてお話しします。

Datastream の設定は基本的に Google Cloud のドキュメント従って行えますので、この記事では、実際の設定時に直面した課題や対策を中心に説明します。

背景:

データを BigQuery に同期することで得られるメリット

JX通信社の「FASTALERT」は、日本国内外の緊急情報をリアルタイムで配信するサービスです。災害情報や事故、事件、気象警報など、幅広い分野の緊急情報を網羅しており、長年にわたって膨大な災害データを蓄積しています。

社内や顧客から、蓄積したデータへのアクセス需要が高まってきました。

BigQuery からデータを利用できるようにすることで、次のような利点があります。

  • 過去の特定の時期や地域で発生した事象を簡単に取得できるようになる
  • BigQuery でデータの推移や統計情報を視覚的に分析しやすくなる
  • 本番 DB にアクセスすることなく、負荷をかけずにデータを取得できる

課題:

これまで、社内のデータ基盤では CloudSQL から BigQuery への同期の仕組みを構築していましたが、リアルタイム同期ではなく、Cloud Composer (Airflow) を利用した Daily または Hourly のバッチタスクを使用していました。スケジュールタスクで Cloud SQL 連携クエリ によりデータを取得し、BigQuery に保存する方式です。しかし、この方法には以下の課題があります。

  • データの品質

    バッチ処理の実行タイミングや取得範囲設定によって、BigQuery と CloudSQL のデータに差分が生じます。

    • 過去の更新分が反映されない:たとえば取得範囲を「7日」と設定した場合、7日以上前のデータに更新があっても、それは BigQuery に反映されません。
    • また、バッチの実行間隔を Daily に設定すると、データの反映に最大1日の遅延が生じる可能性があります。
  • 実装コスト

    テーブルごとに ETL(データの抽出・変換・ロード)と転送パイプライン設定(DAG)を実装する必要があり、設定コストがかさみます。

Datastream

Datastream は、サーバーレスで使いやすい変更データ キャプチャ(CDC)およびレプリケーション サービスです。このサービスを利用することで、データを最小限のレイテンシで確実に同期できます。

— 公式ドキュメントにより (https://cloud.google.com/datastream/docs/overview?hl=ja)

同期元(今回は Cloud SQL for MySQL)のバイナリログを利用してデータの変更履歴を読み取り、ストリーミングの形で BigQuery へ同期を行います。

Datastream の構築方法については、こちらの公式ドキュメントを参照することで簡単に設定できますが、実際設定してみたとき、ドキュメントが不十分でわかりにくかった部分や、環境に依存して発生した課題について紹介します。

課題1 ネットワーク構成

これは、Datastream を利用する際によく直面する問題の一つです。公式ドキュメントや他の多くのブログでも、この課題について議論されています。

Datastream では、同期元のデータベースがパブリック IP アドレスからの接続を受け入れるように構成されている必要があります。しかし、FASTALERT の DB(Cloud SQL)はプライベートネットワーク内にあり、Datastream から直接読み取ることができません。

そのため、プライベートネットワーク内でリバースプロキシサーバを構成する必要がありました。このリバースプロキシを利用することで、Datastream がプライベートネットワーク内の Cloud SQL にアクセスできるようにしました。

ネットワーク構成:

引用元:https://cloud.google.com/datastream/docs/private-connectivity?hl=ja

Compute Engine で n1-standard-1 の VM インスタンスを立ち上げ、Cloud SQL へのアクセスが可能になるように、ファイアウォールのネットワーク設定も行いました。

さらに、VM の起動スクリプトとして、以下のようにフォワード設定を行いました。

#! /bin/bash

export DB_ADDR=[IP]
export DB_PORT=[PORT]

export ETH_NAME=$(ip -o link show | awk -F': ' '{print $2}' | grep -v lo)

export LOCAL_IP_ADDR=$(ip -4 addr show $ETH_NAME | grep -Po 'inet \K[\d.]+')

echo 1 > /proc/sys/net/ipv4/ip_forward
iptables -t nat -A PREROUTING -p tcp -m tcp --dport $DB_PORT -j DNAT \
--to-destination $DB_ADDR:$DB_PORT
iptables -t nat -A POSTROUTING -j SNAT --to-source $LOCAL_IP_ADDR

これにより、Datastream のネットワークが正常に通信できることを確認でき、プライベートネットワーク内の Cloud SQL へのアクセスが確立されました。

課題2 転送費用の読みが甘かった

Datastream では、Cloud SQL のスキーマを読み取り、転送したいテーブルを選択することができます。

テーブル名はマーキングさせていただきます

なるべく低コストで運用したいため、事前に行数が多いテーブルを転送対象から外しましたが、実際に運用してみると、想定よりも多い CDC(Change Data Capture)データの処理費用が発生していることがわかりました。

Datastream の OBJECTS 画面で転送実績を確認したところ、レコード数が少ないにも関わらず転送量が大きいテーブルがあることが判明しました。

このテーブルは変更が非常に頻繁で、大量の変更データが発生していたことが原因でした。

該当テーブルを転送対象から外した結果、費用は想定範囲内に抑えることができました。

課題3 同期テーブルのパーティション分割

BigQuery はテーブルのスキャンした分で課金されるため、テーブルを時間(日付)でパーティション分割するのは一般的です。時間(日付)パーティションを指定して、必要なデータだけをクエリすることで、クエリの効率が向上し、費用面でも節約が可能になります。

これまで、FASTALERT の災害記事テーブルは daily 単位で同期しており、日付でパーティション分割を行っていました。

しかし、今回利用する Datastream はテーブルのパーティション分割をサポートしていません。

こちらのドキュメント https://cloud.google.com/datastream/docs/best-practices-partitioned-tables?hl=ja のオプション1 に従ってパーティションを設定してみました。

手順としては、以下の通りです:

  1. Datastream に該当テーブルの同期を止め(転送処理を完了するまで待つ)
  2. Datastream から出力した BigQuery テーブルを複製し、パーティション分割テーブルとして作り直す
  3. Datastream の同期を再開

ただし、このドキュメントに記載されている方法には、漏れがありました。

Primary Key を設定すること

ドキュメントには Primary Key に関する記載がなかった(最新のドキュメントのオプション2には関連する記載があったが、オプション1の手順には記載なし)

BigQuery の CDC は、Primary Key が前提条件となっています。

上記の Step 3 で Datastream を再開したあと、転送先のテーブルに Primary Key が設定されていないとエラーが発生します。

CREATE TABLE dataset.partitioned_table (
'id' INT64,
'name' STRING
'update_date' DATETIME,
'datastream_metadata' STRUCT<'uuid' STRING, 'source_timestamp' INT64>,
PRIMARY KEY ('id') NOT ENFORCED
)
PARTITION BY TIMESTAMP(update_date)

上記の例のクエリのように Primary Key を定義するため、テーブルをもう一度作り直すことになりました。

max_staleness を設定すること

Primary Key と同様に、Datastream が自動生成したテーブルでは max_staleness が設定されていますが、手動で作り直したテーブルでは自前で設定しなければなりません。

max_staleness が設定されていない場合、デフォルト値の 0 が設定されます。この状態では、クエリを実行するたびに BigQuery は最新の結果を返すため、目標のパーティションだけでなく、Stream Buffer にあるデータも含めてスキャンされてしまいます。

これにより、パーティションが設定されていても、予想より多くのクエリスキャン量が発生します。

この設定に気づくまで、半日 max_staleness が 0 の状態で Datastream を走らせていたため、かなり無駄な費用が発生してしまいました。

まとめ

今回は、Google Cloudのデータ同期ツール「Datastream」を使用して、Cloud SQLからBigQueryへのデータ同期において実際に直面した課題を記述しました。

Datastreamは非常に便利であり、導入することで、従来はデータベースにアクセスする必要があったデータにBigQueryから簡単にアクセスできるようになりました。特に、リアルタイムでデータを同期できる点は大きな利点であり、これによりデータの可用性と分析効率が大幅に向上しました。

参考:

https://cloud.google.com/datastream/docs/private-connectivity?hl=ja https://cloud.google.com/bigquery/docs/change-data-capture?hl=ja#prerequisites https://cloud.google.com/bigquery/docs/change-data-capture?hl=ja#manage_table_staleness https://www.googlecloudcommunity.com/gc/Data-Analytics/Problem-with-partitioned-table-in-BigQuery-and-streaming-buffer/m-p/712332

copilot-instructions.mdは使えるぞ!実戦投入レポート

こんにちは、JX通信社でシニアエンジニアをしているSirosuzumeです。

先日、GitHub Copilotの新機能として、リポジトリのルートから.github/copilot-instructions.mdを読み込み、対話型UIの出力に反映する機能が追加されました。 この機能を使ってみて、どのような効果があるのか、どのように活用するのが良いのか、実際に新しく立ち上がったプロジェクトで試してみた結果を共有したいと思います。

copilot-instructions.mdの効果の検証

まずは実例を見てみましょう。

src/feature/entry-horse/presentational/list-item.tsxというファイルがあったとします。 この中にはEntryHorseListItemという名前のコンポーネントがあり、PropsはEntryHorseListItemPropsという名前で定義されているとします。 今回、このコンポーネントのUnitテストを作りたいため、src/feature/entry-horse/presentational/list-item.test.tsxというファイルを作成しました。

copilot-instructions.mdがない場合

楽をしようと考えて、GitHub Copilotの対話型UIを使用し「テストを作って」とお願いしてみたところ、以下のコードが生成されました。

import { render, screen } from '@testing-library/react';
import { describe, it, expect } from 'vitest';
import { EntryHorseListItem } from './list-item';

describe('EntryHorseListItem', () => {
  it('renders the name prop correctly', () => {
    render(<EntryHorseListItem name="Thunderbolt" />);
    expect(screen.getByText('Thunderbolt')).toBeInTheDocument();
  });

  it('renders another name prop correctly', () => {
    render(<EntryHorseListItem name="Lightning" />);
    expect(screen.getByText('Lightning')).toBeInTheDocument();
  });
});

しかし、私はこのコードを使う気にはなれませんでした。

  • 後だしになって申し訳ないのですが…実はこのプロジェクトはvitestではなくjestを使っています。
  • それに開発者は全員日本語話者のため、テストケースは日本語で書かれていてほしいのです。
  • あとtoBeInTheDocumenttoBeVisibleに変えてほしいです。

といった具合で、GitHub Copilotが生成したコードは、そのまま使えるということは殆どありません。

そもそもの話ですが、GitHub CopilotはあくまでAIであり、プロジェクトの背景情報を知りません。 開発者が全員日本人であるとか、jestを使っているとか、そんな情報をGitHub Copilotは持っていないのです。 それを教えるための手段として、copilot-instructions.mdを読むという機能が追加されました。

では次にcopilot-instructions.mdに、上の情報を追加して出力してもらいましょう。

copilot-instructions.mdがある場合

copilot-instructions.mdには以下の内容を記載します。

# コード規約

このプロジェクトはtest runnerに`jest`を使っています。
開発者は日本語話者のため、テストケースは日本語で書いてください。
テストはdescribeでグループ化し、itでテストケースを書いてください。
`toBeInTheDocument`より`toBeVisible`を優先して使ってください。

そして、GitHub Copilotに「テストを作って」とお願いしてみたところ、以下のようなコードが生成されました。

import React from 'react';
import { render, screen } from '@testing-library/react';
import { EntryHorseListItem } from './list-item';

describe('EntryHorseListItemコンポーネント', () => {
  it('名前が表示されることを確認する', () => {
    render(<EntryHorseListItem name="サンプルホース" />);
    const nameElement = screen.getByText('サンプルホース');
    expect(nameElement).toBeVisible();
  });

  it('空の名前が表示されることを確認する', () => {
    render(<EntryHorseListItem name="" />);
    const nameElement = screen.getByText('');
    expect(nameElement).toBeVisible();
  });
});

きちんとjestを使っていて、テストケースも日本語で書かれていて、toBeInTheDocumenttoBeVisibleに変更されています。 細かい点ではもっとこうしてほしい、というところはありますが、都度copilot-instructions.mdに追記していくことで、かなりプロジェクトに合わせたコードを生成してくれるようになります。

2024-11時点での課題

copilot-instructions.mdを100行以上書き込み、人間が読んでも役に立つくらいのノウハウ集にすると、対話型UIが出力してくれるコードはかなり精度が高くなります。

しかし、この機能にはまだ大きな弱点があります。サジェストの出力には未対応だということです。

おそらく、コードを書くのに慣れている人ほど、対話型のUIは使わず、サジェストをちょっと賢い予測変換機能として使っていることが多いのではないでしょうか。 私もどちらかといえばそのタイプで、これまでほとんど対話型UIは使用していません。 ものは試しにと、一日ほど対話型UIをつかってコーディング作業をしてみましたが、やはりサジェストを活用したほうが早くコーディングできるという結論に至ってしまいました。

ただ、使用しているうちに、対話型UIがサジェストよりも勝っていると感じられた点を見つけました。

それはゼロからコードを出力する場合です。

サジェスト機能は、現在開いているファイルと、他に開いているタブの内容を元に、カーソル位置に入るコードを推測し出力しています。 質の高いサジェストを出力させるには、importするパッケージやファイルを先に指定したり、お手本となるファイルを別のタブに開いたりするなど、小ワザを活用する必要もあります。

一方、copilot-instructions.mdを使用して出力したコードは、importするパッケージを正確に推測することができたり、現在のフォルダと命名規則から思った通りの関数名を出すことができます。

この特性は、HygenTurborepのコードジェネレーション機能のテンプレートを凝って保守するよりも、楽でかつ柔軟な運用ができると感じました。

コードジェネレーターとしてcopilot-instructions.mdを運用する

既存のコードジェネレーターに抱えていた課題

私は普段、Reactのコンポーネントや、それに付随するテストファイルを作成するときに、Hygenを活用することが多いです。 言語を問わずに使えることや、テンプレートのカスタマイズ性の高さが魅力ですが、テンプレートの保守に課題を感じていました。 生成されたコードがエラーをなるべく吐かないようにしたり、命名規則を一定にしたり、条件によって分岐したり。 出力に使用する.ejs.tファイルも、可読性が良いとは言えず、IDEの支援も受けにくいです。 便利にしようとするほど、入力しなければいけない項目も増えていくため、メンテしている本人以外は使いこなすのが難しいという問題も出てきます。

copilot-instructions.mdをコードジェネレーターとみなす場合

copilot-instructions.mdは自然言語で書くことができるため、可読性の面では問題ありません。 条件分岐もAI側にある程度お任せすることができます。

コードジェネレーターとしてCopilotを活用する場合、この3つの要素が活用しやすいと考えます。

  • 現在のフォルダ、ファイル名
    • 生成するコードの役割を推測させるのに役に立つ
  • copilot-instructions.md
    • コード規約やノウハウ集に近いものになる
  • 生成時にユーザーが入力する文章
    • 細かいオーダーがある場合に使用する。なるべくこの要素の比率を少なくする

出力するファイル、使用する機能に応じてセクショニングする

copilot-instructions.mdはMarkdownの形式で記載することができます。 全ての内容を並列に記載するのではなく、セクションごとに分けて記載することで、Copilotがどの部分に対してどのようなコードを生成すべきかを判断しやすくなります。

  • fetch関連のコードのルール
  • コンポーネントのテストコードのルール
  • モックデータの生成関数のルール

といった具合に、こういうコードを書くときはここを参考にしてくださいと例示します。

プロジェクトのフォルダ構成から関数名、ファイル名を推測できるようにする

フォルダのやファイル、関数の命名規則を統一し、その内容をcopilot-instructions.mdに記載しておくと、Copilotが精度の高い関数名やimportを考えてくれるようになります。

例えばEntryHorseというドメインがある場合

## フォルダ構成

あるドメインに所属するコードは`src/features/{{ ドメイン名(ケバブケース )}}`の下に格納します。
Mockデータの生成関数は`src/feature/entry-horse/mock.ts`に格納されており、`generateMock`というprefixで始まるMockデータ生成関数が存在しています。
コンポーネントは`src/feature/entry-horse/components`に格納されています。
コンポーネントは基本的に`{{ ドメイン名 }}{{ Role }}`の形式で命名されており、Propsは`{{ ドメイン名 }}{{ Role }}Props`という形式で命名されています。

のような情報をcopilot-instructions.mdに記載した上でsrc/feature/entry-horse/components/list.tsx内で「コードを書いてください」と指示すると 以下のようなコードが出力されます。

import type React from "react";

type EntryHorseListProps = {};

const EntryHorseList: React.FC<EntryHorseListProps> = (props) => {
  return <div>{/* Render your component here */}</div>;
};

export default EntryHorseList;

上手く生成されたときの例を記載する

プロンプトエンジニアリングなどでもある手法ですが、具体的な例をMarkdownに提示してあげると、より精度の高いコードを生成します。

細かくフィードバックをして更新する

生成されたファイルが期待通りではない場合、随時copilot-instructions.mdに追記していくことで、Copilotが生成するコードの精度を向上させることができます。

ファイルやフォルダ自体の作成には、既存のコードジェネレーターを利用する

GitHub Copilotはファイルやフォルダ自体を生成することはできません。 決められたフォルダ構成やファイル名でファイルを生成するためには、既存のコードジェネレーターを使うことが有効です。 併用することで、コードの初期開発のスピードが上がります。

生成の命令時に指示を追加して使う

この点が既存のコードジェネレーターには特にないメリットだと感じています。 copilot-instructions.mdに書ききれない、少し例外的な関数が必要だとしても、生成時に注意点としてその旨を追記しておくことで柔軟な対応が可能です。

例えばこのコンポーネントはforwardRefを使う必要があるという場合、「forwardRefを使ってください」という指示にするだけで、対応が可能になります。

まとめ

サジェストの出力に未対応である点が解決されれば、この機能の価値は相当に高まるように思えます。 またcopilot-instructions.mdを育てていくことは、人間の開発者にとっても副次的な効果があるのではないかと予想しています。 自然言語で書くことができ、積極的に更新していくモチベーションにもなるため、開発者間にとっても有益なノウハウ集や、実効性のあるコード規約集として活用できる可能性があるのではないかと感じています。

AWSとGoogle Cloudのコスト最適化の道 〜データドリブンな取り組みの紹介〜

CTO の小笠原(@yamitzky)です。今日は、CTO として推進している「サーバー費削減プロジェクト」の取り組みについてご紹介します。

本稿では「リザーブドインスタンスを購入する」や「入札型のインスタンスに移行する」といった一般的な削減テクニックについては扱いません。プロジェクトとしてどう分析、進行し、成果を出しているか、という話を中心に、取り組みをまとめています。

背景

JX通信社では、Amazon Web Services(以下、AWS) や Google Cloud などのクラウドサービスを活用しています。これらのクラウドサービスは通常、ドルで費用が決まっており、日本円で支払います。そのため、為替の影響を受けてしまいます。

ちょうど最近は円高の恩恵を受けていますが、つい3年前の2021年は1ドル103円だったところ、2024年のピーク時には160円まで進行しています。つまり原価が1.5倍近く上がってしまっていることになります。

Google Finance のドル円チャート

サーバーコスト削減のための開発は、直接的な売上増や、顧客へ提供する価値の向上には繋がらないものです。ついつい後回しになりがちではありましたが、為替などの背景もあり、2022年ごろから大規模なサーバーコスト削減に断続的に取り組む形になりました。

サーバーコスト削減施策に取り組むための、3つの基本

サーバーコスト削減を成功に導くコツとしては、3つあると考えています。これらを順を追ってご紹介します。

  1. プロジェクトとして立ち上げる
  2. 「行動につなげやすいコミュニケーション」を意識する
  3. データドリブンなアプローチを取る

1. プロジェクトとして立ち上げる

企業においてなんらかの取り組みを成功させるには「プロジェクト」を立ち上げるのが良いと思います。プロジェクトの要素として、次の点を抑えると良いです。

  • 「プロジェクト名」を決める
  • 「時期」「ゴール」「リソース」を決める
  • どう実現するか、施策の優先順位の方針を決める
  • モニタリングと振り返りを行う

2022年に取り組んだ最初の「サーバー費削減プロジェクト」は 「2023年3月までに費用を30%削る」というゴールを設定 し、と銘打ったりしました。

プロジェクト用のNotionページ

そして 「月に10万円以上の削減効果があるものを優先する」「削除するだけで終わるものを優先する」 といった方針を設けたり、「一ヶ月以上工数がかかるものはやらない」「放っておけば減りそうなものはやらない」といった優先度付けをしたり、「機能削減だけでできるものを優先し、施策の責任者と調整する」「数日でできるものはプロダクトバックログに入れてもらう」といった交渉などを行いました。

2. 「行動につなげやすいコミュニケーション」を意識する

例えば 「Amazon S3 のデータが高いのでなんとかしてください!」 と伝えても、「どれくらいの重要度なのか」「どれくらいの大変さなのか」「なぜそれをやらないといけないのか」などはわからず、納得感のあるコミュニケーションにならないですし、行動につなげることもできません。

そこで、次のようなコミュニケーションを意識・徹底しています。

  • コストや削減幅を伝えるときは、単位を「一ヶ月あたり◯万円」に揃える *1
  • 「何にかかっているコストなのか」「どんな施策や売上に紐づいたコストなのか」などを調べ、伝える
  • 削減の難易度についての考えを述べる

例えば、冒頭の例を言い換えると、 「開発版のS3バケットに、月20万円もかかっています。開発版なので、3ヶ月以上古いデータを自動削除する設定をするだけで、月2万円程度までコストが下げられるはずです」 などという具合に伝えます。そうすると「開発版だから確かにもったいないな」「開発版だから古いの消すのは合理的だな」「消すだけなら簡単だな」と、関係者が納得感を持って理解し、行動しやすくなります

詳細はほぼお見せできないのですが、施策や削減手段ごとにかかっているコストなどをまとめて管理しています

3. データドリブンなアプローチ

サーバーコスト削減の成果を出すために、 定量的に分析してなるべく効果の高いものを見つけ、その結果を日次でモニタリング するようにしています。分析とモニタリングにわけてご説明します。

分析フェーズ

まず、AWS や Google Cloud のすべてのコストを、BigQuery に転送しています。そのデータを、Connected Sheets を使って Google Sheets に連携しています。 さらに、一個一個の細目に対して、「何の機能にかかっているコストなのか」を目視でアノテーションしています *2

Connected Sheet の例。一個一個の細目に対して、プロダクトの機能や、コストの目的をアノテーションしています。

そして Google Sheets 上に集約したものを、以下のような分析軸でピボットテーブルにかけます。

  • クラウドのアカウント・プロジェクトIDごと
  • クラウドの製品ごと (Lambda, DynamoDB, Cloud Run, etc...)
  • 利用タイプ・SKUごと (Lambda の「GB-Second-ARM」、Cloud Run の「CPU Allocation Time」といった単位。このとき、リージョンは分かれないようにまとめます)
  • 機能・施策ごと (自社プロダクトにおける「◯◯機能」や、「セキュリティ監査のため」などの用途)
  • 事業ごと

このように分析を進めると、 削減幅の大きい対象や、ムダに感じられる費用、費用対効果の見合わないプロダクトの機能、社内システムetc...などが浮かび上がってきます。「ムダな費用かもしれなくて確認が必要だが、削減幅の大きくないもの」の優先度を落とすこともできます。

また、AWS の Cost Explorer を使った分析をされている方も多いと思います。私も、簡易的な用途としてはよく利用しますが、クラウド横断での分析ができないこと、分析の集計軸(ピボットテーブルできる区分)が限定的で意味のある集計になりづらいこと、定期的なモニタリングがしづらいことなどから、BigQuery や Google Sheets をベースにした分析をおすすめしたいです。

モニタリングフェーズ

BigQuery に集約したクラウドのコストを、Redash で定時集計し、毎朝 Slack に投稿するようにしています。Redash への投稿は主に私の作った bot を使っています*3

毎日だと変動が大きく削減できたかわかりづらいこともあるので、週次集計や、月次予測での過去の◯月比、といった比較も定期的に行っています。また、AWS、Google Cloud 以外については、稟議申請のタイミングでの費用チェック等も地道にやっています。

全体像。構築時期がかなり古いため冗長ですが、S3→GCS→BigQueryの転送などはもっとシンプルにできます。AWS のコストデータはクラスメソッドの仕組みで保存されています。

まとめ

今回は「サーバー費のコスト削減」というテーマについて、具体的なテクニックではなく、データドリブンな取り組みやプロジェクト管理にフォーカスを当ててご紹介しました。削減テクニックとしては、AWSの公式ブログ やその他の技術ブログも参考にしましたが、削減幅が大きくないためにJX通信社ではやっていない施策も多々あります。定量的に分析をしてから取り組む、というのが大事ではないでしょうか。

また、サーバー費削減が進んでいるのは、ひとえに社内関係者のご協力があってのことです。この場を借りて、御礼を申し上げます。

*1:「月◯万円」という単位で目標や売上、あるいは自分の給料を見ることが多いので、このような単位にしています

*2:アノテーションしていない費用は、全体の1%程度です。金額が大きいものは厳密に確認しつつ、ある程度ルールベースでのアノテーションもして、えいやで付与しています

*3:hakobera さんの素晴らしいアイデアをフォークしていますが、コードはほぼ書き換わっています