本ページは i-PRO株式会社 の有志メンバーにより記載されたものです。
本ページの情報は ライセンス に記載の条件で提供されます。
本ページでは、i-PRO カメラと TyTorch および VGG16 という AI ネットワークモデルを使用して「画像分類」と呼ばれる AI 処理を行ってみます。
VGG16 は、2014年の ILSVR で2位になった畳み込みニューラルネットワークです。オックスフォード大学の VGG(Visual Geometry Group) チームが作成した16層から構成されるネットワークモデルであるため VGG16 と呼ばれています。
作成したプログラムの動作例はこちらです。学習済みモデルを使用して1000種類の画像分類を行っています。画面下部に分類結果を表示しています。
こちらに記載の内容は i-PRO のカメラ "i-PRO mini (WV-S7130)"、"モジュールカメラ(AIスターターキット)" を使って動作確認しています。未確認ですが他の i-PRO カメラを使用する場合も恐らくそのまま利用可能です。
"i-PRO mini" 紹介:
"モジュールカメラ" 紹介:
カメラ初期設定についてはカメラ毎の取扱説明書をご確認ください。
カメラのIPアドレスを確認・設定できる下記ツールを事前に入手しておくと便利です。
Python を事前にインストール済みであることを前提に記載します。
私の評価環境は以下の通りです。
| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
こちら「PyTorch をインストールする - 1-1. CPU」の記事を参考にインストールを行います。
Windows 環境で多くの人が試せるようにしたいので、本ページでは「Compute Platform」を「CPU」としてインストールしている前提で説明を記載します。NVIDIA の GPU など特定のハードウェアを必要としません。
作業フォルダを準備して、下記プログラムを保存します。
[プログラムソース "preparation.py"]
import os
import urllib.request
# Create the folder "data" when the folder "data" does not exist.
# フォルダ「data」が存在しない場合はフォルダ data を作成します。
data_dir = "./data/"
if not os.path.exists(data_dir):
os.mkdir(data_dir)
# Download class_index for ImageNet.
# It is prepared for Keras. This is the JSON file used by the following open source.
# ImageNetのclass_indexをダウンロードします。
# Kerasで用意されているものです。下記オープンソースで使用している JSON ファイルです。
# https://github.com/fchollet/deep-learning-models/blob/master/imagenet_utils.py
url = "https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json"
save_path = os.path.join(data_dir, "imagenet_class_index.json")
if not os.path.exists(save_path):
urllib.request.urlretrieve(url, save_path)
そしてこのプログラムを実行すると、作業フォルダ中に data フォルダを作成してその中に "imagenet_class_index.json"
というファイルを作成して保存してくれます。
このファイルは画像分類で使用する 1000種類 のラベル情報です。
matplotlib を使用するので、下記コマンドによりインストールします。
pip3 install matplotlib
学習済みの VGG モデルを使用し、静止画(JPEGファイル)の画像分類を行ってみます。
ここでは入力画像として、 https://pixabay.com から取得した4つの画像(てんとう虫、ゴールデンレトリバー(犬)、車、デイジー(花))を使用させていただき実験してみます。
いずれも 商用利用無料、帰属表示必要なし、の画像です。
各画像の取得元は、下記ソースコード中に記載の URL を参照ください。
下記プログラムを実行する際は、ご自身で各画像をダウンロードして事前に data フォルダに保存してください。

| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
部分ごとに説明して最後に全体ソースコードを示します。
1.
パッケージのインポートを最初に行います。
import numpy as np import json from PIL import Image import matplotlib.pyplot as plt import torch import torchvision from torchvision import models, transforms
2.
下記コードにより VGG16 の学習済みモデルをロードします。
初めて実行する際は、学習済みパラメータをインターネットからダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model
self.net = models.vgg16( pretrained = True )
self.net.eval() # Set to evaluation mode.
# Display network-model.
print(self.net)
self.net = models.vgg16( pretrained = True )
により取得した学習済みモデルは下記に保存されます。
'~' はログインしているユーザーのホームディレクトリを意味します。
Where are my downloaded models saved?
The locations are used in the order of
- Calling hub.set_dir(<PATH_TO_HUB_DIR>)
- $TORCH_HOME/hub, if environment variable TORCH_HOME is set.
- $XDG_CACHE_HOME/torch/hub, if environment variable XDG_CACHE_HOME is set.
- ~/.cache/torch/hub
私の場合は下記に vgg16-397923af.pth を保存していました。約540MBのファイルサイズでした。
"~.cache\torch\hub\checkpoints\vgg16-397923af.pth"
3.
入力画像の前処理クラスを作成します。
class BaseTransform():
'''
Pre-process the input image. Image resizing, color standardization, etc.
入力画像の前処理を行う。画像のリサイズ、色の標準化など。
'''
def __init__(self, resize, mean, std):
self.base_transform = transforms.Compose([
transforms.Resize((resize, resize)), # Resize both long and short sides to the size of resize.
#transforms.Resize(resize), # Resize the short edge length to the size of resize while preserving the aspect
#transforms.CenterCrop(resize), # Crop the center of the image with resize × resize.
transforms.ToTensor(), # Convert to Torch-Tensor.
transforms.Normalize(mean, std) # color standardization
])
def __call__(self, img):
'''
Perform pre-process the input image.
'''
return self.base_transform(img)
こんな感じで BaseTransform クラスのインスタンスを生成するときに画像サイズ、規格化の情報を与えています。
# Create an instance of preprocessing.
resize = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
self.transform = BaseTransform(resize, mean, std)
4.
出力結果からラベルを判定するクラスを作成します。
最もスコアの高いラベル(predicted_label_name)とそのスコア(score)を返します。
出力を softmax で処理することで全体(1000種別)のスコアを足すと 1.0 になるようにしています。
class ILSVRCPredictor():
'''
Get the label name with the highest score from the calculation result.
演算結果から最もスコアの高いラベル名を取得する。
'''
def __init__(self, class_index):
'''
Constructor
Args:
class_index [i] class index.
'''
self.class_index = class_index
def predict_max(self, out):
'''
Get the label name with the highest score from the calculation result.
最もスコアの高いラベル名を取得する。
'''
data = out.detach().numpy()
probabilities = torch.nn.functional.softmax(out, dim=1)[0]
maxid = np.argmax(data)
score = probabilities[maxid].item()
predicted_label_name = self.class_index[str(maxid)][1]
return predicted_label_name, score
5.
画像分類を行う本体のクラスを作成します。
class ImagenetClassificationVgg():
'''
Image classification.
画像分類を行う。
'''
def __init__(self, class_index_file):
'''
Constructor
Args:
class_index_file: [i] class index file path.
'''
# PyTorch version.
print("PyTorch Version: ", torch.__version__)
print("Torchvision Version: ", torchvision.__version__)
# Load a trained VGG-16 model.
# The first time you run it, it will take a long time to run because it will download the trained parameters.
# 学習済みの VGG-16 モデルをロードする。
# 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model
self.net = models.vgg16( pretrained = True )
self.net.eval() # Set to evaluation mode.
# Display network-model.
print(self.net)
# Create an instance of preprocessing.
resize = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
self.transform = BaseTransform(resize, mean, std)
# Load ILSVRC label information and create an ILSVRCPredictor instance.
self.ILSVRC_class_index = json.load( open(class_index_file, 'r') )
self.predictor = ILSVRCPredictor(self.ILSVRC_class_index)
def imagenet_classification_vgg(self, img, debug=False):
'''
Perform image classification.
Args:
img: [i] An image for image classification. PIL.Image format.
debug: [i] if set to True, display debug images.
Returns:
results: Results of image classification.
'''
if debug==True:
# View original image.
plt.imshow(img)
plt.show()
# Preprocessing.
img_transformed = self.transform(img) # torch.Size([3, 224, 224])
if debug==True:
# Display the image after preprocessing.
img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0))
img_transformed_2 = np.clip(img_transformed_2, 0, 1)
plt.imshow(img_transformed_2)
plt.show()
# Added batch size dimension.
inputs = img_transformed.unsqueeze_(0) # torch.Size([1, 3, 224, 224])
# ネットワークモデルへ画像を入力し、出力をラベルに変換
out = self.net(inputs) # torch.Size([1, 1000])
result = self.predictor.predict_max(out)
return result
6.
main 部分です。
ImagenetClassificatinVgg クラスのインスタンスを作成して分類を行う画像を入力するだけです。
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、640x426
img = Image.open('./data/goldenretriever-3724972_640.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1920x1280
img = Image.open('./data/car-g955f2640f_1920.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/
# https://pixabay.com/ja/service/license/
# 商用利用無料、帰属表示必要なし、1920x1249
img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
以下に全ソースコードを記載します。
[全ソースコード "classsification_vgg.py"]
'''
[Abstract]
Image classification.
画像分類を行います。
[Details]
This program classifies still images (JPEG files).
静止画(JPEG files)の画像分類を行います。
[Library install]
torch, torchvision : see https://pytorch.org/get-started/locally/
matplotlib : pip install matplotlib
numpy : pip install numpy
PIL : pip install pillow
json : json is a built-in module in Python, you don’t need to install it with pip.
[Note]
Download the JPEG file yourself and save it in the data folder. See main function.
JPEGファイルはご自身でダウンロードして data フォルダに保存を行ってください。main 関数内の記載をご確認ください。
'''
import numpy as np
import json
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torchvision
from torchvision import models, transforms
class BaseTransform():
'''
Pre-process the input image. Image resizing, color standardization, etc.
入力画像の前処理を行う。画像のリサイズ、色の標準化など。
'''
def __init__(self, resize, mean, std):
self.base_transform = transforms.Compose([
transforms.Resize((resize, resize)), # Resize both long and short sides to the size of resize.
transforms.ToTensor(), # Convert to Torch-Tensor.
transforms.Normalize(mean, std) # color standardization
])
def __call__(self, img):
'''
Perform pre-process the input image.
'''
return self.base_transform(img)
class ILSVRCPredictor():
'''
Get the label name with the highest score from the calculation result.
演算結果から最もスコアの高いラベル名を取得する。
'''
def __init__(self, class_index):
'''
Constructor
Args:
class_index [i] class index.
'''
self.class_index = class_index
def predict_max(self, out):
'''
Get the label name with the highest score from the calculation result.
最もスコアの高いラベル名を取得する。
'''
data = out.detach().numpy()
probabilities = torch.nn.functional.softmax(out, dim=1)[0]
maxid = np.argmax(data)
score = probabilities[maxid].item()
predicted_label_name = self.class_index[str(maxid)][1]
return predicted_label_name, score
class ImagenetClassificationVgg():
'''
Image classification.
画像分類を行う。
'''
def __init__(self, class_index_file):
'''
Constructor
Args:
class_index_file: [i] class index file path.
'''
# PyTorch version.
print("PyTorch Version: ", torch.__version__)
print("Torchvision Version: ", torchvision.__version__)
# Load a trained VGG-16 model.
# The first time you run it, it will take a long time to run because it will download the trained parameters.
# 学習済みの VGG-16 モデルをロードする。
# 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model
self.net = models.vgg16( pretrained = True )
self.net.eval() # Set to evaluation mode.
# Display network-model.
print(self.net)
# Create an instance of preprocessing.
resize = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
self.transform = BaseTransform(resize, mean, std)
# Load ILSVRC label information and create an ILSVRCPredictor instance.
self.ILSVRC_class_index = json.load( open(class_index_file, 'r') )
self.predictor = ILSVRCPredictor(self.ILSVRC_class_index)
def do_classification(self, img, debug=False):
'''
Perform image classification.
Args:
img: [i] An image for image classification. PIL.Image format.
debug: [i] if set to True, display debug images.
Returns:
results: Results of image classification.
'''
if debug==True:
# View original image.
plt.imshow(img)
plt.show()
# Preprocessing.
img_transformed = self.transform(img) # torch.Size([3, 224, 224])
if debug==True:
# Display the image after preprocessing.
img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0))
img_transformed_2 = np.clip(img_transformed_2, 0, 1)
plt.imshow(img_transformed_2)
plt.show()
# Added batch size dimension.
inputs = img_transformed.unsqueeze_(0) # torch.Size([1, 3, 224, 224])
# Input images to the network model and convert the output to labels.
out = self.net(inputs) # torch.Size([1, 1000])
result = self.predictor.predict_max(out)
return result
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# Free for commercial use, no attribution required, 1280x855
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/
# https://pixabay.com/ja/service/license/
# Free for commercial use, no attribution required, 640x426
# 商用利用無料、帰属表示必要なし、640x426
img = Image.open('./data/goldenretriever-3724972_640.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/
# https://pixabay.com/ja/service/license/
# Free for commercial use, no attribution required, 1920x1280
# 商用利用無料、帰属表示必要なし、1920x1280
img = Image.open('./data/car-g955f2640f_1920.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
# https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/
# https://pixabay.com/ja/service/license/
# Free for commercial use, no attribution required, 1920x1249
# 商用利用無料、帰属表示必要なし、1920x1249
img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
実行結果です。コンソールへ出力された内容です。
PyTorch Version: 1.11.0+cpu
Torchvision Version: 0.12.0+cpu
VGG(
(features): Sequential(
(0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(1): ReLU(inplace=True)
(2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(3): ReLU(inplace=True)
(4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(6): ReLU(inplace=True)
(7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(8): ReLU(inplace=True)
(9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(11): ReLU(inplace=True)
(12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(13): ReLU(inplace=True)
(14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(15): ReLU(inplace=True)
(16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(17): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(18): ReLU(inplace=True)
(19): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(20): ReLU(inplace=True)
(21): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(22): ReLU(inplace=True)
(23): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(24): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(25): ReLU(inplace=True)
(26): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(27): ReLU(inplace=True)
(28): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
(29): ReLU(inplace=True)
(30): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
)
(avgpool): AdaptiveAvgPool2d(output_size=(7, 7))
(classifier): Sequential(
(0): Linear(in_features=25088, out_features=4096, bias=True)
(1): ReLU(inplace=True)
(2): Dropout(p=0.5, inplace=False)
(3): Linear(in_features=4096, out_features=4096, bias=True)
(4): ReLU(inplace=True)
(5): Dropout(p=0.5, inplace=False)
(6): Linear(in_features=4096, out_features=1000, bias=True)
)
)
Result: ('ladybug', 0.9478541612625122)
Result: ('golden_retriever', 0.9413034915924072)
Result: ('sports_car', 0.3690492510795593)
Result: ('daisy', 0.9962427616119385)
正しく画像分類できていそうです。
ImagenetClassificationVgg クラスのインスタンスを作成したらあとは画像を渡すだけ、という感じで実行できます。
学習済みのモデルを使用して推論するだけならさほど難しく無いと思います。興味あればチャレンジしてみて下さい。
i-PRO カメラと接続して取得した映像に対して「画像分類」をリアルタイムに実施してみたいと思います。本章では認識結果をコンソールへ出力することとします。認識結果を映像上へテキストで重畳表示することも簡単にできますので、興味あればチャレンジしてみてください。
「RTSP で画像を取得する」中で OpenCV による顔検知を作成しましたので、このプログラムを元に OpenCV の処理部分を上記で作成した物体検知へ変更してみたいと思います。
| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
ポイント
上記で作成した "imagenet_classsification_vgg.py" をそのままライブラリとして活用します。
具体的には、下記のように from imagenet_classsification_vgg import ImagenetClassificationVgg と記載することで作成済みプログラムをそのまま使用できます。
'classification_vgg.py' ファイルを同じフォルダ内に保存する必要があります。
from PIL import Image
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# Free for commercial use, no attribution required, 1280x855
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
こんな方針でプログラム作成を行っていきます。
「RTSP で画像を取得する」中で作成したプログラム "connect_with_rtsp_3_1.py" を元に改造することで、RTSP接続して受信したカメラ映像をリアルタイムに画像分類するプログラムを作成してみます。VGG の処理負荷はとても高そうなのでちょっと心配ですが、必要に応じてカメラ側の設定でフレームレートや解像度を下げて使用する、という方針で進めます。
[プログラムソース "classification_with_camera_1.py"]
'''
[Abstract]
Image classification.
画像分類を行います。
[Details]
This program connects to an i-PRO camera and classifies live images.
このプログラムは、i-PRO カメラと接続してライブ映像に対して画像分類を行います。
[Library install]
torch, torchvision : see https://pytorch.org/get-started/locally/
cv2 : pip install opencv-python
matplotlib : pip install matplotlib
numpy : pip install numpy
PIL : pip install pillow
json : Built-in module in Python, you don’t need to install it with pip.
'''
import cv2
from PIL import Image
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
user_id = "user-id" # Change to match your camera setting
user_pw = "password" # Change to match your camera setting
host = "192.168.0.10" # Change to match your camera setting
winname = "VIDEO" # Window title
# Exception definition.
BackendError = type('BackendError', (Exception,), {})
def IsWindowVisible(winname):
'''
Check if the target window exists.
Args:
winname : Window title.
Returns:
True : Exist.
False : Not exist.
Raise:
BackendError :
'''
try:
ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
if ret == -1:
raise BackendError('Use Qt as backend to check whether window is visible or not.')
return bool(ret)
except cv2.error:
return False
def CV2Pil(image):
'''
Convert from OpenCV to PIL.Image
Params:
image: OpenCV image.
Returns:
PIL.Image format image.
'''
new_image = image.copy()
if new_image.ndim == 2: # Grayscale
pass
elif new_image.shape[2] == 3: # Color
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
elif new_image.shape[2] == 4: # Color with alpha channel
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
new_image = Image.fromarray(new_image)
return new_image
'''
[Abstract]
main 関数
'''
if __name__ == '__main__':
# Create an instance of class ImagenetClassificationVgg.
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Create an instance of class cv2.VideoCapture
cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1")
#
windowInitialized = False
while True:
try:
ret, frame = cap.read()
if ret == True:
# Image classification
pilImage = CV2Pil(frame)
result, score = imagenetClassifigationVgg.do_classification(pilImage)
if score > 0.15:
print(result, score)
else:
print('None')
# Resize to a display size that fits on your PC screen.
width = 640
height = 480
h, w = frame.shape[:2]
aspect = w / h
if width / height >= aspect:
nh = height
nw = round(nh * aspect)
else:
nw = width
nh = round(nw / aspect)
frame2 = cv2.resize(frame, (nw, nh))
# Display image.
cv2.imshow(winname, frame2)
if windowInitialized==False:
# Specify the display position only at the beginning.
cv2.moveWindow(winname, 100, 100)
windowInitialized = True
# Press the "q" key to finish.
k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow ()
if k == ord("q"):
break
# Exit if there is no specified window.
if not IsWindowVisible(winname):
break
except KeyboardInterrupt:
# Press '[ctrl] + [c]' on the console to exit the program.
print("KeyboardInterrupt")
break
print("Finish main()")
cap.release()
cv2.destroyAllWindows()
結果:
予想通り VGG の処理がとても重たく、PC性能にもよると思いますが、今回実施しているCPU処理ではフレームレートを 1,3,5fps 程度に設定する必要がありそうです。
加えてフレームレートを下げても一定の遅延を発生しました。恐らく10フレーム程度のバッファリングが行われており、例えば 1fps に設定すると10秒程度の遅延を常に生じます。
用途にもよりますが、ちょっと残念。映像表示だけでも10fps以上の通常表示を維持しつつ、画像分類の処理をできるだけ実施するというような改善を考えてみたいところです。
そこで、画像分類の処理を別タスクに分離することで、映像受信と映像デコード処理を止めずにできるだけ画像分類をやってみる、という感じにプログラムを修正してみます。
multiprocessing, queue というライブラリを使用して実現してみます。
こちらが新規に作成したプログラムです。"connect_with_rtsp_3_2.py" を元に作成しています。
[プログラムソース "classification_with_camera_2.py"]
'''
[Abstract]
Image classification.
画像分類を行います。
[Details]
This program connects to an i-PRO camera and classifies live images with multitasking.
このプログラムは、i-PRO カメラと接続してライブ映像に対してマルチタスク処理で画像分類を行います。
[Library install]
torch, torchvision : see https://pytorch.org/get-started/locally/
cv2 : pip install opencv-python
matplotlib : pip install matplotlib
numpy : pip install numpy
PIL : pip install pillow
json, multiprocessing, queue :
Built-in module in Python, you don’t need to install it with pip.
'''
import cv2
import multiprocessing as mp
from queue import Empty
from PIL import Image
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
user_id = "user-id" # Change to match your camera setting
user_pw = "password" # Change to match your camera setting
host = "192.168.0.10" # Change to match your camera setting
winname = "VIDEO" # Window title
# Exception definition.
BackendError = type('BackendError', (Exception,), {})
def IsWindowVisible(winname):
'''
Check if the target window exists.
Args:
winname : Window title.
Returns:
True : Exist.
False : Not exist.
Raise:
BackendError :
'''
try:
ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
if ret == -1:
raise BackendError('Use Qt as backend to check whether window is visible or not.')
return bool(ret)
except cv2.error:
return False
def CV2Pil(image):
'''
Convert from OpenCV to PIL.Image
Params:
image: OpenCV image.
Returns:
PIL.Image format image.
'''
new_image = image.copy()
if new_image.ndim == 2: # Grayscale
pass
elif new_image.shape[2] == 3: # Color
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
elif new_image.shape[2] == 4: # Color with alpha channel
new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
new_image = Image.fromarray(new_image)
return new_image
def ImageClassificationProcess(q):
'''
Image classification process.
Args:
q1 : [i] Queue that stores images for face detection.
q2 : [o] Queue that stores face detection results.
Returns:
None
'''
# Create an instance of class ImagenetClassificationVgg.
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
while True:
try:
image = q.get(True, 10)
# Terminate process
if type(image) == int:
if image == -1:
break
# Image classification
pilImage = CV2Pil(image)
result, score = imagenetClassifigationVgg.do_classification(pilImage)
if score > 0.15:
print(result, score)
else:
print('None')
except Empty: # timeout of q1.get()
print("Timeout happen.(3)")
print("Finish ImageClassificationProcess()")
'''
[Abstract]
__main__
'''
if __name__ == '__main__':
# Create an instance of class cv2.VideoCapture
cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1")
#
windowInitialized = False
# Create and start image classification process.
q = mp.Queue()
p = mp.Process(target=ImageClassificationProcess, args=(q,))
p.start()
while True:
try:
ret, frame = cap.read()
if ret == True:
#
if (q.qsize() <= 1):
q.put(frame)
# Resize to a display size that fits on your PC screen.
width = 640
height = 480
h, w = frame.shape[:2]
aspect = w / h
if width / height >= aspect:
nh = height
nw = round(nh * aspect)
else:
nw = width
nh = round(nw / aspect)
frame2 = cv2.resize(frame, (nw, nh))
# Display image.
cv2.imshow(winname, frame2)
if windowInitialized==False:
# Specify the display position only at the beginning.
cv2.moveWindow(winname, 100, 100)
windowInitialized = True
# Press the "q" key to finish.
k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow ()
if k == ord("q"):
break
# Exit if there is no specified window.
if not IsWindowVisible(winname):
break
except KeyboardInterrupt:
# Press '[ctrl] + [c]' on the console to exit the program.
print("KeyboardInterrupt")
break
# Terminate process p
q.put(-1)
# Waiting for process p to finish
p.join()
print("Finish main()")
cap.release()
cv2.destroyAllWindows()
[動画] i-PRO カメラと接続して、リアルタイムに画像分類
CPU 版の PyTorch での動作ですが、十分に高速な処理をしてくれているように私は感じました。
GPU
版を使うともっと素敵なパフォーマンスで動作することと思いますが、この画像分類についてはこれでもいろいろと活用できるのではないでしょうか。
前述の画像分類を GUI(tkinter)版で作成してみます。
「 RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で作成した GUI プログラムをベースに改造してみます。
ポイント
| 言語 : | Python, | 3.10.4 |
| OS : | Windows 11 home, | 21H2 |
| Windows 10 Pro, | 21H1 | |
[プログラムソース "classification_gui.py"]
'''
[Abstract]
Image classification.
画像分類を行います。
[Details]
Create a GUI application using tkinter.
tkinter を使って GUI アプリケーションを作成します。
[Library install]
cv2 : pip install opencv-python
PIL : pip install pillow
'''
import cv2
import time
import tkinter as tk
from tkinter import messagebox
from PIL import Image, ImageTk, ImageOps
import multiprocessing as mp
from queue import Empty
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
user_id = "user-id" # Change to match your camera setting
user_pw = "password" # Change to match your camera setting
host = "192.168.0.10" # Change to match your camera setting
winname = "VIDEO" # Window title
url = f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1"
class Application(tk.Frame):
def __init__(self, master = None):
super().__init__(master)
self.pack()
# Window settings.
self.master.title("Display i-PRO camera with tkinter") # Window title
self.master.geometry("800x600+100+100") # Window size, position
# Event registration for window termination.
self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window)
# Create menu.
menubar = tk.Menu(self.master)
self.master.configure(menu=menubar)
filemenu = tk.Menu(menubar)
menubar.add_cascade(label='File', menu=filemenu)
filemenu.add_command(label='Quit', command = self.on_closing_window)
# Create button_frame
self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2)
self.button_frame.pack(side = tk.BOTTOM, fill=tk.X)
# Label
self.label_frame1 = tk.Frame(self.button_frame, width=10)
self.label_frame1.pack(side=tk.LEFT)
self.label_frame2 = tk.Frame(self.button_frame, width=40)
self.label_frame2.pack(side=tk.LEFT)
self.class_text = tk.StringVar()
self.score_text = tk.StringVar()
self.class_text.set('')
self.score_text.set('')
self.label1 = tk.Label(self.label_frame1, text='Class: ').pack(side=tk.TOP)
self.label2 = tk.Label(self.label_frame2, textvariable=self.class_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP)
self.label3 = tk.Label(self.label_frame1, text='Score: ').pack(side=tk.TOP)
self.label4 = tk.Label(self.label_frame2, textvariable=self.score_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP)
# Create quit_button
self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window)
self.quit_button.pack(side=tk.RIGHT)
# Create canvas.
self.canvas = tk.Canvas(self.master)
# Add mouse click event to canvas.
self.canvas.bind('<Button-1>', self.canvas_click)
# Place canvas.
self.canvas.pack(expand = True, fill = tk.BOTH)
# Create queue and value for image receive process.
self.imageQueue = mp.Queue()
self.request = mp.Value('i', 0) # -1 : Exit ReceiveImageProcess.
# 0 : Normal.
# 1 : Connect camera.
# 2 : Release camera.
# Create queue for classification process.
self.imageQueue2 = mp.Queue()
self.resultQueue = mp.Queue()
# Create processes.
self.imageReceiveProcess = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.imageQueue2, self.request))
self.classificationProcess = mp.Process(target=ImageClassificationProcess, args=(self.imageQueue2, self.resultQueue))
self.imageReceiveProcess.start()
self.classificationProcess.start()
# Raise a video display event (disp_image) after 500m
self.disp_id = self.after(500, self.disp_image)
def on_closing_window(self):
''' Window closing event. '''
if messagebox.askokcancel("QUIT", "Do you want to quit?"):
# Request terminate process.
self.request.value = -1
self.imageQueue2.put(-1)
# Waiting for process p to finish
time.sleep(1)
# Flash queue.
# The program cannot complete processes unless the queue is emptied.
for i in range(self.imageQueue.qsize()):
image = self.imageQueue.get()
for i in range(self.imageQueue2.qsize()):
image = self.imageQueue2.get()
for i in range(self.resultQueue.qsize()):
result = self.resultQueue.get()
# Wait for process to be terminated.
self.imageReceiveProcess.join()
self.classificationProcess.join()
self.master.destroy()
print("Finish Application.")
def canvas_click(self, event):
''' Event handling with mouse clicks on canvas '''
if self.disp_id is None:
# Connect camera.
self.request.value = 1
# Display image.
self.disp_image()
else:
# Release camera.
self.request.value = 2
# Cancel scheduling
self.after_cancel(self.disp_id)
self.disp_id = None
def disp_image(self):
''' Display image on Canvas '''
# If there is data in the imageQueue, the program receives the data and displays the video.
num = self.imageQueue.qsize()
if num > 0:
if (num > 5):
num -= 1
for i in range(num):
cv_image = self.imageQueue.get()
# (2) Convert image from ndarray to PIL.Image.
pil_image = Image.fromarray(cv_image)
# Get canvas size.
canvas_width = self.canvas.winfo_width()
canvas_height = self.canvas.winfo_height()
# Resize the image to the size of the canvas without changing the aspect ratio.
# アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ
pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height))
# (3) Convert image from PIL.Image to PhotoImage
# PIL.Image から PhotoImage へ変換する
self.photo_image = ImageTk.PhotoImage(image=pil_image)
# Display image on the canvas.
self.canvas.create_image(
canvas_width / 2, # Image display position (center of the canvas)
canvas_height / 2,
image=self.photo_image # image data
)
else:
pass
# Update GUI Label.
result_num = self.resultQueue.qsize()
if result_num > 0:
for i in range(result_num):
label, score = self.resultQueue.get()
self.class_text.set(label)
score = '{:.4f}'.format(score)
self.score_text.set(score)
# Raise a video display event (disp_image) after 1ms.
self.disp_id = self.after(1, self.disp_image)
def ReceiveImageProcess(imageQueue, imageQueue2, request):
'''
Receive Image Process.
Args:
imageQueue [o] Image data for display.
imageQueue2 [o] Image data for image classification.
request [i] Shared memory for receiving requests from the main process.
-1: Terminate process.
0: Nothing.
1: Connect camera.
2: Release camera connection.
Returns:
None
Raises
None
'''
# Connect camera.
cap = cv2.VideoCapture(url)
while True:
if cap != None:
# Get frame.
ret, frame = cap.read()
if ret == True:
# (1) Convert image from BGR to RGB.
cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# for display.
if imageQueue.qsize() < 10:
imageQueue.put(cv_image)
# for image classification.
if imageQueue2.qsize() <= 1:
imageQueue2.put(cv_image)
else:
print("cap.read() return False.")
# The timeout period seems to be 30 seconds.
# And there seems to be no API to change the timeout value.
time.sleep(1)
# Reconnect
cap.release()
cap = cv2.VideoCapture(url)
else:
time.sleep(0.1)
# Check process termination request.
if request.value == -1:
# Terminate process.
cap.release()
request.value = 0
break
# Check connect request.
if request.value == 1:
cap = cv2.VideoCapture(url)
request.value = 0
# Check release request.
if request.value == 2:
cap.release()
cap = None
request.value = 0
print("Terminate ReceiveImageProcess().")
def ImageClassificationProcess(imageQueue, resultQueue):
'''
Image classification process.
Args:
imageQueue : [i] Image for image classification.
resultQueue : [o] Save classification result labels and scores.
Returns:
None
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
while True:
try:
image = imageQueue.get(True, 10)
# If type(image) is 'int' and image is -1, then this process is terminated.
if type(image) == int:
if image == -1:
break
# Image classification
pilImage = Image.fromarray(image) # convert from OpenCV image to PIL.Image
result, score = imagenetClassifigationVgg.do_classification(pilImage)
if score > 0.15:
print(result, score)
resultQueue.put((result, score))
else:
print('None')
resultQueue.put(('None', 0.0))
except Empty: # timeout of imageQueue.get()
print("Timeout happen.(3)")
print("Finish ImageClassificationProcess()")
if __name__ == "__main__":
root = tk.Tk()
app = Application(master = root)
app.mainloop()
[動画] i-PRO カメラと接続してリアルタイムに画像分類、GUI版
NOTE
デスクトップPCなどの高性能PCではこのまま動作しましたが、標準的なノートPCでは処理負荷が高くて上記のままでは軽快に動作しませんでした。
私の場合は接続先を stream_2 へ変更して映像の解像度を下げることで気持ちよく動作するようになりました。
実際に動作させるPCの性能などに応じてカメラ映像の解像度やフレーム数などを適当に調整してみてください。
そこそこ良い感じに作れたのでは、と思っています。
プログラムが約300ステップまで大きくなってきましたので、機能拡張はこの辺で一旦おしまいにしたいと思います。
本ページで紹介のソースコードは、下記 github より取得できます。
下記 github のソースコードと本ページの内容は差異がある場合があります。
i-pro-corp/python-examples: Examples for i-PRO cameras. (github.com)
本ページの情報は、特記無い限り下記ライセンスで提供されます。
| 2023/10/20 | - | IP簡単設定ソフトウェア、IP Setting Software リンク先を更新, | 木下英俊 |
| 2022/7/20 | - | 微修正, | 木下英俊 |
| 2022/6/22 | - | 新規作成, | 木下英俊 |
i-PRO - Programming Items トップページ