本ページは i-PRO株式会社 の有志メンバーにより記載されたものです。
本ページの情報は ライセンス に記載の条件で提供されます。
本ページでは、i-PRO カメラと TyTorch および VGG16 という AI ネットワークモデルを使用して「画像分類」と呼ばれる AI 処理を行ってみます。
VGG16 は、2014年の ILSVR で2位になった畳み込みニューラルネットワークです。オックスフォード大学の VGG(Visual Geometry Group) チームが作成した16層から構成されるネットワークモデルであるため VGG16 と呼ばれています。
作成したプログラムの動作例はこちらです。学習済みモデルを使用して1000種類の画像分類を行っています。画面下部に分類結果を表示しています。
こちらに記載の内容は i-PRO のカメラ "i-PRO mini (WV-S7130)"、"モジュールカメラ(AIスターターキット)" を使って動作確認しています。未確認ですが他の i-PRO カメラを使用する場合も恐らくそのまま利用可能です。
"i-PRO mini" 紹介:
"モジュールカメラ" 紹介:
カメラ初期設定についてはカメラ毎の取扱説明書をご確認ください。
カメラのIPアドレスを確認・設定できる下記ツールを事前に入手しておくと便利です。
Python を事前にインストール済みであることを前提に記載します。
私の評価環境は以下の通りです。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
こちら「PyTorch をインストールする - 1-1. CPU」の記事を参考にインストールを行います。
Windows 環境で多くの人が試せるようにしたいので、本ページでは「Compute Platform」を「CPU」としてインストールしている前提で説明を記載します。NVIDIA の GPU など特定のハードウェアを必要としません。
作業フォルダを準備して、下記プログラムを保存します。
[プログラムソース "preparation.py"]
import os import urllib.request # Create the folder "data" when the folder "data" does not exist. # フォルダ「data」が存在しない場合はフォルダ data を作成します。 data_dir = "./data/" if not os.path.exists(data_dir): os.mkdir(data_dir) # Download class_index for ImageNet. # It is prepared for Keras. This is the JSON file used by the following open source. # ImageNetのclass_indexをダウンロードします。 # Kerasで用意されているものです。下記オープンソースで使用している JSON ファイルです。 # https://github.com/fchollet/deep-learning-models/blob/master/imagenet_utils.py url = "https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json" save_path = os.path.join(data_dir, "imagenet_class_index.json") if not os.path.exists(save_path): urllib.request.urlretrieve(url, save_path)
そしてこのプログラムを実行すると、作業フォルダ中に data フォルダを作成してその中に "imagenet_class_index.json"
というファイルを作成して保存してくれます。
このファイルは画像分類で使用する 1000種類 のラベル情報です。
matplotlib を使用するので、下記コマンドによりインストールします。
pip3 install matplotlib
学習済みの VGG モデルを使用し、静止画(JPEGファイル)の画像分類を行ってみます。
ここでは入力画像として、 https://pixabay.com から取得した4つの画像(てんとう虫、ゴールデンレトリバー(犬)、車、デイジー(花))を使用させていただき実験してみます。
いずれも 商用利用無料、帰属表示必要なし、の画像です。
各画像の取得元は、下記ソースコード中に記載の URL を参照ください。
下記プログラムを実行する際は、ご自身で各画像をダウンロードして事前に data フォルダに保存してください。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
部分ごとに説明して最後に全体ソースコードを示します。
1.
パッケージのインポートを最初に行います。
import numpy as np import json from PIL import Image import matplotlib.pyplot as plt import torch import torchvision from torchvision import models, transforms
2.
下記コードにより VGG16 の学習済みモデルをロードします。
初めて実行する際は、学習済みパラメータをインターネットからダウンロードするため、実行に時間がかかります。
# Create an instance of the VGG16 model self.net = models.vgg16( pretrained = True ) self.net.eval() # Set to evaluation mode. # Display network-model. print(self.net)
self.net = models.vgg16( pretrained = True )
により取得した学習済みモデルは下記に保存されます。
'~' はログインしているユーザーのホームディレクトリを意味します。
Where are my downloaded models saved?
The locations are used in the order of
- Calling hub.set_dir(<PATH_TO_HUB_DIR>)
- $TORCH_HOME/hub, if environment variable TORCH_HOME is set.
- $XDG_CACHE_HOME/torch/hub, if environment variable XDG_CACHE_HOME is set.
- ~/.cache/torch/hub
私の場合は下記に vgg16-397923af.pth を保存していました。約540MBのファイルサイズでした。
"~.cache\torch\hub\checkpoints\vgg16-397923af.pth"
3.
入力画像の前処理クラスを作成します。
class BaseTransform(): ''' Pre-process the input image. Image resizing, color standardization, etc. 入力画像の前処理を行う。画像のリサイズ、色の標準化など。 ''' def __init__(self, resize, mean, std): self.base_transform = transforms.Compose([ transforms.Resize((resize, resize)), # Resize both long and short sides to the size of resize. #transforms.Resize(resize), # Resize the short edge length to the size of resize while preserving the aspect #transforms.CenterCrop(resize), # Crop the center of the image with resize × resize. transforms.ToTensor(), # Convert to Torch-Tensor. transforms.Normalize(mean, std) # color standardization ]) def __call__(self, img): ''' Perform pre-process the input image. ''' return self.base_transform(img)
こんな感じで BaseTransform クラスのインスタンスを生成するときに画像サイズ、規格化の情報を与えています。
# Create an instance of preprocessing. resize = 224 mean = (0.485, 0.456, 0.406) std = (0.229, 0.224, 0.225) self.transform = BaseTransform(resize, mean, std)
4.
出力結果からラベルを判定するクラスを作成します。
最もスコアの高いラベル(predicted_label_name)とそのスコア(score)を返します。
出力を softmax で処理することで全体(1000種別)のスコアを足すと 1.0 になるようにしています。
class ILSVRCPredictor(): ''' Get the label name with the highest score from the calculation result. 演算結果から最もスコアの高いラベル名を取得する。 ''' def __init__(self, class_index): ''' Constructor Args: class_index [i] class index. ''' self.class_index = class_index def predict_max(self, out): ''' Get the label name with the highest score from the calculation result. 最もスコアの高いラベル名を取得する。 ''' data = out.detach().numpy() probabilities = torch.nn.functional.softmax(out, dim=1)[0] maxid = np.argmax(data) score = probabilities[maxid].item() predicted_label_name = self.class_index[str(maxid)][1] return predicted_label_name, score
5.
画像分類を行う本体のクラスを作成します。
class ImagenetClassificationVgg(): ''' Image classification. 画像分類を行う。 ''' def __init__(self, class_index_file): ''' Constructor Args: class_index_file: [i] class index file path. ''' # PyTorch version. print("PyTorch Version: ", torch.__version__) print("Torchvision Version: ", torchvision.__version__) # Load a trained VGG-16 model. # The first time you run it, it will take a long time to run because it will download the trained parameters. # 学習済みの VGG-16 モデルをロードする。 # 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。 # Create an instance of the VGG16 model self.net = models.vgg16( pretrained = True ) self.net.eval() # Set to evaluation mode. # Display network-model. print(self.net) # Create an instance of preprocessing. resize = 224 mean = (0.485, 0.456, 0.406) std = (0.229, 0.224, 0.225) self.transform = BaseTransform(resize, mean, std) # Load ILSVRC label information and create an ILSVRCPredictor instance. self.ILSVRC_class_index = json.load( open(class_index_file, 'r') ) self.predictor = ILSVRCPredictor(self.ILSVRC_class_index) def imagenet_classification_vgg(self, img, debug=False): ''' Perform image classification. Args: img: [i] An image for image classification. PIL.Image format. debug: [i] if set to True, display debug images. Returns: results: Results of image classification. ''' if debug==True: # View original image. plt.imshow(img) plt.show() # Preprocessing. img_transformed = self.transform(img) # torch.Size([3, 224, 224]) if debug==True: # Display the image after preprocessing. img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0)) img_transformed_2 = np.clip(img_transformed_2, 0, 1) plt.imshow(img_transformed_2) plt.show() # Added batch size dimension. inputs = img_transformed.unsqueeze_(0) # torch.Size([1, 3, 224, 224]) # ネットワークモデルへ画像を入力し、出力をラベルに変換 out = self.net(inputs) # torch.Size([1, 1000]) result = self.predictor.predict_max(out) return result
6.
main 部分です。
ImagenetClassificatinVgg クラスのインスタンスを作成して分類を行う画像を入力するだけです。
if __name__ == "__main__": ''' main ''' imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json') # Open image file. # https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/ # https://pixabay.com/ja/service/license/ # 商用利用無料、帰属表示必要なし、1280x855 img = Image.open('./data/ladybug-g7744c038e_1280.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result) # https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/ # https://pixabay.com/ja/service/license/ # 商用利用無料、帰属表示必要なし、640x426 img = Image.open('./data/goldenretriever-3724972_640.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result) # https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/ # https://pixabay.com/ja/service/license/ # 商用利用無料、帰属表示必要なし、1920x1280 img = Image.open('./data/car-g955f2640f_1920.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result) # https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/ # https://pixabay.com/ja/service/license/ # 商用利用無料、帰属表示必要なし、1920x1249 img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result)
以下に全ソースコードを記載します。
[全ソースコード "classsification_vgg.py"]
''' [Abstract] Image classification. 画像分類を行います。 [Details] This program classifies still images (JPEG files). 静止画(JPEG files)の画像分類を行います。 [Library install] torch, torchvision : see https://pytorch.org/get-started/locally/ matplotlib : pip install matplotlib numpy : pip install numpy PIL : pip install pillow json : json is a built-in module in Python, you don’t need to install it with pip. [Note] Download the JPEG file yourself and save it in the data folder. See main function. JPEGファイルはご自身でダウンロードして data フォルダに保存を行ってください。main 関数内の記載をご確認ください。 ''' import numpy as np import json from PIL import Image import matplotlib.pyplot as plt import torch import torchvision from torchvision import models, transforms class BaseTransform(): ''' Pre-process the input image. Image resizing, color standardization, etc. 入力画像の前処理を行う。画像のリサイズ、色の標準化など。 ''' def __init__(self, resize, mean, std): self.base_transform = transforms.Compose([ transforms.Resize((resize, resize)), # Resize both long and short sides to the size of resize. transforms.ToTensor(), # Convert to Torch-Tensor. transforms.Normalize(mean, std) # color standardization ]) def __call__(self, img): ''' Perform pre-process the input image. ''' return self.base_transform(img) class ILSVRCPredictor(): ''' Get the label name with the highest score from the calculation result. 演算結果から最もスコアの高いラベル名を取得する。 ''' def __init__(self, class_index): ''' Constructor Args: class_index [i] class index. ''' self.class_index = class_index def predict_max(self, out): ''' Get the label name with the highest score from the calculation result. 最もスコアの高いラベル名を取得する。 ''' data = out.detach().numpy() probabilities = torch.nn.functional.softmax(out, dim=1)[0] maxid = np.argmax(data) score = probabilities[maxid].item() predicted_label_name = self.class_index[str(maxid)][1] return predicted_label_name, score class ImagenetClassificationVgg(): ''' Image classification. 画像分類を行う。 ''' def __init__(self, class_index_file): ''' Constructor Args: class_index_file: [i] class index file path. ''' # PyTorch version. print("PyTorch Version: ", torch.__version__) print("Torchvision Version: ", torchvision.__version__) # Load a trained VGG-16 model. # The first time you run it, it will take a long time to run because it will download the trained parameters. # 学習済みの VGG-16 モデルをロードする。 # 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。 # Create an instance of the VGG16 model self.net = models.vgg16( pretrained = True ) self.net.eval() # Set to evaluation mode. # Display network-model. print(self.net) # Create an instance of preprocessing. resize = 224 mean = (0.485, 0.456, 0.406) std = (0.229, 0.224, 0.225) self.transform = BaseTransform(resize, mean, std) # Load ILSVRC label information and create an ILSVRCPredictor instance. self.ILSVRC_class_index = json.load( open(class_index_file, 'r') ) self.predictor = ILSVRCPredictor(self.ILSVRC_class_index) def do_classification(self, img, debug=False): ''' Perform image classification. Args: img: [i] An image for image classification. PIL.Image format. debug: [i] if set to True, display debug images. Returns: results: Results of image classification. ''' if debug==True: # View original image. plt.imshow(img) plt.show() # Preprocessing. img_transformed = self.transform(img) # torch.Size([3, 224, 224]) if debug==True: # Display the image after preprocessing. img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0)) img_transformed_2 = np.clip(img_transformed_2, 0, 1) plt.imshow(img_transformed_2) plt.show() # Added batch size dimension. inputs = img_transformed.unsqueeze_(0) # torch.Size([1, 3, 224, 224]) # Input images to the network model and convert the output to labels. out = self.net(inputs) # torch.Size([1, 1000]) result = self.predictor.predict_max(out) return result if __name__ == "__main__": ''' main ''' imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json') # Open image file. # https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/ # https://pixabay.com/ja/service/license/ # Free for commercial use, no attribution required, 1280x855 # 商用利用無料、帰属表示必要なし、1280x855 img = Image.open('./data/ladybug-g7744c038e_1280.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result) # https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/ # https://pixabay.com/ja/service/license/ # Free for commercial use, no attribution required, 640x426 # 商用利用無料、帰属表示必要なし、640x426 img = Image.open('./data/goldenretriever-3724972_640.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result) # https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/ # https://pixabay.com/ja/service/license/ # Free for commercial use, no attribution required, 1920x1280 # 商用利用無料、帰属表示必要なし、1920x1280 img = Image.open('./data/car-g955f2640f_1920.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result) # https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/ # https://pixabay.com/ja/service/license/ # Free for commercial use, no attribution required, 1920x1249 # 商用利用無料、帰属表示必要なし、1920x1249 img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg') result = imagenetClassifigationVgg.do_classification(img) print("Result: ", result)
実行結果です。コンソールへ出力された内容です。
PyTorch Version: 1.11.0+cpu Torchvision Version: 0.12.0+cpu VGG( (features): Sequential( (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (1): ReLU(inplace=True) (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (3): ReLU(inplace=True) (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (6): ReLU(inplace=True) (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (8): ReLU(inplace=True) (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (11): ReLU(inplace=True) (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (13): ReLU(inplace=True) (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (15): ReLU(inplace=True) (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (17): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (18): ReLU(inplace=True) (19): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (20): ReLU(inplace=True) (21): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (22): ReLU(inplace=True) (23): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (24): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (25): ReLU(inplace=True) (26): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (27): ReLU(inplace=True) (28): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (29): ReLU(inplace=True) (30): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) ) (avgpool): AdaptiveAvgPool2d(output_size=(7, 7)) (classifier): Sequential( (0): Linear(in_features=25088, out_features=4096, bias=True) (1): ReLU(inplace=True) (2): Dropout(p=0.5, inplace=False) (3): Linear(in_features=4096, out_features=4096, bias=True) (4): ReLU(inplace=True) (5): Dropout(p=0.5, inplace=False) (6): Linear(in_features=4096, out_features=1000, bias=True) ) ) Result: ('ladybug', 0.9478541612625122) Result: ('golden_retriever', 0.9413034915924072) Result: ('sports_car', 0.3690492510795593) Result: ('daisy', 0.9962427616119385)
正しく画像分類できていそうです。
ImagenetClassificationVgg クラスのインスタンスを作成したらあとは画像を渡すだけ、という感じで実行できます。
学習済みのモデルを使用して推論するだけならさほど難しく無いと思います。興味あればチャレンジしてみて下さい。
i-PRO カメラと接続して取得した映像に対して「画像分類」をリアルタイムに実施してみたいと思います。本章では認識結果をコンソールへ出力することとします。認識結果を映像上へテキストで重畳表示することも簡単にできますので、興味あればチャレンジしてみてください。
「RTSP で画像を取得する」中で OpenCV による顔検知を作成しましたので、このプログラムを元に OpenCV の処理部分を上記で作成した物体検知へ変更してみたいと思います。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
ポイント
上記で作成した "imagenet_classsification_vgg.py" をそのままライブラリとして活用します。
具体的には、下記のように from imagenet_classsification_vgg import ImagenetClassificationVgg と記載することで作成済みプログラムをそのまま使用できます。
'classification_vgg.py' ファイルを同じフォルダ内に保存する必要があります。
from PIL import Image
from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'.
if __name__ == "__main__":
'''
main
'''
imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')
# Open image file.
# https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
# https://pixabay.com/ja/service/license/
# Free for commercial use, no attribution required, 1280x855
# 商用利用無料、帰属表示必要なし、1280x855
img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
result = imagenetClassifigationVgg.do_classification(img)
print("Result: ", result)
こんな方針でプログラム作成を行っていきます。
「RTSP で画像を取得する」中で作成したプログラム "connect_with_rtsp_3_1.py" を元に改造することで、RTSP接続して受信したカメラ映像をリアルタイムに画像分類するプログラムを作成してみます。VGG の処理負荷はとても高そうなのでちょっと心配ですが、必要に応じてカメラ側の設定でフレームレートや解像度を下げて使用する、という方針で進めます。
[プログラムソース "classification_with_camera_1.py"]
''' [Abstract] Image classification. 画像分類を行います。 [Details] This program connects to an i-PRO camera and classifies live images. このプログラムは、i-PRO カメラと接続してライブ映像に対して画像分類を行います。 [Library install] torch, torchvision : see https://pytorch.org/get-started/locally/ cv2 : pip install opencv-python matplotlib : pip install matplotlib numpy : pip install numpy PIL : pip install pillow json : Built-in module in Python, you don’t need to install it with pip. ''' import cv2 from PIL import Image from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'. user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' Check if the target window exists. Args: winname : Window title. Returns: True : Exist. False : Not exist. Raise: BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def CV2Pil(image): ''' Convert from OpenCV to PIL.Image Params: image: OpenCV image. Returns: PIL.Image format image. ''' new_image = image.copy() if new_image.ndim == 2: # Grayscale pass elif new_image.shape[2] == 3: # Color new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB) elif new_image.shape[2] == 4: # Color with alpha channel new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA) new_image = Image.fromarray(new_image) return new_image ''' [Abstract] main 関数 ''' if __name__ == '__main__': # Create an instance of class ImagenetClassificationVgg. imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json') # Create an instance of class cv2.VideoCapture cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1") # windowInitialized = False while True: try: ret, frame = cap.read() if ret == True: # Image classification pilImage = CV2Pil(frame) result, score = imagenetClassifigationVgg.do_classification(pilImage) if score > 0.15: print(result, score) else: print('None') # Resize to a display size that fits on your PC screen. width = 640 height = 480 h, w = frame.shape[:2] aspect = w / h if width / height >= aspect: nh = height nw = round(nh * aspect) else: nw = width nh = round(nw / aspect) frame2 = cv2.resize(frame, (nw, nh)) # Display image. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify the display position only at the beginning. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break print("Finish main()") cap.release() cv2.destroyAllWindows()
結果:
予想通り VGG の処理がとても重たく、PC性能にもよると思いますが、今回実施しているCPU処理ではフレームレートを 1,3,5fps 程度に設定する必要がありそうです。
加えてフレームレートを下げても一定の遅延を発生しました。恐らく10フレーム程度のバッファリングが行われており、例えば 1fps に設定すると10秒程度の遅延を常に生じます。
用途にもよりますが、ちょっと残念。映像表示だけでも10fps以上の通常表示を維持しつつ、画像分類の処理をできるだけ実施するというような改善を考えてみたいところです。
そこで、画像分類の処理を別タスクに分離することで、映像受信と映像デコード処理を止めずにできるだけ画像分類をやってみる、という感じにプログラムを修正してみます。
multiprocessing, queue というライブラリを使用して実現してみます。
こちらが新規に作成したプログラムです。"connect_with_rtsp_3_2.py" を元に作成しています。
[プログラムソース "classification_with_camera_2.py"]
''' [Abstract] Image classification. 画像分類を行います。 [Details] This program connects to an i-PRO camera and classifies live images with multitasking. このプログラムは、i-PRO カメラと接続してライブ映像に対してマルチタスク処理で画像分類を行います。 [Library install] torch, torchvision : see https://pytorch.org/get-started/locally/ cv2 : pip install opencv-python matplotlib : pip install matplotlib numpy : pip install numpy PIL : pip install pillow json, multiprocessing, queue : Built-in module in Python, you don’t need to install it with pip. ''' import cv2 import multiprocessing as mp from queue import Empty from PIL import Image from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'. user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' Check if the target window exists. Args: winname : Window title. Returns: True : Exist. False : Not exist. Raise: BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def CV2Pil(image): ''' Convert from OpenCV to PIL.Image Params: image: OpenCV image. Returns: PIL.Image format image. ''' new_image = image.copy() if new_image.ndim == 2: # Grayscale pass elif new_image.shape[2] == 3: # Color new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB) elif new_image.shape[2] == 4: # Color with alpha channel new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA) new_image = Image.fromarray(new_image) return new_image def ImageClassificationProcess(q): ''' Image classification process. Args: q1 : [i] Queue that stores images for face detection. q2 : [o] Queue that stores face detection results. Returns: None ''' # Create an instance of class ImagenetClassificationVgg. imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json') while True: try: image = q.get(True, 10) # Terminate process if type(image) == int: if image == -1: break # Image classification pilImage = CV2Pil(image) result, score = imagenetClassifigationVgg.do_classification(pilImage) if score > 0.15: print(result, score) else: print('None') except Empty: # timeout of q1.get() print("Timeout happen.(3)") print("Finish ImageClassificationProcess()") ''' [Abstract] __main__ ''' if __name__ == '__main__': # Create an instance of class cv2.VideoCapture cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1") # windowInitialized = False # Create and start image classification process. q = mp.Queue() p = mp.Process(target=ImageClassificationProcess, args=(q,)) p.start() while True: try: ret, frame = cap.read() if ret == True: # if (q.qsize() <= 1): q.put(frame) # Resize to a display size that fits on your PC screen. width = 640 height = 480 h, w = frame.shape[:2] aspect = w / h if width / height >= aspect: nh = height nw = round(nh * aspect) else: nw = width nh = round(nw / aspect) frame2 = cv2.resize(frame, (nw, nh)) # Display image. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify the display position only at the beginning. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break # Terminate process p q.put(-1) # Waiting for process p to finish p.join() print("Finish main()") cap.release() cv2.destroyAllWindows()
[動画] i-PRO カメラと接続して、リアルタイムに画像分類
CPU 版の PyTorch での動作ですが、十分に高速な処理をしてくれているように私は感じました。
GPU
版を使うともっと素敵なパフォーマンスで動作することと思いますが、この画像分類についてはこれでもいろいろと活用できるのではないでしょうか。
前述の画像分類を GUI(tkinter)版で作成してみます。
「 RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で作成した GUI プログラムをベースに改造してみます。
ポイント
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "classification_gui.py"]
''' [Abstract] Image classification. 画像分類を行います。 [Details] Create a GUI application using tkinter. tkinter を使って GUI アプリケーションを作成します。 [Library install] cv2 : pip install opencv-python PIL : pip install pillow ''' import cv2 import time import tkinter as tk from tkinter import messagebox from PIL import Image, ImageTk, ImageOps import multiprocessing as mp from queue import Empty from classification_vgg import ImagenetClassificationVgg # Local module. See 'classification_vgg.py'. user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title url = f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1" class Application(tk.Frame): def __init__(self, master = None): super().__init__(master) self.pack() # Window settings. self.master.title("Display i-PRO camera with tkinter") # Window title self.master.geometry("800x600+100+100") # Window size, position # Event registration for window termination. self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window) # Create menu. menubar = tk.Menu(self.master) self.master.configure(menu=menubar) filemenu = tk.Menu(menubar) menubar.add_cascade(label='File', menu=filemenu) filemenu.add_command(label='Quit', command = self.on_closing_window) # Create button_frame self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2) self.button_frame.pack(side = tk.BOTTOM, fill=tk.X) # Label self.label_frame1 = tk.Frame(self.button_frame, width=10) self.label_frame1.pack(side=tk.LEFT) self.label_frame2 = tk.Frame(self.button_frame, width=40) self.label_frame2.pack(side=tk.LEFT) self.class_text = tk.StringVar() self.score_text = tk.StringVar() self.class_text.set('') self.score_text.set('') self.label1 = tk.Label(self.label_frame1, text='Class: ').pack(side=tk.TOP) self.label2 = tk.Label(self.label_frame2, textvariable=self.class_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP) self.label3 = tk.Label(self.label_frame1, text='Score: ').pack(side=tk.TOP) self.label4 = tk.Label(self.label_frame2, textvariable=self.score_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP) # Create quit_button self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window) self.quit_button.pack(side=tk.RIGHT) # Create canvas. self.canvas = tk.Canvas(self.master) # Add mouse click event to canvas. self.canvas.bind('<Button-1>', self.canvas_click) # Place canvas. self.canvas.pack(expand = True, fill = tk.BOTH) # Create queue and value for image receive process. self.imageQueue = mp.Queue() self.request = mp.Value('i', 0) # -1 : Exit ReceiveImageProcess. # 0 : Normal. # 1 : Connect camera. # 2 : Release camera. # Create queue for classification process. self.imageQueue2 = mp.Queue() self.resultQueue = mp.Queue() # Create processes. self.imageReceiveProcess = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.imageQueue2, self.request)) self.classificationProcess = mp.Process(target=ImageClassificationProcess, args=(self.imageQueue2, self.resultQueue)) self.imageReceiveProcess.start() self.classificationProcess.start() # Raise a video display event (disp_image) after 500m self.disp_id = self.after(500, self.disp_image) def on_closing_window(self): ''' Window closing event. ''' if messagebox.askokcancel("QUIT", "Do you want to quit?"): # Request terminate process. self.request.value = -1 self.imageQueue2.put(-1) # Waiting for process p to finish time.sleep(1) # Flash queue. # The program cannot complete processes unless the queue is emptied. for i in range(self.imageQueue.qsize()): image = self.imageQueue.get() for i in range(self.imageQueue2.qsize()): image = self.imageQueue2.get() for i in range(self.resultQueue.qsize()): result = self.resultQueue.get() # Wait for process to be terminated. self.imageReceiveProcess.join() self.classificationProcess.join() self.master.destroy() print("Finish Application.") def canvas_click(self, event): ''' Event handling with mouse clicks on canvas ''' if self.disp_id is None: # Connect camera. self.request.value = 1 # Display image. self.disp_image() else: # Release camera. self.request.value = 2 # Cancel scheduling self.after_cancel(self.disp_id) self.disp_id = None def disp_image(self): ''' Display image on Canvas ''' # If there is data in the imageQueue, the program receives the data and displays the video. num = self.imageQueue.qsize() if num > 0: if (num > 5): num -= 1 for i in range(num): cv_image = self.imageQueue.get() # (2) Convert image from ndarray to PIL.Image. pil_image = Image.fromarray(cv_image) # Get canvas size. canvas_width = self.canvas.winfo_width() canvas_height = self.canvas.winfo_height() # Resize the image to the size of the canvas without changing the aspect ratio. # アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height)) # (3) Convert image from PIL.Image to PhotoImage # PIL.Image から PhotoImage へ変換する self.photo_image = ImageTk.PhotoImage(image=pil_image) # Display image on the canvas. self.canvas.create_image( canvas_width / 2, # Image display position (center of the canvas) canvas_height / 2, image=self.photo_image # image data ) else: pass # Update GUI Label. result_num = self.resultQueue.qsize() if result_num > 0: for i in range(result_num): label, score = self.resultQueue.get() self.class_text.set(label) score = '{:.4f}'.format(score) self.score_text.set(score) # Raise a video display event (disp_image) after 1ms. self.disp_id = self.after(1, self.disp_image) def ReceiveImageProcess(imageQueue, imageQueue2, request): ''' Receive Image Process. Args: imageQueue [o] Image data for display. imageQueue2 [o] Image data for image classification. request [i] Shared memory for receiving requests from the main process. -1: Terminate process. 0: Nothing. 1: Connect camera. 2: Release camera connection. Returns: None Raises None ''' # Connect camera. cap = cv2.VideoCapture(url) while True: if cap != None: # Get frame. ret, frame = cap.read() if ret == True: # (1) Convert image from BGR to RGB. cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # for display. if imageQueue.qsize() < 10: imageQueue.put(cv_image) # for image classification. if imageQueue2.qsize() <= 1: imageQueue2.put(cv_image) else: print("cap.read() return False.") # The timeout period seems to be 30 seconds. # And there seems to be no API to change the timeout value. time.sleep(1) # Reconnect cap.release() cap = cv2.VideoCapture(url) else: time.sleep(0.1) # Check process termination request. if request.value == -1: # Terminate process. cap.release() request.value = 0 break # Check connect request. if request.value == 1: cap = cv2.VideoCapture(url) request.value = 0 # Check release request. if request.value == 2: cap.release() cap = None request.value = 0 print("Terminate ReceiveImageProcess().") def ImageClassificationProcess(imageQueue, resultQueue): ''' Image classification process. Args: imageQueue : [i] Image for image classification. resultQueue : [o] Save classification result labels and scores. Returns: None ''' imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json') while True: try: image = imageQueue.get(True, 10) # If type(image) is 'int' and image is -1, then this process is terminated. if type(image) == int: if image == -1: break # Image classification pilImage = Image.fromarray(image) # convert from OpenCV image to PIL.Image result, score = imagenetClassifigationVgg.do_classification(pilImage) if score > 0.15: print(result, score) resultQueue.put((result, score)) else: print('None') resultQueue.put(('None', 0.0)) except Empty: # timeout of imageQueue.get() print("Timeout happen.(3)") print("Finish ImageClassificationProcess()") if __name__ == "__main__": root = tk.Tk() app = Application(master = root) app.mainloop()
[動画] i-PRO カメラと接続してリアルタイムに画像分類、GUI版
NOTE
デスクトップPCなどの高性能PCではこのまま動作しましたが、標準的なノートPCでは処理負荷が高くて上記のままでは軽快に動作しませんでした。
私の場合は接続先を stream_2 へ変更して映像の解像度を下げることで気持ちよく動作するようになりました。
実際に動作させるPCの性能などに応じてカメラ映像の解像度やフレーム数などを適当に調整してみてください。
そこそこ良い感じに作れたのでは、と思っています。
プログラムが約300ステップまで大きくなってきましたので、機能拡張はこの辺で一旦おしまいにしたいと思います。
本ページで紹介のソースコードは、下記 github より取得できます。
下記 github のソースコードと本ページの内容は差異がある場合があります。
i-pro-corp/python-examples: Examples for i-PRO cameras. (github.com)
本ページの情報は、特記無い限り下記ライセンスで提供されます。
2023/10/20 | - | IP簡単設定ソフトウェア、IP Setting Software リンク先を更新, | 木下英俊 |
2022/7/20 | - | 微修正, | 木下英俊 |
2022/6/22 | - | 新規作成, | 木下英俊 |
i-PRO - Programming Items トップページ