[Amazon Bedrock] Amazon Titan Multimodal Embeddings G1モデル を使用して、「きのこの山」と「たけのこの里」の分類モデルを作成してみました

2024.05.12

1. はじめに

CX事業本部製造ビジネステクノロジー部の平内(SIN)です。

Amazon Bedrockで利用可能なAmazon Titan Multimodal Embeddings G1モデル は、 テキスト、イメージ、または、その組み合わせによるマルチモーダル埋め込みモデルです。

今回は、これを利用して、画像の分類モデルを作成してみました。

2.検証

(1) データ

使用したデータは、下記のブログで作成した「きのこの山」と「たけのこの里」の画像です。回転台に乗せて撮影し、Segment Anything Modelで切り取って背景を白にしたものです。

ファイルは、下記のようにimagesの階層下に配置しています。

% tree ./images
./images
├── KINOKO
│   ├── 000000000_w.png
│   ├── 000000003_w.png
│   ├── 000000006_w.png
・・・略・・・
│   ├── 000001113_w.png
│   ├── 000001116_w.png
│   └── 000001119_w.png
└── TAKENOKO
    ├── 000000000_w.png
    ├── 000000003_w.png
    ├── 000000006_w.png
・・・略・・・
    ├── 000001044_w.png
    ├── 000001047_w.png
    └── 000001050_w.png

なお、各ファイルは、290 * 224 程度(一定ではない)のpng画像です。

% file 000000000_w.png
000000000_w.png: PNG image data, 290 x 224, 8-bit/color RGB, non-interlaced

% ls -la 000000000_w.png
-rw-r--r--@ 1 hirauchi.shinichi  staff  61725  5 12 01:45 000000000_w.png

※ Amazon Titan Multimodal Embeddings G1モデルで入力可能な画像サイズは、5MBとなっているので、大きな画像を扱う場合は、注意が必要です。

(2) ベクトルデータ生成

下記のコードで、画像をBase64文字列化してベクトルデータを取得しています。

import os
from typing import List
import boto3
import base64
import json
import glob


# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
    return os.path.join(os.path.dirname(__file__), filename)


# ベクトルデータのファイルからの読み込み
def read_data(filename: str) -> List[List[float]]:
    with open(get_full_path(filename)) as f:
        return [[float(s) for s in line.rstrip("\n").split(",")] for line in f]


# ベクトルデータのファイルへの保存
def save_data(array: List[List[float]], filename: str) -> None:
    with open(get_full_path(filename), "w") as f:
        f.writelines(",".join(map(str, x)) + "\n" for x in array)


# 画像からベクトルデータを取得
def get_embedding(bedrock, image_path: str, dimensions: int) -> List[float]:
    with open(image_path, "rb") as image:
        body = image.read()

    response = bedrock.invoke_model(
        body=json.dumps(
            {
                "inputImage": base64.b64encode(body).decode("utf8"),
                "embeddingConfig": {"outputEmbeddingLength": dimensions},
            }
        ),
        modelId="amazon.titan-embed-image-v1",
        accept="application/json",
        contentType="application/json",
    )

    response_body = json.loads(response.get("body").read())
    return response_body.get("embedding")


def main() -> None:

    bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
    dimensions = 1024

    for kind in ["KINOKO", "TAKENOKO"]:
        image_path = f"images/{kind}"
        files = glob.glob(f"{get_full_path(image_path)}/*.png")
        embedding_list = [get_embedding(bedrock, file, dimensions) for file in files]
        save_data(embedding_list, f"{kind}_1024.csv")


if __name__ == "__main__":
    main()

プログラムを実行すると、「KINOKO_1024.csv」及び「TAKENOKO_1024.csv」が出力されます。

% ls -la *.csv
-rw-r--r--  1 hirauchi.shinichi  staff  4768568  5 12 01:54 KINOKO_1024.csv
-rw-r--r--  1 hirauchi.shinichi  staff  4469167  5 12 01:57 TAKENOKO_1024.csv

出力ファイルには、1画像分の1行でベクトルの配列が格納されています。各画像は、約350枚ほどあるので、約350行となっています。

% wc -l *_1024.csv
     374 KINOKO_1024.csv
     351 TAKENOKO_1024.csv
     725 total

(3) 分布

このデータで、うまく分類できそうかどうかを確認するため、生成した多次元のベクトルデータを2次元まで削減して、表示してみました。

import os
from typing import List
import matplotlib.pyplot as plt
import numpy as np
from sklearn import random_projection

