画像分類 - VGG16

 

本ページは i-PRO株式会社 の有志メンバーにより記載されたものです。
本ページの情報は ライセンス に記載の条件で提供されます。

 

 

 

本ページでは、i-PRO カメラと TyTorch および VGG16 という AI ネットワークモデルを使用して「画像分類」と呼ばれる AI 処理を行ってみます。

VGG16 は、2014年の ILSVR で2位になった畳み込みニューラルネットワークです。オックスフォード大学の VGG(Visual Geometry Group) チームが作成した16層から構成されるネットワークモデルであるため VGG16 と呼ばれています。

作成したプログラムの動作例はこちらです。学習済みモデルを使用して1000種類の画像分類を行っています。画面下部に分類結果を表示しています。

こちらに記載の内容は i-PRO のカメラ "i-PRO mini (WV-S7130)"、"モジュールカメラ(AIスターターキット)" を使って動作確認しています。未確認ですが他の i-PRO カメラを使用する場合も恐らくそのまま利用可能です。

 


 

"i-PRO mini" 紹介:

i-PRO mini 画像

 

"モジュールカメラ" 紹介:

AIスターターキット画像(その1) AIスターターキット画像(その2)

 

カメラ初期設定についてはカメラ毎の取扱説明書をご確認ください。

カメラのIPアドレスを確認・設定できる下記ツールを事前に入手しておくと便利です。

 


 

1. 準備

[概要]

Python を事前にインストール済みであることを前提に記載します。

私の評価環境は以下の通りです。

 

[評価環境]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

 

1-1. PyTorch をインストールする

こちら「PyTorch をインストールする - 1-1. CPU」の記事を参考にインストールを行います。

Windows 環境で多くの人が試せるようにしたいので、本ページでは「Compute Platform」を「CPU」としてインストールしている前提で説明を記載します。NVIDIA の GPU など特定のハードウェアを必要としません。

 

 

1-2. 「画像分類」を準備する

作業フォルダを準備して、下記プログラムを保存します。

 

[プログラムソース "preparation.py"]

import os
import urllib.request

# Create the folder "data" when the folder "data" does not exist. 
# フォルダ「data」が存在しない場合はフォルダ data を作成します。
data_dir = "./data/"
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

# Download class_index for ImageNet.
# It is prepared for Keras. This is the JSON file used by the following open source.
# ImageNetのclass_indexをダウンロードします。
# Kerasで用意されているものです。下記オープンソースで使用している JSON ファイルです。
# https://github.com/fchollet/deep-learning-models/blob/master/imagenet_utils.py
url = "https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json"
save_path = os.path.join(data_dir, "imagenet_class_index.json")

if not os.path.exists(save_path):
    urllib.request.urlretrieve(url, save_path)

 

そしてこのプログラムを実行すると、作業フォルダ中に data フォルダを作成してその中に "imagenet_class_index.json" というファイルを作成して保存してくれます。
このファイルは画像分類で使用する 1000種類 のラベル情報です。

 

 

1-3. 必要なライブラリをインストール

matplotlib を使用するので、下記コマンドによりインストールします。

pip3 install matplotlib

matplotlib インストール画面

 

 

2. 静止画を画像分類する

[概要]

学習済みの VGG モデルを使用し、静止画(JPEGファイル)の画像分類を行ってみます。

ここでは入力画像として、 https://pixabay.com から取得した4つの画像(てんとう虫、ゴールデンレトリバー(犬)、車、デイジー(花))を使用させていただき実験してみます。

いずれも 商用利用無料、帰属表示必要なし、の画像です。
各画像の取得元は、下記ソースコード中に記載の URL を参照ください。

下記プログラムを実行する際は、ご自身で各画像をダウンロードして事前に data フォルダに保存してください。

 

サンプル画像1:テントウムシ サンプル画像2:デイジー サンプル画像3:車 サンプル画像4:ゴールデンレトリバー

 

[評価環境]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

[説明]

部分ごとに説明して最後に全体ソースコードを示します。

 

1.

パッケージのインポートを最初に行います。

import numpy as np
import json
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torchvision
from torchvision import models, transforms

 

2.

下記コードにより VGG16 の学習済みモデルをロードします。
初めて実行する際は、学習済みパラメータをインターネットからダウンロードするため、実行に時間がかかります。

        # Create an instance of the VGG16 model
        self.net = models.vgg16( pretrained = True )
        self.net.eval()             # Set to evaluation mode.

        # Display network-model.
        print(self.net)

 

self.net = models.vgg16( pretrained = True ) により取得した学習済みモデルは下記に保存されます。
'~' はログインしているユーザーのホームディレクトリを意味します。

 

Where are my downloaded models saved?

The locations are used in the order of

私の場合は下記に vgg16-397923af.pth を保存していました。約540MBのファイルサイズでした。

