1. はじめに
CX事業本部製造ビジネステクノロジー部の平内(SIN)です。
Amazon Bedrockで利用可能なAmazon Titan Multimodal Embeddings G1モデル は、 テキスト、イメージ、または、その組み合わせによるマルチモーダル埋め込みモデルです。
今回は、これを利用して、画像の分類モデルを作成してみました。
2.検証
(1) データ
使用したデータは、下記のブログで作成した「きのこの山」と「たけのこの里」の画像です。回転台に乗せて撮影し、Segment Anything Modelで切り取って背景を白にしたものです。
ファイルは、下記のようにimagesの階層下に配置しています。
% tree ./images
./images
├── KINOKO
│ ├── 000000000_w.png
│ ├── 000000003_w.png
│ ├── 000000006_w.png
・・・略・・・
│ ├── 000001113_w.png
│ ├── 000001116_w.png
│ └── 000001119_w.png
└── TAKENOKO
├── 000000000_w.png
├── 000000003_w.png
├── 000000006_w.png
・・・略・・・
├── 000001044_w.png
├── 000001047_w.png
└── 000001050_w.png
なお、各ファイルは、290 * 224 程度(一定ではない)のpng画像です。
% file 000000000_w.png
000000000_w.png: PNG image data, 290 x 224, 8-bit/color RGB, non-interlaced
% ls -la 000000000_w.png
-rw-r--r--@ 1 hirauchi.shinichi staff 61725 5 12 01:45 000000000_w.png
※ Amazon Titan Multimodal Embeddings G1モデルで入力可能な画像サイズは、5MBとなっているので、大きな画像を扱う場合は、注意が必要です。
(2) ベクトルデータ生成
下記のコードで、画像をBase64文字列化してベクトルデータを取得しています。
import os
from typing import List
import boto3
import base64
import json
import glob
# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
return os.path.join(os.path.dirname(__file__), filename)
# ベクトルデータのファイルからの読み込み
def read_data(filename: str) -> List[List[float]]:
with open(get_full_path(filename)) as f:
return [[float(s) for s in line.rstrip("\n").split(",")] for line in f]
# ベクトルデータのファイルへの保存
def save_data(array: List[List[float]], filename: str) -> None:
with open(get_full_path(filename), "w") as f:
f.writelines(",".join(map(str, x)) + "\n" for x in array)
# 画像からベクトルデータを取得
def get_embedding(bedrock, image_path: str, dimensions: int) -> List[float]:
with open(image_path, "rb") as image:
body = image.read()
response = bedrock.invoke_model(
body=json.dumps(
{
"inputImage": base64.b64encode(body).decode("utf8"),
"embeddingConfig": {"outputEmbeddingLength": dimensions},
}
),
modelId="amazon.titan-embed-image-v1",
accept="application/json",
contentType="application/json",
)
response_body = json.loads(response.get("body").read())
return response_body.get("embedding")
def main() -> None:
bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
dimensions = 1024
for kind in ["KINOKO", "TAKENOKO"]:
image_path = f"images/{kind}"
files = glob.glob(f"{get_full_path(image_path)}/*.png")
embedding_list = [get_embedding(bedrock, file, dimensions) for file in files]
save_data(embedding_list, f"{kind}_1024.csv")
if __name__ == "__main__":
main()
プログラムを実行すると、「KINOKO_1024.csv」及び「TAKENOKO_1024.csv」が出力されます。
% ls -la *.csv
-rw-r--r-- 1 hirauchi.shinichi staff 4768568 5 12 01:54 KINOKO_1024.csv
-rw-r--r-- 1 hirauchi.shinichi staff 4469167 5 12 01:57 TAKENOKO_1024.csv
出力ファイルには、1画像分の1行でベクトルの配列が格納されています。各画像は、約350枚ほどあるので、約350行となっています。
% wc -l *_1024.csv
374 KINOKO_1024.csv
351 TAKENOKO_1024.csv
725 total
(3) 分布
このデータで、うまく分類できそうかどうかを確認するため、生成した多次元のベクトルデータを2次元まで削減して、表示してみました。
import os
from typing import List
import matplotlib.pyplot as plt
import numpy as np
from sklearn import random_projection
# スパースランダム投影で次元削減したデータを表示
def show(embedding_data_list: List[List[List[float]]]) -> None:
colors = ["teal", "tomato"]
for i, embedding_data in enumerate(embedding_data_list):
matrix = np.array(embedding_data, dtype="float")
X_Sprojected = random_projection.SparseRandomProjection(
n_components=2
).fit_transform(matrix)
plt.scatter(X_Sprojected[:, 0], X_Sprojected[:, 1], c=colors[i], alpha=0.5)
plt.show()
def main() -> None:
embedding_data_files = ["KINOKO_1024.csv", "TAKENOKO_1024.csv"]
embedding_data_list = [read_data(file) for file in embedding_data_files]
show(embedding_data_list)
if __name__ == "__main__":
main()
「きのこ」が、赤で、「たけのこ」が、緑です。これを見た限りでは、充分に期待できそうです。
(4) 比較
「コサイン類似性」で検索が可能かどうかを検証するために、各ベクトルデータの間の類似性を出力してみました。
import os
from typing import List
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
return os.path.join(os.path.dirname(__file__), filename)
# ベクトルデータのファイルからの読み込み
def read_data(filename: str) -> List[List[float]]:
with open(get_full_path(filename)) as f:
return [[float(s) for s in line.rstrip("\n").split(",")] for line in f]
# ベクトルデータ間のコサイン類似性を計算する
def compare(
test_datas: List[List[float]], target_datas: List[List[float]], threshold: float
) -> None:
counter = 0
for test_data in test_datas:
# 各データに対するコサイン類似度の計算
similarities = [
cosine_similarity([test_data], [target_data])[0][0]
for target_data in target_datas
]
# コサイン類似度の平均
avg = np.mean(similarities)
if avg > threshold:
print(f"{avg:.2f}")
counter += 1
else:
print(f"↓ {avg:.2f}")
print(f"{threshold}以上のデータ: {counter / len(target_datas) * 100:.1f}%")
def main() -> None:
kinoko_datas = read_data("KINOKO_1024.csv")
takenoko_datas = read_data("TAKENOKO_1024.csv")
threshold = 0.85
print("\n各「きのこのデータ」を全部の「きのこデータ」と比較する")
compare(kinoko_datas, kinoko_datas, threshold)
print("\n各「きのこのデータ」を全部の「たけのこデータ」と比較する")
compare(kinoko_datas, takenoko_datas, threshold)
if __name__ == "__main__":
main()
「きのこ画像」同士の場合、0.85 以上のものが、79.14%であったのに対し、「たけのこ画像」との間では、0%でした。この辺の状況をうまく利用すれば、簡単に分類モデルが作成できそうです。
各「きのこのデータ」を全部の「きのこデータ」と比較する
0.90
0.88
0.86
↓ 0.84
0.87
・・・略・・・
0.85
0.85
0.88
0.90
0.85以上のデータ: 79.1%
各「きのこのデータ」を全部の「たけのこデータ」と比較する
↓ 0.83
↓ 0.74
↓ 0.76
↓ 0.82
・・・略・・・
↓ 0.82
↓ 0.78
↓ 0.79
↓ 0.82
0.85以上のデータ: 0.0%
作業中、ベクトルは、256次元も試してみたのですが、コサイン類似性で、ちょっと精度が出なかたので、ある程度の次元が必要なのかなと感じました。
3.実装
(1) Pinecone
ベクトルデータベースには、Pineconeを使用しました。
インデックスは、下記の設定で、Pineconeのコンソールから作成しています。
name: my-pinecone-index
metric: cosine コサイン類似度
Dimensions: 1024 次元
Capacity mode: SERVERLESS
(2) データ挿入
データベースに入れるデータは、100件のみとしました。もしかすると、もっと少なくても大丈夫かもしれません。 メタデータは、特に設定していませんが、IDを「kinoko_連番」「takenoko_連番」のような形式とし、検索結果が、どちらにヒットしたかが分かるようにしました。
% head -n 100 KINOKO_1024.csv > KINOKO_1024.data
% head -n 100 TAKENOKO_1024.csv > TAKENOKO_1024.data
データ挿入のコードは、以下です。予め、PineconeのAPI KEYを環境変数にセットして利用しています。
% export PINECONE_API_KEY=xxxxxxxxx-xxxxxxxx-xxxxxxxx
import os
from typing import List
from pinecone import Pinecone
# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
return os.path.join(os.path.dirname(__file__), filename)
# ベクトルデータのファイルからの読み込み
def read_data(filename: str) -> List[List[float]]:
with open(get_full_path(filename)) as f:
return [[float(s) for s in line.rstrip("\n").split(",")] for line in f]
def upsert_data(index, data_list, label):
for i, data in enumerate(data_list):
index.upsert(vectors=[(f"{label}_{i}", data)])
def main() -> None:
PINEKCON_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_INDEX = "my-pinecone-index"
pinecone = Pinecone(api_key=PINEKCON_API_KEY)
index = pinecone.Index(PINECONE_INDEX)
kinoko_datas = read_data("KINOKO_1024.data")
takenoko_datas = read_data("TAKENOKO_1024.data")
upsert_data(index, kinoko_datas, "kinoko")
upsert_data(index, takenoko_datas, "takenoko")
if __name__ == "__main__":
main()
データが格納されている状況です。
(3) Query
テスト用に用意した画像は、以下の10枚です。回転台で撮影した動画から、改めて切り出しました。
上記の画像を順に読み込み、ベクトルを生成し、PineconeでQueryをかけています。
import os
from typing import List
import base64
import json
import boto3
import glob
from pinecone import Pinecone
# ソースコードの位置を基準にして、フルパスを取得する
def get_full_path(filename: str) -> str:
return os.path.join(os.path.dirname(__file__), filename)
# 画像からベクトルデータを取得
def get_embedding(bedrock, image_path: str, dimensions: int) -> List[float]:
with open(image_path, "rb") as image:
body = image.read()
response = bedrock.invoke_model(
body=json.dumps(
{
"inputImage": base64.b64encode(body).decode("utf8"),
"embeddingConfig": {"outputEmbeddingLength": dimensions},
}
),
modelId="amazon.titan-embed-image-v1",
accept="application/json",
contentType="application/json",
)
response_body = json.loads(response.get("body").read())
return response_body.get("embedding")
# ベクトルデータの取得
def query_index(index, vector):
result = index.query(
vector=vector,
top_k=1,
include_values=False,
)
print(f"id:{result.matches[0].id} score:{result.matches[0].score}")
def main() -> None:
PINEKCON_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_INDEX = "my-pinecone-index"
pinecone = Pinecone(api_key=PINEKCON_API_KEY)
index = pinecone.Index(PINECONE_INDEX)
bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
dimensions = 1024
# images/TESTに置かれているテスト画像を、順に推論する
image_path = f"images/TEST"
files = glob.glob(f"{get_full_path(image_path)}/*.png")
files.sort()
for file in files:
print(f"\nfilename:{os.path.basename(file)}")
vector = get_embedding(bedrock, file, dimensions)
query_index(index, vector)
if __name__ == "__main__":
main()
結果は、以下の通りでした。正解率は100%で、精度は充分だと思いました。
% python3 query.py
filename:kinoko_001.png
id:kinoko_15 score:0.775042295
filename:kinoko_002.png
id:kinoko_34 score:0.735480487
filename:kinoko_003.png
id:kinoko_92 score:0.744099379
filename:kinoko_004.png
id:kinoko_83 score:0.745024681
filename:kinoko_005.png
id:kinoko_29 score:0.793084085
filename:takenoko_001.png
id:takenoko_97 score:0.745306611
filename:takenoko_002.png
id:takenoko_83 score:0.735507786
filename:takenoko_003.png
id:takenoko_67 score:0.747619569
filename:takenoko_004.png
id:takenoko_7 score:0.792165279
filename:takenoko_005.png
id:takenoko_62 score:0.790254414
4. 最後に
今回は、Bedrockで利用可能なAmazon Titan Multimodal Embeddings G1モデルを使用して、画像の分類モデルを作成してみました。
思った以上に、簡単に作成できて驚いています。
分類モデルは、コンピュータービジョンの中でも、比較的、少量のデータでも作成可能ですが、今回のような方法だと、要件によっては、もっと効率的に作成できるかも知れないと感じました。
5. 参考リンク
Pincone Docs - Upsert data
Pincone Docs - Query data
PineconeをPythonで使う方法
AWS Marketplace の Pinecone を Amazon Bedrock のナレッジベースとして利用する
生成AIで外観検査をやってみた
tSNEでMNISTを軽やかに鮮やかにマッピングしていく
sklearn.random_projection.SparseRandomProjection