# スパースランダム投影で次元削減したデータを表示
def show(embedding_data_list: List[List[List[float]]]) -> None:
    colors = ["teal", "tomato"]
    for i, embedding_data in enumerate(embedding_data_list):
        matrix = np.array(embedding_data, dtype="float")
        X_Sprojected = random_projection.SparseRandomProjection(
            n_components=2
        ).fit_transform(matrix)
        plt.scatter(X_Sprojected[:, 0], X_Sprojected[:, 1], c=colors[i], alpha=0.5)
    plt.show()


def main() -> None:
    embedding_data_files = ["KINOKO_1024.csv", "TAKENOKO_1024.csv"]
    embedding_data_list = [read_data(file) for file in embedding_data_files]
    show(embedding_data_list)


if __name__ == "__main__":
    main()

「きのこ」が、赤で、「たけのこ」が、緑です。これを見た限りでは、充分に期待できそうです。

(4) 比較

「コサイン類似性」で検索が可能かどうかを検証するために、各ベクトルデータの間の類似性を出力してみました。

import os
from typing import List
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
    return os.path.join(os.path.dirname(__file__), filename)


# ベクトルデータのファイルからの読み込み
def read_data(filename: str) -> List[List[float]]:
    with open(get_full_path(filename)) as f:
        return [[float(s) for s in line.rstrip("\n").split(",")] for line in f]

# ベクトルデータ間のコサイン類似性を計算する
def compare(
    test_datas: List[List[float]], target_datas: List[List[float]], threshold: float
) -> None:

    counter = 0
    for test_data in test_datas:
        # 各データに対するコサイン類似度の計算
        similarities = [
            cosine_similarity([test_data], [target_data])[0][0]
            for target_data in target_datas
        ]
                # コサイン類似度の平均
        avg = np.mean(similarities)
        if avg > threshold:
            print(f"{avg:.2f}")
            counter += 1
        else:
            print(f"↓ {avg:.2f}")
    print(f"{threshold}以上のデータ: {counter / len(target_datas) * 100:.1f}%")


def main() -> None:
    kinoko_datas = read_data("KINOKO_1024.csv")
    takenoko_datas = read_data("TAKENOKO_1024.csv")

    threshold = 0.85
    print("\n各「きのこのデータ」を全部の「きのこデータ」と比較する")
    compare(kinoko_datas, kinoko_datas, threshold)

    print("\n各「きのこのデータ」を全部の「たけのこデータ」と比較する")
    compare(kinoko_datas, takenoko_datas, threshold)


if __name__ == "__main__":
    main()

「きのこ画像」同士の場合、0.85 以上のものが、79.14%であったのに対し、「たけのこ画像」との間では、0%でした。この辺の状況をうまく利用すれば、簡単に分類モデルが作成できそうです。

各「きのこのデータ」を全部の「きのこデータ」と比較する
0.90
0.88
0.86
↓ 0.84
0.87
・・・略・・・
0.85
0.85
0.88
0.90

0.85以上のデータ: 79.1%

各「きのこのデータ」を全部の「たけのこデータ」と比較する
↓ 0.83
↓ 0.74
↓ 0.76
↓ 0.82
・・・略・・・
↓ 0.82
↓ 0.78
↓ 0.79
↓ 0.82

0.85以上のデータ: 0.0%

作業中、ベクトルは、256次元も試してみたのですが、コサイン類似性で、ちょっと精度が出なかたので、ある程度の次元が必要なのかなと感じました。

3.実装

(1) Pinecone

ベクトルデータベースには、Pineconeを使用しました。

インデックスは、下記の設定で、Pineconeのコンソールから作成しています。

name: my-pinecone-index
metric: cosine コサイン類似度
Dimensions: 1024 次元
Capacity mode: SERVERLESS

(2) データ挿入

データベースに入れるデータは、100件のみとしました。もしかすると、もっと少なくても大丈夫かもしれません。 メタデータは、特に設定していませんが、IDを「kinoko_連番」「takenoko_連番」のような形式とし、検索結果が、どちらにヒットしたかが分かるようにしました。

% head -n 100 KINOKO_1024.csv > KINOKO_1024.data
% head -n 100 TAKENOKO_1024.csv > TAKENOKO_1024.data

データ挿入のコードは、以下です。予め、PineconeのAPI KEYを環境変数にセットして利用しています。

% export PINECONE_API_KEY=xxxxxxxxx-xxxxxxxx-xxxxxxxx
import os
from typing import List
from pinecone import Pinecone


# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
    return os.path.join(os.path.dirname(__file__), filename)