"~.cache\torch\hub\checkpoints\vgg16-397923af.pth"

 

3.

入力画像の前処理クラスを作成します。

class BaseTransform():
    '''
    Pre-process the input image. Image resizing, color standardization, etc.
    入力画像の前処理を行う。画像のリサイズ、色の標準化など。
    '''

    def __init__(self, resize, mean, std):
        self.base_transform = transforms.Compose([
            transforms.Resize((resize, resize)),    # Resize both long and short sides to the size of resize.
            #transforms.Resize(resize),             # Resize the short edge length to the size of resize while preserving the aspect
            #transforms.CenterCrop(resize),         # Crop the center of the image with resize × resize.
            transforms.ToTensor(),                  # Convert to Torch-Tensor.
            transforms.Normalize(mean, std)         # color standardization
        ])

    def __call__(self, img):
        '''
        Perform pre-process the input image.
        '''
        return self.base_transform(img)

 

こんな感じで BaseTransform クラスのインスタンスを生成するときに画像サイズ、規格化の情報を与えています。

        # Create an instance of preprocessing.
        resize = 224
        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)
        self.transform = BaseTransform(resize, mean, std)

 

4.

出力結果からラベルを判定するクラスを作成します。

最もスコアの高いラベル(predicted_label_name)とそのスコア(score)を返します。
出力を softmax で処理することで全体(1000種別)のスコアを足すと 1.0 になるようにしています。

class ILSVRCPredictor():
    '''
    Get the label name with the highest score from the calculation result.
    演算結果から最もスコアの高いラベル名を取得する。
    '''

    def __init__(self, class_index):
        '''
        Constructor

        Args:
            class_index     [i] class index.
        '''
        self.class_index = class_index

    def predict_max(self, out):
        '''
        Get the label name with the highest score from the calculation result.
        最もスコアの高いラベル名を取得する。
        '''
        data = out.detach().numpy()
        probabilities = torch.nn.functional.softmax(out, dim=1)[0]
        maxid = np.argmax(data)

        score = probabilities[maxid].item()
        predicted_label_name = self.class_index[str(maxid)][1]

        return predicted_label_name, score

 

5.

画像分類を行う本体のクラスを作成します。

class ImagenetClassificationVgg():
    '''
    Image classification.
    画像分類を行う。
    '''

    def __init__(self, class_index_file):
        '''
        Constructor

        Args:
            class_index_file:   [i] class index file path.
        '''

        # PyTorch version.
        print("PyTorch Version: ", torch.__version__)
        print("Torchvision Version: ", torchvision.__version__)

        # Load a trained VGG-16 model.
        # The first time you run it, it will take a long time to run because it will download the trained parameters.
        # 学習済みの VGG-16 モデルをロードする。
        # 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。

        # Create an instance of the VGG16 model
        self.net = models.vgg16( pretrained = True )
        self.net.eval()             # Set to evaluation mode.

        # Display network-model.
        print(self.net)

        # Create an instance of preprocessing.
        resize = 224
        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)
        self.transform = BaseTransform(resize, mean, std)

        # Load ILSVRC label information and create an ILSVRCPredictor instance.
        self.ILSVRC_class_index = json.load( open(class_index_file, 'r') )
        self.predictor = ILSVRCPredictor(self.ILSVRC_class_index)


    def imagenet_classification_vgg(self, img, debug=False):
        '''
        Perform image classification.

        Args:
            img:        [i] An image for image classification. PIL.Image format.
            debug:      [i] if set to True, display debug images.
        Returns:
            results:    Results of image classification.
        '''
        if debug==True:
            # View original image.
            plt.imshow(img)
            plt.show()

        # Preprocessing.
        img_transformed = self.transform(img)  # torch.Size([3, 224, 224])

        if debug==True:
            # Display the image after preprocessing.
            img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0))
            img_transformed_2 = np.clip(img_transformed_2, 0, 1)
            plt.imshow(img_transformed_2)
            plt.show()

        # Added batch size dimension.
        inputs = img_transformed.unsqueeze_(0)  # torch.Size([1, 3, 224, 224])

        # ネットワークモデルへ画像を入力し、出力をラベルに変換
        out = self.net(inputs)      # torch.Size([1, 1000])
        result = self.predictor.predict_max(out)

        return result

 

6.

main 部分です。

ImagenetClassificatinVgg クラスのインスタンスを作成して分類を行う画像を入力するだけです。

if __name__ == "__main__":
    '''
    main
    '''
    imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')

    # Open image file.
    # https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
    # https://pixabay.com/ja/service/license/
    # 商用利用無料、帰属表示必要なし、1280x855
    img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

    # https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/
    # https://pixabay.com/ja/service/license/
    # 商用利用無料、帰属表示必要なし、640x426
    img = Image.open('./data/goldenretriever-3724972_640.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

    # https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/
    # https://pixabay.com/ja/service/license/
    # 商用利用無料、帰属表示必要なし、1920x1280
    img = Image.open('./data/car-g955f2640f_1920.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

    # https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/
    # https://pixabay.com/ja/service/license/
    # 商用利用無料、帰属表示必要なし、1920x1249
    img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

 

 

以下に全ソースコードを記載します。 

 

[全ソースコード "classsification_vgg.py"]

'''
[Abstract]
    Image classification.
    画像分類を行います。

[Details]
    This program classifies still images (JPEG files).
    静止画(JPEG files)の画像分類を行います。

[Library install]
    torch, torchvision : see https://pytorch.org/get-started/locally/
    matplotlib :    pip install matplotlib
    numpy :         pip install numpy
    PIL :           pip install pillow
    json :          json is a built-in module in Python, you don’t need to install it with pip.

[Note]
    Download the JPEG file yourself and save it in the data folder. See main function.
    JPEGファイルはご自身でダウンロードして data フォルダに保存を行ってください。main 関数内の記載をご確認ください。
'''

import numpy as np
import json
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torchvision
from torchvision import models, transforms


class BaseTransform():
    '''
    Pre-process the input image. Image resizing, color standardization, etc.
    入力画像の前処理を行う。画像のリサイズ、色の標準化など。
    '''

    def __init__(self, resize, mean, std):
        self.base_transform = transforms.Compose([
            transforms.Resize((resize, resize)),    # Resize both long and short sides to the size of resize.
            transforms.ToTensor(),                  # Convert to Torch-Tensor.
            transforms.Normalize(mean, std)         # color standardization
        ])

    def __call__(self, img):
        '''
        Perform pre-process the input image.
        '''
        return self.base_transform(img)


class ILSVRCPredictor():
    '''
    Get the label name with the highest score from the calculation result.
    演算結果から最もスコアの高いラベル名を取得する。
    '''

    def __init__(self, class_index):
        '''
        Constructor

        Args:
            class_index     [i] class index.
        '''
        self.class_index = class_index

    def predict_max(self, out):
        '''
        Get the label name with the highest score from the calculation result.
        最もスコアの高いラベル名を取得する。
        '''
        data = out.detach().numpy()
        probabilities = torch.nn.functional.softmax(out, dim=1)[0]
        maxid = np.argmax(data)

        score = probabilities[maxid].item()
        predicted_label_name = self.class_index[str(maxid)][1]

        return predicted_label_name, score


class ImagenetClassificationVgg():
    '''
    Image classification.
    画像分類を行う。
    '''

    def __init__(self, class_index_file):
        '''
        Constructor

        Args:
            class_index_file:   [i] class index file path.
        '''

        # PyTorch version.
        print("PyTorch Version: ", torch.__version__)
        print("Torchvision Version: ", torchvision.__version__)

        # Load a trained VGG-16 model.
        # The first time you run it, it will take a long time to run because it will download the trained parameters.
        # 学習済みの VGG-16 モデルをロードする。
        # 初めて実行する際は、学習済みパラメータをダウンロードするため、実行に時間がかかります。

        # Create an instance of the VGG16 model
        self.net = models.vgg16( pretrained = True )
        self.net.eval()             # Set to evaluation mode.

        # Display network-model.
        print(self.net)

        # Create an instance of preprocessing.
        resize = 224
        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)
        self.transform = BaseTransform(resize, mean, std)

        # Load ILSVRC label information and create an ILSVRCPredictor instance.
        self.ILSVRC_class_index = json.load( open(class_index_file, 'r') )
        self.predictor = ILSVRCPredictor(self.ILSVRC_class_index)


    def do_classification(self, img, debug=False):
        '''
        Perform image classification.

        Args:
            img:        [i] An image for image classification. PIL.Image format.
            debug:      [i] if set to True, display debug images.
        Returns:
            results:    Results of image classification.
        '''
        if debug==True:
            # View original image.
            plt.imshow(img)
            plt.show()

        # Preprocessing.
        img_transformed = self.transform(img)  # torch.Size([3, 224, 224])

        if debug==True:
            # Display the image after preprocessing.
            img_transformed_2 = img_transformed.numpy().transpose((1, 2, 0))
            img_transformed_2 = np.clip(img_transformed_2, 0, 1)
            plt.imshow(img_transformed_2)
            plt.show()

        # Added batch size dimension.
        inputs = img_transformed.unsqueeze_(0)  # torch.Size([1, 3, 224, 224])

        # Input images to the network model and convert the output to labels.
        out = self.net(inputs)      # torch.Size([1, 1000])
        result = self.predictor.predict_max(out)

        return result


if __name__ == "__main__":
    '''
    main
    '''
    imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')

    # Open image file.
    # https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
    # https://pixabay.com/ja/service/license/
    # Free for commercial use, no attribution required, 1280x855
    # 商用利用無料、帰属表示必要なし、1280x855
    img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

    # https://pixabay.com/ja/photos/goldenretriever-%E7%8A%AC-3724972/
    # https://pixabay.com/ja/service/license/
    # Free for commercial use, no attribution required, 640x426
    # 商用利用無料、帰属表示必要なし、640x426
    img = Image.open('./data/goldenretriever-3724972_640.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

    # https://pixabay.com/ja/photos/%e8%bb%8a-%e7%94%b2%e8%99%ab-%e3%83%95%e3%82%a9%e3%83%ab%e3%82%af%e3%82%b9%e3%83%af%e3%83%bc%e3%82%b2%e3%83%b3-1283947/
    # https://pixabay.com/ja/service/license/
    # Free for commercial use, no attribution required, 1920x1280
    # 商用利用無料、帰属表示必要なし、1920x1280
    img = Image.open('./data/car-g955f2640f_1920.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

    # https://pixabay.com/ja/photos/%e3%83%9e%e3%83%bc%e3%82%ac%e3%83%ac%e3%83%83%e3%83%88-%e3%83%87%e3%82%a4%e3%82%b8%e3%83%bc-%e8%8a%b1-729510/
    # https://pixabay.com/ja/service/license/
    # Free for commercial use, no attribution required, 1920x1249
    # 商用利用無料、帰属表示必要なし、1920x1249
    img = Image.open('./data/marguerite-gfad1f1cea_1920.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

 

 

実行結果です。コンソールへ出力された内容です。

PyTorch Version:  1.11.0+cpu
Torchvision Version:  0.12.0+cpu
VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (17): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (18): ReLU(inplace=True)
    (19): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (20): ReLU(inplace=True)
    (21): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (22): ReLU(inplace=True)
    (23): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (24): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (25): ReLU(inplace=True)
    (26): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (27): ReLU(inplace=True)
    (28): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (29): ReLU(inplace=True)
    (30): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(7, 7))
  (classifier): Sequential(
    (0): Linear(in_features=25088, out_features=4096, bias=True)
    (1): ReLU(inplace=True)
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=4096, out_features=4096, bias=True)
    (4): ReLU(inplace=True)
    (5): Dropout(p=0.5, inplace=False)
    (6): Linear(in_features=4096, out_features=1000, bias=True)
  )
)
Result: ('ladybug', 0.9478541612625122) 
Result: ('golden_retriever', 0.9413034915924072) 
Result: ('sports_car', 0.3690492510795593) 
Result: ('daisy', 0.9962427616119385) 

 

正しく画像分類できていそうです。

ImagenetClassificationVgg クラスのインスタンスを作成したらあとは画像を渡すだけ、という感じで実行できます。
学習済みのモデルを使用して推論するだけならさほど難しく無いと思います。興味あればチャレンジしてみて下さい。

 

 


3. i-PRO カメラの映像を画像分類する

[概要]

i-PRO カメラと接続して取得した映像に対して「画像分類」をリアルタイムに実施してみたいと思います。本章では認識結果をコンソールへ出力することとします。認識結果を映像上へテキストで重畳表示することも簡単にできますので、興味あればチャレンジしてみてください。

RTSP で画像を取得する」中で OpenCV による顔検知を作成しましたので、このプログラムを元に OpenCV の処理部分を上記で作成した物体検知へ変更してみたいと思います。

 

[評価環境]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

ポイント

上記で作成した "imagenet_classsification_vgg.py" をそのままライブラリとして活用します。

具体的には、下記のように from imagenet_classsification_vgg import ImagenetClassificationVgg と記載することで作成済みプログラムをそのまま使用できます。

'classification_vgg.py' ファイルを同じフォルダ内に保存する必要があります。

 

["classification_main.py"]

from PIL import Image
from classification_vgg import ImagenetClassificationVgg    # Local module. See 'classification_vgg.py'.

if __name__ == "__main__":
    '''
    main
    '''
    imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')

    # Open image file.
    # https://pixabay.com/ja/photos/%e3%81%a6%e3%82%93%e3%81%a8%e3%81%86%e8%99%ab-%e7%94%b2%e8%99%ab-%e3%83%86%e3%83%b3%e3%83%88%e3%82%a6%e3%83%a0%e3%82%b7-1480102/
    # https://pixabay.com/ja/service/license/
    # Free for commercial use, no attribution required, 1280x855
    # 商用利用無料、帰属表示必要なし、1280x855
    img = Image.open('./data/ladybug-g7744c038e_1280.jpg')
    result = imagenetClassifigationVgg.do_classification(img)
    print("Result: ", result)

 

こんな方針でプログラム作成を行っていきます。

 

3-1. まずはシンプルに作成

RTSP で画像を取得する」中で作成したプログラム "connect_with_rtsp_3_1.py" を元に改造することで、RTSP接続して受信したカメラ映像をリアルタイムに画像分類するプログラムを作成してみます。VGG の処理負荷はとても高そうなのでちょっと心配ですが、必要に応じてカメラ側の設定でフレームレートや解像度を下げて使用する、という方針で進めます。

 

[プログラムソース "classification_with_camera_1.py"]

'''
[Abstract]
    Image classification.
    画像分類を行います。

[Details]
    This program connects to an i-PRO camera and classifies live images.
    このプログラムは、i-PRO カメラと接続してライブ映像に対して画像分類を行います。

[Library install]
    torch, torchvision : see https://pytorch.org/get-started/locally/
    cv2 :           pip install opencv-python
    matplotlib :    pip install matplotlib
    numpy :         pip install numpy
    PIL :           pip install pillow
    json :          Built-in module in Python, you don’t need to install it with pip.
'''

import cv2
from PIL import Image
from classification_vgg import ImagenetClassificationVgg    # Local module. See 'classification_vgg.py'.


user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title


# Exception definition.
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    Check if the target window exists.

    Args:
        winname :       Window title.
    Returns:
        True :          Exist.
        False :         Not exist.
    Raise:
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False


def CV2Pil(image):
    '''
    Convert from OpenCV to PIL.Image
    
    Params:
        image:  OpenCV image.
    Returns:
        PIL.Image format image.    
    '''
    new_image = image.copy()
    if new_image.ndim == 2:         # Grayscale
        pass
    elif new_image.shape[2] == 3:   # Color
        new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
    elif new_image.shape[2] == 4:   # Color with alpha channel
        new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
    new_image = Image.fromarray(new_image)
    return new_image


'''
[Abstract]
    main 関数
'''
if __name__ == '__main__':
    # Create an instance of class ImagenetClassificationVgg.
    imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')

    # Create an instance of class cv2.VideoCapture
    cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1")

    # 
    windowInitialized = False

    while True:
        try:
            ret, frame = cap.read()
            if ret == True:
                # Image classification
                pilImage = CV2Pil(frame)
                result, score = imagenetClassifigationVgg.do_classification(pilImage)

                if score > 0.15:
                    print(result, score)
                else:
                    print('None')

                # Resize to a display size that fits on your PC screen.
                width   = 640
                height  = 480
                h, w = frame.shape[:2]
                aspect = w / h
                if width / height >= aspect:
                    nh = height
                    nw = round(nh * aspect)
                else:
                    nw = width
                    nh = round(nw / aspect)
                frame2 = cv2.resize(frame, (nw, nh))
            
                # Display image.
                cv2.imshow(winname, frame2)

                if windowInitialized==False:
                    # Specify the display position only at the beginning.
                    cv2.moveWindow(winname, 100, 100)
                    windowInitialized = True

            # Press the "q" key to finish.
            k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
            if k == ord("q"):
                break
            
            # Exit if there is no specified window.
            if not IsWindowVisible(winname):
                break

        except KeyboardInterrupt:
            # Press '[ctrl] + [c]' on the console to exit the program.
            print("KeyboardInterrupt")
            break

    print("Finish main()")
    cap.release()
    cv2.destroyAllWindows()

 

結果:

予想通り VGG の処理がとても重たく、PC性能にもよると思いますが、今回実施しているCPU処理ではフレームレートを 1,3,5fps 程度に設定する必要がありそうです。

加えてフレームレートを下げても一定の遅延を発生しました。恐らく10フレーム程度のバッファリングが行われており、例えば 1fps に設定すると10秒程度の遅延を常に生じます。

用途にもよりますが、ちょっと残念。映像表示だけでも10fps以上の通常表示を維持しつつ、画像分類の処理をできるだけ実施するというような改善を考えてみたいところです。

 

 

3-2. マルチタスク化して処理を高速化してみる

そこで、画像分類の処理を別タスクに分離することで、映像受信と映像デコード処理を止めずにできるだけ画像分類をやってみる、という感じにプログラムを修正してみます。

multiprocessing, queue というライブラリを使用して実現してみます。

こちらが新規に作成したプログラムです。"connect_with_rtsp_3_2.py" を元に作成しています。

 

[プログラムソース "classification_with_camera_2.py"]

'''
[Abstract]
    Image classification.
    画像分類を行います。

[Details]
    This program connects to an i-PRO camera and classifies live images with multitasking.
    このプログラムは、i-PRO カメラと接続してライブ映像に対してマルチタスク処理で画像分類を行います。

[Library install]
    torch, torchvision : see https://pytorch.org/get-started/locally/
    cv2 :           pip install opencv-python
    matplotlib :    pip install matplotlib
    numpy :         pip install numpy
    PIL :           pip install pillow
    json, multiprocessing, queue :
                    Built-in module in Python, you don’t need to install it with pip.
'''

import cv2
import multiprocessing as mp
from queue import Empty
from PIL import Image
from classification_vgg import ImagenetClassificationVgg    # Local module. See 'classification_vgg.py'.


user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title


# Exception definition.
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    Check if the target window exists.

    Args:
        winname :       Window title.
    Returns:
        True :          Exist.
        False :         Not exist.
    Raise:
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False


def CV2Pil(image):
    '''
    Convert from OpenCV to PIL.Image
    
    Params:
        image:  OpenCV image.
    Returns:
        PIL.Image format image.    
    '''
    new_image = image.copy()
    if new_image.ndim == 2:         # Grayscale
        pass
    elif new_image.shape[2] == 3:   # Color
        new_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2RGB)
    elif new_image.shape[2] == 4:   # Color with alpha channel
        new_image = cv2.cvtColor(new_image, cv2.COLOR_BGRA2RGBA)
    new_image = Image.fromarray(new_image)
    return new_image