# ベクトルデータのファイルからの読み込み
def read_data(filename: str) -> List[List[float]]:
    with open(get_full_path(filename)) as f:
        return [[float(s) for s in line.rstrip("\n").split(",")] for line in f]


def upsert_data(index, data_list, label):
    for i, data in enumerate(data_list):
        index.upsert(vectors=[(f"{label}_{i}", data)])


def main() -> None:

    PINEKCON_API_KEY = os.environ.get("PINECONE_API_KEY")
    PINECONE_INDEX = "my-pinecone-index"

    pinecone = Pinecone(api_key=PINEKCON_API_KEY)
    index = pinecone.Index(PINECONE_INDEX)

    kinoko_datas = read_data("KINOKO_1024.data")
    takenoko_datas = read_data("TAKENOKO_1024.data")

    upsert_data(index, kinoko_datas, "kinoko")
    upsert_data(index, takenoko_datas, "takenoko")


if __name__ == "__main__":
    main()

データが格納されている状況です。

(3) Query

テスト用に用意した画像は、以下の10枚です。回転台で撮影した動画から、改めて切り出しました。

上記の画像を順に読み込み、ベクトルを生成し、PineconeでQueryをかけています。

import os
from typing import List
import base64
import json
import boto3
import glob
from pinecone import Pinecone


# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
    return os.path.join(os.path.dirname(__file__), filename)


# 画像からベクトルデータを取得
def get_embedding(bedrock, image_path: str, dimensions: int) -> List[float]:
    with open(image_path, "rb") as image:
        body = image.read()

    response = bedrock.invoke_model(
        body=json.dumps(
            {
                "inputImage": base64.b64encode(body).decode("utf8"),
                "embeddingConfig": {"outputEmbeddingLength": dimensions},
            }
        ),
        modelId="amazon.titan-embed-image-v1",
        accept="application/json",
        contentType="application/json",
    )

    response_body = json.loads(response.get("body").read())
    return response_body.get("embedding")

# ベクトルデータの取得
def query_index(index, vector):
    result = index.query(
        vector=vector,
        top_k=1,
        include_values=False,
    )
    print(f"id:{result.matches[0].id}  score:{result.matches[0].score}")


def main() -> None:

    PINEKCON_API_KEY = os.environ.get("PINECONE_API_KEY")
    PINECONE_INDEX = "my-pinecone-index"

    pinecone = Pinecone(api_key=PINEKCON_API_KEY)
    index = pinecone.Index(PINECONE_INDEX)

    bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
    dimensions = 1024

    # images/TESTに置かれているテスト画像を、順に推論する
    image_path = f"images/TEST"
    files = glob.glob(f"{get_full_path(image_path)}/*.png")
    files.sort()
    for file in files:
        print(f"\nfilename:{os.path.basename(file)}")
        vector = get_embedding(bedrock, file, dimensions)
        query_index(index, vector)

if __name__ == "__main__":
    main()

結果は、以下の通りでした。正解率は100%で、精度は充分だと思いました。

% python3 query.py

filename:kinoko_001.png
id:kinoko_15  score:0.775042295

filename:kinoko_002.png
id:kinoko_34  score:0.735480487

filename:kinoko_003.png
id:kinoko_92  score:0.744099379

filename:kinoko_004.png
id:kinoko_83  score:0.745024681

filename:kinoko_005.png
id:kinoko_29  score:0.793084085

filename:takenoko_001.png
id:takenoko_97  score:0.745306611

filename:takenoko_002.png
id:takenoko_83  score:0.735507786

filename:takenoko_003.png
id:takenoko_67  score:0.747619569

filename:takenoko_004.png
id:takenoko_7  score:0.792165279

filename:takenoko_005.png
id:takenoko_62  score:0.790254414

4. 最後に

今回は、Bedrockで利用可能なAmazon Titan Multimodal Embeddings G1モデルを使用して、画像の分類モデルを作成してみました。

思った以上に、簡単に作成できて驚いています。

分類モデルは、コンピュータービジョンの中でも、比較的、少量のデータでも作成可能ですが、今回のような方法だと、要件によっては、もっと効率的に作成できるかも知れないと感じました。

5. 参考リンク


Pincone Docs - Upsert data
Pincone Docs - Query data
PineconeをPythonで使う方法
AWS Marketplace の Pinecone を Amazon Bedrock のナレッジベースとして利用する
生成AIで外観検査をやってみた
tSNEでMNISTを軽やかに鮮やかにマッピングしていく
sklearn.random_projection.SparseRandomProjection