def ImageClassificationProcess(q):
    '''
    Image classification process.

    Args:
        q1 :        [i] Queue that stores images for face detection.
        q2 :        [o] Queue that stores face detection results.
    Returns:
        None
    '''
    # Create an instance of class ImagenetClassificationVgg.
    imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')

    while True:
        try:
            image = q.get(True, 10)

            # Terminate process
            if type(image) == int:
                if image == -1:
                    break

            # Image classification
            pilImage = CV2Pil(image)
            result, score = imagenetClassifigationVgg.do_classification(pilImage)

            if score > 0.15:
                print(result, score)
            else:
                print('None')

        except Empty: # timeout of q1.get()
            print("Timeout happen.(3)")

    print("Finish ImageClassificationProcess()")    


'''
[Abstract]
    __main__
'''
if __name__ == '__main__':
    # Create an instance of class cv2.VideoCapture
    cap = cv2.VideoCapture(f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1")

    #
    windowInitialized = False

    # Create and start image classification process.
    q = mp.Queue()
    p = mp.Process(target=ImageClassificationProcess, args=(q,))
    p.start()

    while True:
        try:
            ret, frame = cap.read()
            if ret == True:
                # 
                if (q.qsize() <= 1):
                    q.put(frame)

                # Resize to a display size that fits on your PC screen.
                width   = 640
                height  = 480
                h, w = frame.shape[:2]
                aspect = w / h
                if width / height >= aspect:
                    nh = height
                    nw = round(nh * aspect)
                else:
                    nw = width
                    nh = round(nw / aspect)
                frame2 = cv2.resize(frame, (nw, nh))
            
                # Display image.
                cv2.imshow(winname, frame2)

                if windowInitialized==False:
                    # Specify the display position only at the beginning.
                    cv2.moveWindow(winname, 100, 100)
                    windowInitialized = True

            # Press the "q" key to finish.
            k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
            if k == ord("q"):
                break
            
            # Exit if there is no specified window.
            if not IsWindowVisible(winname):
                break

        except KeyboardInterrupt:
            # Press '[ctrl] + [c]' on the console to exit the program.
            print("KeyboardInterrupt")
            break

    # Terminate process p
    q.put(-1)
    # Waiting for process p to finish
    p.join()

    print("Finish main()")
    cap.release()
    cv2.destroyAllWindows()

 

[動画] i-PRO カメラと接続して、リアルタイムに画像分類

 

CPU 版の PyTorch での動作ですが、十分に高速な処理をしてくれているように私は感じました。
GPU 版を使うともっと素敵なパフォーマンスで動作することと思いますが、この画像分類についてはこれでもいろいろと活用できるのではないでしょうか。

 

 


4. i-PRO カメラの映像と画像分類結果をGUI表示する (tkinter)

[概要]

前述の画像分類を GUI(tkinter)版で作成してみます。

RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で作成した GUI プログラムをベースに改造してみます。

 

ポイント

  • GUI 下部に "Class" と "Score" を表示するための Label を追加してみました。ここに画像分類した結果を表示するようにしてみます。
  • 3章と同様に、2章で作成したモジュール "classification_vgg.py" を使います。同じフォルダにこのファイルを置いてください。
  • 映像表示に極力影響を与えないように、映像受信プロセス(ReceiveImageProcess)から画像分類プロセス(ImageClassificationProcess)へ画像データを渡し、別プロセスで画像分類を行っています。そして認識した結果のラベルとスコアを main プロセスへ渡して画面へ表示する、というようなデータの流れで作成しています。

 

 

[評価環境]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

 

[プログラムソース "classification_gui.py"]

'''
[Abstract]
    Image classification.
    画像分類を行います。

[Details]
    Create a GUI application using tkinter.
    tkinter を使って GUI アプリケーションを作成します。

[Library install]
    cv2 :   pip install opencv-python
    PIL :   pip install pillow
'''

import cv2
import time
import tkinter as tk
from tkinter import messagebox
from PIL import Image, ImageTk, ImageOps
import multiprocessing as mp
from queue import Empty
from classification_vgg import ImagenetClassificationVgg    # Local module. See 'classification_vgg.py'.


user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
url         = f"rtsp://{user_id}:{user_pw}@{host}/MediaInput/stream_1"


class Application(tk.Frame):
    def __init__(self, master = None):
        super().__init__(master)
        self.pack()

        # Window settings.
        self.master.title("Display i-PRO camera with tkinter")      # Window title
        self.master.geometry("800x600+100+100")                     # Window size, position

        # Event registration for window termination.
        self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window)

        # Create menu.
        menubar = tk.Menu(self.master)
        self.master.configure(menu=menubar)
        filemenu = tk.Menu(menubar)
        menubar.add_cascade(label='File', menu=filemenu)
        filemenu.add_command(label='Quit', command = self.on_closing_window)

        # Create button_frame
        self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2)
        self.button_frame.pack(side = tk.BOTTOM, fill=tk.X)

        # Label
        self.label_frame1 = tk.Frame(self.button_frame, width=10)
        self.label_frame1.pack(side=tk.LEFT)
        self.label_frame2 = tk.Frame(self.button_frame, width=40)
        self.label_frame2.pack(side=tk.LEFT)
        self.class_text = tk.StringVar()
        self.score_text = tk.StringVar()
        self.class_text.set('')
        self.score_text.set('')
        self.label1 = tk.Label(self.label_frame1, text='Class: ').pack(side=tk.TOP)
        self.label2 = tk.Label(self.label_frame2, textvariable=self.class_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP)
        self.label3 = tk.Label(self.label_frame1, text='Score: ').pack(side=tk.TOP)
        self.label4 = tk.Label(self.label_frame2, textvariable=self.score_text, relief=tk.RIDGE, width=20).pack(side=tk.TOP)


        # Create quit_button
        self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window)
        self.quit_button.pack(side=tk.RIGHT)
        
        # Create canvas.
        self.canvas = tk.Canvas(self.master)

        # Add mouse click event to canvas.
        self.canvas.bind('<Button-1>', self.canvas_click)

        # Place canvas.
        self.canvas.pack(expand = True, fill = tk.BOTH)

        # Create queue and value for image receive process.
        self.imageQueue = mp.Queue()
        self.request = mp.Value('i', 0)     # -1 : Exit ReceiveImageProcess.
                                            #  0 : Normal.
                                            #  1 : Connect camera.
                                            #  2 : Release camera.

        # Create queue for classification process.
        self.imageQueue2 = mp.Queue()
        self.resultQueue = mp.Queue()

        # Create processes.
        self.imageReceiveProcess = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.imageQueue2, self.request))
        self.classificationProcess = mp.Process(target=ImageClassificationProcess, args=(self.imageQueue2, self.resultQueue))
        self.imageReceiveProcess.start()
        self.classificationProcess.start()

        # Raise a video display event (disp_image) after 500m
        self.disp_id = self.after(500, self.disp_image)

    def on_closing_window(self):
        ''' Window closing event. '''

        if messagebox.askokcancel("QUIT", "Do you want to quit?"):
            # Request terminate process.
            self.request.value = -1
            self.imageQueue2.put(-1)

            # Waiting for process p to finish
            time.sleep(1)

            # Flash queue.
            # The program cannot complete processes unless the queue is emptied.
            for i in range(self.imageQueue.qsize()):
                image = self.imageQueue.get()
            for i in range(self.imageQueue2.qsize()):
                image = self.imageQueue2.get()
            for i in range(self.resultQueue.qsize()):
                result = self.resultQueue.get()

            # Wait for process to be terminated.
            self.imageReceiveProcess.join()
            self.classificationProcess.join()
            self.master.destroy()
            print("Finish Application.")

    def canvas_click(self, event):
        ''' Event handling with mouse clicks on canvas '''

        if self.disp_id is None:
            # Connect camera.
            self.request.value = 1
            # Display image.
            self.disp_image()

        else:
            # Release camera.
            self.request.value = 2
            # Cancel scheduling
            self.after_cancel(self.disp_id)
            self.disp_id = None

    def disp_image(self):
        ''' Display image on Canvas '''

        # If there is data in the imageQueue, the program receives the data and displays the video.
        num = self.imageQueue.qsize()
        if num > 0:
            if (num > 5):
                num -= 1
            for i in range(num):
                cv_image = self.imageQueue.get()

            # (2) Convert image from ndarray to PIL.Image.
            pil_image = Image.fromarray(cv_image)

            # Get canvas size.
            canvas_width = self.canvas.winfo_width()
            canvas_height = self.canvas.winfo_height()

            # Resize the image to the size of the canvas without changing the aspect ratio.
            # アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ
            pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height))

            # (3) Convert image from PIL.Image to PhotoImage
            # PIL.Image から PhotoImage へ変換する
            self.photo_image = ImageTk.PhotoImage(image=pil_image)

            # Display image on the canvas.
            self.canvas.create_image(
                canvas_width / 2,       # Image display position (center of the canvas)
                canvas_height / 2,                   
                image=self.photo_image  # image data
                )
        else:
            pass

        # Update GUI Label.
        result_num = self.resultQueue.qsize()
        if result_num > 0:
            for i in range(result_num):
                label, score = self.resultQueue.get()
            self.class_text.set(label)
            score = '{:.4f}'.format(score)
            self.score_text.set(score)

        # Raise a video display event (disp_image) after 1ms.
        self.disp_id = self.after(1, self.disp_image)


def ReceiveImageProcess(imageQueue, imageQueue2, request):
    '''
    Receive Image Process.

    Args:
        imageQueue      [o] Image data for display.
        imageQueue2     [o] Image data for image classification.
        request         [i] Shared memory for receiving requests from the main process.
                            -1: Terminate process.
                             0: Nothing.
                             1: Connect camera.
                             2: Release camera connection.
    Returns:
        None
    Raises
        None
    '''

    # Connect camera.
    cap = cv2.VideoCapture(url)

    while True:
        if cap != None:
            # Get frame.
            ret, frame = cap.read()

            if ret == True:
                # (1) Convert image from BGR to RGB.
                cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # for display.
                if imageQueue.qsize() < 10:
                    imageQueue.put(cv_image)
                
                # for image classification.
                if imageQueue2.qsize() <= 1:
                    imageQueue2.put(cv_image)

            else:
                print("cap.read() return False.")
                # The timeout period seems to be 30 seconds.
                # And there seems to be no API to change the timeout value.
                time.sleep(1)

                # Reconnect
                cap.release()
                cap = cv2.VideoCapture(url)
        else:
            time.sleep(0.1)
                
        # Check process termination request.
        if request.value == -1:
            # Terminate process.
            cap.release()
            request.value = 0
            break

        # Check connect request.
        if request.value == 1:
            cap = cv2.VideoCapture(url)
            request.value = 0

        # Check release request.
        if request.value == 2:
            cap.release()
            cap = None
            request.value = 0

    print("Terminate ReceiveImageProcess().")


def ImageClassificationProcess(imageQueue, resultQueue):
    '''
    Image classification process.

    Args:
        imageQueue :        [i] Image for image classification.
        resultQueue :       [o] Save classification result labels and scores.
    Returns:
        None
    '''
    imagenetClassifigationVgg = ImagenetClassificationVgg('./data/imagenet_class_index.json')

    while True:
        try:
            image = imageQueue.get(True, 10)

            # If type(image) is 'int' and image is -1, then this process is terminated.
            if type(image) == int:
                if image == -1:
                    break

            # Image classification
            pilImage = Image.fromarray(image)   # convert from OpenCV image to PIL.Image
            result, score = imagenetClassifigationVgg.do_classification(pilImage)

            if score > 0.15:
                print(result, score)
                resultQueue.put((result, score))
            else:
                print('None')
                resultQueue.put(('None', 0.0))

        except Empty: # timeout of imageQueue.get()
            print("Timeout happen.(3)")

    print("Finish ImageClassificationProcess()")


if __name__ == "__main__":
    root = tk.Tk()
    app = Application(master = root)
    app.mainloop()

 

 

[動画] i-PRO カメラと接続してリアルタイムに画像分類、GUI版

 

NOTE

デスクトップPCなどの高性能PCではこのまま動作しましたが、標準的なノートPCでは処理負荷が高くて上記のままでは軽快に動作しませんでした。
私の場合は接続先を stream_2 へ変更して映像の解像度を下げることで気持ちよく動作するようになりました。
実際に動作させるPCの性能などに応じてカメラ映像の解像度やフレーム数などを適当に調整してみてください。

 

そこそこ良い感じに作れたのでは、と思っています。
プログラムが約300ステップまで大きくなってきましたので、機能拡張はこの辺で一旦おしまいにしたいと思います。

 

 

 

ソースコード所在

本ページで紹介のソースコードは、下記 github より取得できます。

下記 github のソースコードと本ページの内容は差異がある場合があります。

i-pro-corp/python-examples: Examples for i-PRO cameras. (github.com)

 

 

ライセンス

本ページの情報は、特記無い限り下記ライセンスで提供されます。


Copyright 2022 i-PRO Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

 

 


参考

 


 

変更履歴

2023/10/20 - IP簡単設定ソフトウェア、IP Setting Software リンク先を更新, 木下英俊
2022/7/20 - 微修正, 木下英俊
2022/6/22 - 新規作成, 木下英俊

 

i-PRO - Programming Items トップページ

プライバシーポリシー