MJPEG で画像を取得する (Python)

 

本ページは i-PRO株式会社 の有志メンバーにより記載されたものです。
本ページの情報は ライセンス に記載の条件で提供されます。

 

 

 

本ページでは、i-PRO カメラとPCを MJPEG により接続してPC画面へ表示するプログラムを Python で作成する例を紹介します。とても短いプログラムで i-PRO カメラの映像を見ることができます。動作確認は i-PRO mini (WV-S7130)、モジュールカメラ(AIスターターキット)を使って行いましたが、ほとんどの i-PRO カメラでそのまま利用できるはずです。ぜひお試しください。

 

[動画] MJPEG でカメラと接続して映像表示した様子

 


 

"i-PRO mini" 紹介:

i-PRO mini 画像

 

"モジュールカメラ" 紹介:

AI スターターキット 画像(1) AI スターターキット 画像(2)

 

カメラ初期設定についてはカメラ毎の取扱説明書をご確認ください。

カメラのIPアドレスを確認・設定できる下記ツールを事前に入手しておくと便利です。

 


 

1. MJPEG 表記仕様

[概要]

MJPEG で接続するための表記を以下に記載します。

「ネットワークカメラCGIコマンドインターフェース仕様書 統合版」[1] で下記に記載されている情報を元に加筆しています。

 

http://<user-id>:<user-password>@<カメラのIPアドレス>/nphMotionJpeg?Resolution=<解像度>&Quality=<品質>&Framerate=<フレームレート>

 

(具体例)

http://admin:password@192.168.0.10/nphMotionJpeg?Resolution=1920x1080&Quality=Standard&Framerate=15

 

注意事項:

 

 

 

2. i-PRO カメラと MJPEG 接続して映像を表示してみる

[概要]

とりあえず映像を取得してPC画面に表示するまでをやってみます。

 

[評価環境1]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

[評価環境2]

言語 : Python, 3.8.10
OS : Ubuntu(WSL), 20.04

 

 

2-1. 方法1

まずは簡単な方法から。RTSP のコードとほとんど同じ内容で実現できました。

 

[プログラム]

プログラムを終了する方法を実装していません。コンソール上で [ctrl]+[c] して終了してください。

 

[プログラムソース "connect_with_mjpeg_1_1.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG(Motion JPEG).
    MJPEG(Motion JPEG) で i-PRO カメラと接続します

[Details]
    Let's try first. (Method 1)
    まずはやってみる (方式1)

[library install]
    cv2:    pip install opencv-python
'''

import cv2

user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  15               # Frame rate

# URL
url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"
cap = cv2.VideoCapture(url)


while True:
    try:
        ret, frame = cap.read()
        if ret == True:
            # Please modify the value to fit your PC screen size.
            frame2 = cv2.resize(frame, (1280, 720))
            
            # Display video.
            cv2.imshow(winname, frame2)

        cv2.waitKey(1)      # necessary to display the video by imshow ()

    except KeyboardInterrupt:
        # Press '[ctrl] + [c]' on the console to exit the program.
        print("KeyboardInterrupt")
        break

cap.release()
cv2.destroyAllWindows()

 

上記プログラムを動かしてみます。

Windows ではこんな感じで実行します。

$ python connect_with_mjpeg_1.py

 

Linux はこんな感じで実行します。

$ python3 connect_with_mjpeg_1.py

 

Windows環境で複数の Python バージョンをインストールしている場合、下図のような感じで実行バージョンを指定することもできます。
こちらはバージョン 3.10 の Python で実行する例です。

$ py -3.10 connect_with_mjpeg_1.py

 

上記プログラムを動かした様子を動画で示します。

こんなに簡単なプログラムでとても快適な映像表示を実現することができました。

[注意] 上記でも記載しましたが、カメラ側の設定でストリーム(1)~(4)を Off にすることで滑らかな映像表示を実現できました。On のままでもプログラム自体は動作しますが、5fps 程度の映像となりました。

[動画] MJPEG でカメラと接続して映像表示した様子 (30fps) (注意:ストリーム1~4 を全て Off に設定しています)

 

 

2-2. 方法2

下記方法でも MJPEG で接続して映像表示できます。記事[2]を参考に作成してみました。

 

[プログラム]

[プログラムソース "connect_with_mjpeg_1_2.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG(Motion JPEG).
    MJPEG(Motion JPEG) で i-PRO カメラと接続します

[Details]
    Let's try first. (Method 2)
    まずはやってみる(方式2)

[library install]
    cv2:    pip install opencv-python
'''

import cv2
import numpy as np
import urllib.request as rq


user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  15               # Frame rate

# URL
url = f"http://{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"


def set_digest_auth(uri, user, passwd):
    '''
    [abstract]
        Authenticate with the IP camera.

    [params]
        uri:      CGI command for start mjpeg stream.
        user:     user-id for camera.
        passwd:   user-password for camera.
    '''
    pass_mgr = rq.HTTPPasswordMgrWithDefaultRealm()
    pass_mgr.add_password(realm=None, uri=uri, user=user, passwd=passwd)
    auth_handler = rq.HTTPDigestAuthHandler(pass_mgr)
    opener = rq.build_opener(auth_handler)
    rq.install_opener(opener)


set_digest_auth(url, user_id, user_pw)
stream = rq.urlopen(url)

bytes = bytes()
while True:
    try:
        bytes += stream.read(1024)
        a = bytes.find(b'\xff\xd8')     # SOI  (Start of Image)  0xFFD8
        b = bytes.find(b'\xff\xd9')     # EOI  (End   of Image)  0xFFD9
        if a != -1 and b != -1:
            jpg = bytes[a:b+2]
            bytes = bytes[b+2:]

            # Convert binary data to ndarray type.
            img_buf = np.frombuffer(jpg, dtype=np.uint8)

            # Decode ndarray data to OpenCV format image data.
            frame = cv2.imdecode(img_buf, cv2.IMREAD_UNCHANGED)

            # Please modify the value to fit your PC screen size.
            frame2 = cv2.resize(frame, (1280, 720))

            # Display video.
            cv2.imshow(winname, frame2)
            cv2.waitKey(1)      # necessary to display the video by imshow ()

    except KeyboardInterrupt:
        # Press '[ctrl] + [c]' on the console to exit the program.
        print("KeyboardInterrupt")
        break

cv2.destroyAllWindows()

 

 

 

3.プログラムを改善する

[概要]

前章で作成したプログラムはとても簡単に作成できましたが、いろいろと課題がありました。
とりあえず下記3つの課題を解決してみます。

 

課題1
プログラムを起動するたびにウィンドウ位置が変わる。場合によっては画面外へ表示する場合もあって不便。
適当に画面内に収まる場所に表示してほしい。
⇨ 指定する場所にウィンドウを表示するようにします。

 

課題2
プログラムを終了するのが大変。
ウィンドウ右上の[x]を押すとウィンドウがいったん消えるが、すぐに再表示されて終われない。
⇨ ウィンドウ右上の[x]ボタンでプログラムを終了できるようにします。

 

課題3
同様に、任意のキー入力でプログラムを終了できるとうれしい。
⇨ "q" キー押下でプログラムを終了できるようにします。

 

 

 

[評価環境1]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

[評価環境2]

言語 : Python, 3.8.10
OS : Ubuntu(WSL), 20.04

 

 

[プログラム]

 

[プログラムソース "connect_with_mjpeg_2.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG(Motion JPEG).
    MJPEG(Motion JPEG) で i-PRO カメラと接続します

[Details]
    Let's improve the three issues of "connect_with_mjpeg_1_1.py".
    "connect_with_mjpeg_1_1.py" で確認した下記3つの課題を改善してみます。

    [Issues 1]
    Specifies the position where the window is displayed.
    ウィンドウを指定する場所に表示するようにします。
    [Issues 2]
    Modify the program so that you can exit the program by clicking the [x] button.
    ウィンドウ右上の[x]ボタンでプログラムを終了できるようにします。
    [Issues 3]
    Modify the program so that you can exit the program by pressing the [q] key.
    "q" キー押下でプログラムを終了できるようにします。

[Library install]
    cv2:    pip install opencv-python
'''

import cv2

user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  15               # Frame rate

# URL
url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"
cap = cv2.VideoCapture(url)

#
windowInitialized = False

# Exception definition
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    [Abstract]
        Check if the target window exists.
        対象ウィンドウが存在するかを確認する。
    [Param]
        winname :       Window title
    [Return]
        True :          exist
                        存在する
        False :         not exist
                        存在しない
    [Exception]
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False


while True:
    try:
        ret, frame = cap.read()
        if ret == True:
            # Please modify the value to fit your PC screen size.
            frame2 = cv2.resize(frame, (1280, 720))

            # Display video.
            cv2.imshow(winname, frame2)

            if windowInitialized==False:
                # Specify window position only once at startup.
                cv2.moveWindow(winname, 100, 100)
                windowInitialized = True

        # Press the "q" key to finish.
        k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
        if k == ord("q"):
            break
        
        # Exit the program if there is no specified window.
        if not IsWindowVisible(winname):
            break

    except KeyboardInterrupt:
        # Press '[ctrl] + [c]' on the console to exit the program.
        print("KeyboardInterrupt")
        break

cap.release()
cv2.destroyAllWindows()

 

 

 

4. OpenCV で顔検知を加えてみる

MJPEG の実装でも OpenCV による顔検知を実装してみます。

MJPEG 接続では映像情報は受け身です。このため高解像度、高フレームレートの映像を処理したとき、OpenCV の処理が追いつくかが心配な部分です。

 

下記 URL からファイル "haarcascade_frontalface_alt2.xml" を入手してプログラムと同じ場所に保存する必要があります。

https://github.com/opencv/opencv/tree/master/data/haarcascades

xml ファイル取得方法は こちら を参照ください

 

[評価環境1]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

[評価環境2]

言語 : Python, 3.8.10
OS : Ubuntu(WSL), 20.04

 

4-1. まずは単純にやってみる

とにかくまずはやってみます。

映像を受信するたびの OpenCV で毎回認識処理を行ってみます。

 

[プログラムソース "connect_with_mjpeg_3_1.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG(Motion JPEG).
    MJPEG(Motion JPEG) で i-PRO カメラと接続します

[Details]
    Let's add face detection using OpenCV.
    OpenCV を使って顔検知を追加してみます

[Library install]
    cv2:    pip install opencv-python

[OpenCV]
    Get the file "haarcascade_frontalface_alt2.xml" from the URL below.
    下記URLからファイル "haarcascade_frontalface_alt2.xml" を入手するしてください。
    https://github.com/opencv/opencv/tree/master/data/haarcascades
'''

import cv2

user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  15               # Frame rate

# URL
url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"

# Exception definition.
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    [Abstract]
        Check if the target window exists.
        対象ウィンドウが存在するかを確認する。
    [Param]
        winname :       Window title
    [Return]
        True :          exist
                        存在する
        False :         not exist
                        存在しない
    [Exception]
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False

def DetectFaces(cascade, image):
    '''
    [Abstract]
        Detect faces and return recognition result.
        顔検知して認識結果を返す
    [Param]
        cascade :       CascadeClassifier object in OpenCV format.
        image :         Image in OpenCV format.
    [Return]
        Detection result.
    '''
    # Convert to grayscale image for face detection.
    img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Detect faces from image.
    face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100))

    # Return result.
    return face_list


def DrawFaceRectangles(image, face_list):
    '''
    [Abstract]
        Draw red frames on the image according to the detection result face_list.
        検出結果 face_list にしたがって、image 上に赤枠を描画する。
    [Param]
        image :         Image in OpenCV format.
        face_list :     List of detected face frames.
    [Return]
        None
    '''
    # Draw red frames for the number of detected faces.
    if len(face_list) != 0:
        for (pos_x, pos_y, w, h) in face_list:
            print(f"pos_x = {pos_x}, pos_y = {pos_y}, w = {w}, h = {h}")
            cv2.rectangle(image, (pos_x, pos_y), (pos_x + w, pos_y + h), (0,0,255), thickness=5)


'''
[Abstract]
    main 関数
'''
if __name__ == '__main__':

    cap = cv2.VideoCapture(url)

    #
    windowInitialized = False

    # haarcascade file for opencv cascade classification.
    cascade_file = "haarcascade_frontalface_alt2.xml"       # face
    #cascade_file = "haarcascade_eye.xml"                   # eye ?
    #cascade_file = "haarcascade_eye_tree_eyeglasses.xml"   # eye ?
    cascade = cv2.CascadeClassifier(cascade_file)

    while True:
        try:
            ret, frame = cap.read()
            if ret == True:
                # Face detection.
                face_list = DetectFaces(cascade, frame)

                # Draw face rectangles.
                DrawFaceRectangles(frame, face_list)

                # Please modify the value to fit your PC screen size.
                frame2 = cv2.resize(frame, (1280, 720))
                
                # Display video.
                cv2.imshow(winname, frame2)

                if windowInitialized==False:
                    # Specify window position only once at startup.
                    cv2.moveWindow(winname, 100, 100)
                    windowInitialized = True

            # Press the "q" key to finish.
            k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
            if k == ord("q"):
                break
            
            # Exit the program if there is no specified window.
            if not IsWindowVisible(winname):
                break

        except KeyboardInterrupt:
            # Press '[ctrl] + [c]' on the console to exit the program.
            print("KeyboardInterrupt")
            break

    cap.release()
    cv2.destroyAllWindows()

 

結果:

私のゲーミングPCではこれでもそこそこ動作しました。思ったより動く、という感想です。

が、それでもだんだん映像が遅れていきます。
顔検知処理と描画の部分をコメントアウトすると、映像表示の遅れはなくなります。
やはり顔検知処理は PC にとって結構重たい処理のようです。

ちょっと残念。何か改善策を考えてみたいところです。

 

 

4-2. 顔検知部分を別プロセスの処理にしてみる

そこで、顔検知部分を別タスクに分離することで、映像受信と映像デコード処理を止めずにできるだけ顔検知をやってみる、という感じにプログラムを修正してみます。

別タスクというと一般的なプログラムでは "スレッド" というテクニックを使いますが、どうやら CPython と呼ばれるプラットフォームの場合はスレッドは複数の処理を同時に実行してくれないらしいです。そこで、ここでは別プロセスを起動し、キューと呼ばれるIOで情報をやり取りしてみます。

 

[プログラムソース "connect_with_mjpeg_3_2.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG(Motion JPEG).
    MJPEG(Motion JPEG) で i-PRO カメラと接続します

[Details]
    Let's add face detection using OpenCV.
    Improve performace of "connect_with_mjpeg_3_1.py" by creating a face detection process.

    OpenCV を使って顔検知を追加してみます
    "connect_with_mjpeg_3_1.py" で確認したパフォーマンス問題を、顔検知処理を別プロセスにすることで改善してみます。

[Library install]
    cv2:    pip install opencv-python

[OpenCV]
    Get the file "haarcascade_frontalface_alt2.xml" from the URL below.
    下記URLからファイル "haarcascade_frontalface_alt2.xml" を入手するしてください。
    https://github.com/opencv/opencv/tree/master/data/haarcascades
'''

import cv2
import multiprocessing as mp
from queue import Empty


user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  30               # Frame rate

# URL
url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"

# haarcascade file for opencv cascade classification.
cascade_file = "haarcascade_frontalface_alt2.xml"       # face
#cascade_file = "haarcascade_eye.xml"                   # eye ?
#cascade_file = "haarcascade_eye_tree_eyeglasses.xml"   # eye ?
cascade = cv2.CascadeClassifier(cascade_file)


# Exception definition.
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    [Abstract]
        Check if the target window exists.
        対象ウィンドウが存在するかを確認する。
    [Param]
        winname :       Window title
    [Return]
        True :          exist
                        存在する
        False :         not exist
                        存在しない
    [Exception]
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False


def DetectFaces(cascade, image):
    '''
    [Abstract]
        Detect faces and return recognition result.
        顔検知して認識結果を返す
    [Param]
        cascade :       CascadeClassifier object in OpenCV format.
        image :         Image in OpenCV format.
    [Return]
        Detection result.
    '''
    # Convert to grayscale image for face detection.
    img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Detect faces from image.
    face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100))

    # Return result.
    return face_list


def DetectFacesProcess(q1, q2):
    '''
    [Abstract]
        Face detection process.
    [Param]
        q1 :        [i] Queue to save the image to detect the face.
        q2 :        [o] Queue to save the result of face detection.
    [Return]
        無し
    '''
    while True:
        try:
            image = q1.get(True, 10)

            # Termination check: If type(image) is "int" and the value is -1, it ends.
            if type(image) == int:
                if image == -1:
                    break

            # Do face detection.
            face_list = DetectFaces(cascade, image)

            q2.put(face_list)
        except Empty: # timeout of q1.get()
            print("Timeout happen.(3)")

    print("Finish DetectFacesProcess()")    


def DrawFaceRectangles(image, face_list):
    '''
    [Abstract]
        Draw red frames on the image according to the detection result face_list.
        検出結果 face_list にしたがって、image 上に赤枠を描画する。
    [Param]
        image :         Image in OpenCV format.
        face_list :     List of detected face frames.
    [Return]
        None
    '''
    # Draw red frames for the number of detected faces.
    if len(face_list) != 0:
        for (pos_x, pos_y, w, h) in face_list:
            print(f"pos_x = {pos_x}, pos_y = {pos_y}, w = {w}, h = {h}")
            cv2.rectangle(image, (pos_x, pos_y), (pos_x + w, pos_y + h), (0,0,255), thickness=5)


if __name__ == '__main__':
    '''
    [Abstract]
        main function
    '''
    cap = cv2.VideoCapture(url)

    #
    windowInitialized = False

    q1 = mp.Queue()
    q2 = mp.Queue()

    p = mp.Process(target=DetectFacesProcess, args=(q1, q2))
    p.start()

    init = False

    while True:
        try:
            ret, frame = cap.read()
            if ret == True:
                # Pass the image to the face detection process.
                if (q1.qsize() <= 1) and (q2.qsize() <= 1):
                    q1.put(frame)

                # Receive results from face detection processing.
                if q2.qsize() != 0:
                    face_list = q2.get()
                    init = True

                if init == True:
                    # Draw face rectangles.
                    DrawFaceRectangles(frame, face_list)

                # Please modify the value to fit your PC screen size.
                frame2 = cv2.resize(frame, (1280, 720))
                
                # Display video.
                cv2.imshow(winname, frame2)

                if windowInitialized==False:
                    # Specify window position only once at startup.
                    cv2.moveWindow(winname, 100, 100)
                    windowInitialized = True

            # Press the "q" key to finish.
            k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
            if k == ord("q"):
                break
            
            # Exit the program if there is no specified window.
            if not IsWindowVisible(winname):
                break

        except KeyboardInterrupt:
            # Press '[ctrl] + [c]' on the console to exit the program.
            print("KeyboardInterrupt")
            break

    # Terminate process p
    q1.put(-1)
    # Waiting for process p to finish
    p.join()

    print("Finish main()")
    cap.release()
    cv2.destroyAllWindows()

 

結果:

期待する動作をしてくれるようになりました。

プロセス起動の引数として cascade を一緒に渡したかったのですが、"cannot pickle object" というエラーを発生して実現できませんでした。残念ながら cascade をグローバル変数へ変更することで問題を回避しています。
対応策がわかったら記事をアップデートしたいと思います。

 

 

[動画] OpenCV で顔検知してみた様子 (30fps) (注意:ストリーム1~4 を全て Off に設定しています)

 

 

5. 連番の JPEG ファイルで保存する

受信した画像を 1 から始まる連番のファイル名 (image_NNNNNN.jpg) で JPEG ファイルとして保存してみます。

 

 

[評価環境]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

[プログラムソース "connect_with_mjpeg_4.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG(Motion JPEG).
    MJPEG(Motion JPEG) で i-PRO カメラと接続します

[Details]
    Save the received JPEG image as a file.
    Add a 6-digit number to the end of the file name and save it as a serial number.

    受信した JPEG 画像をファイル保存します。
    ファイル名の末尾に6ケタの番号を付けて連番で保存します。

[Library install]
    cv2:    pip install opencv-python
'''

import cv2
import numpy as np
import os
import urllib.request as rq

user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  5                # Frame rate
pathOut     = 'image'           # Image file save folder name

# URL
url = f"http://{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"

# Exception definition
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    [Abstract]
        Check if the target window exists.
        対象ウィンドウが存在するかを確認する。
    [Param]
        winname :       Window title
    [Return]
        True :          exist
                        存在する
        False :         not exist
                        存在しない
    [Exception]
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False


def set_digest_auth(uri, user, passwd):
    '''
    [abstract]
        Authenticate with the IP camera.

    [params]
        uri:      CGI command for start mjpeg stream.
        user:     user-id for camera.
        passwd:   user-password for camera.
    '''
    pass_mgr = rq.HTTPPasswordMgrWithDefaultRealm()
    pass_mgr.add_password(realm=None, uri=uri, user=user, passwd=passwd)
    auth_handler = rq.HTTPDigestAuthHandler(pass_mgr)
    opener = rq.build_opener(auth_handler)
    rq.install_opener(opener)


def SaveBinaryData(data, filename):
    '''
    [Abstract]
        Save the binary data with the specified file name.
    [Param]
        data :      binary data.
        filename :  filename.
    '''
    fout = open(filename, 'wb')
    fout.write(data)
    fout.close()


if __name__ == '__main__':
    '''
    [Abstract]
        main function.
    '''
    windowInitialized = False
    count = 0
    if not os.path.exists(pathOut):
        os.mkdir(pathOut)

    set_digest_auth(url, user_id, user_pw)
    stream = rq.urlopen(url)

    bytes = bytes()
    while True:
        try:
            bytes += stream.read(1024)
            a = bytes.find(b'\xff\xd8')     # SOI (Start of Image)  0xFFD8
            b = bytes.find(b'\xff\xd9')     # EOI (End   of Image)  0xFFD9
            if a != -1 and b != -1:
                jpg = bytes[a:b+2]
                bytes = bytes[b+2:]

                # Save jpeg file.
                count += 1
                filename = os.path.join(pathOut, 'image_{:06d}.jpg'.format(count))
                SaveBinaryData(jpg, filename)

                # Convert binary data to ndarray type.
                img_buf = np.frombuffer(jpg, dtype=np.uint8)

                # Decode ndarray data to OpenCV format image data.
                frame = cv2.imdecode(img_buf, cv2.IMREAD_UNCHANGED)

                # Please modify the value to fit your PC screen size.
                frame2 = cv2.resize(frame, (1280, 720))

                # Display video.
                cv2.imshow(winname, frame2)

                if windowInitialized==False:
                    # Specify window position only once at startup.
                    cv2.moveWindow(winname, 100, 100)
                    windowInitialized = True

                # Press the "q" key to finish.
                k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
                if k == ord("q"):
                    break

                # Exit the program if there is no specified window.
                if not IsWindowVisible(winname):
                    break

        except KeyboardInterrupt:
            # Press '[ctrl] + [c]' on the console to exit the program.
            print("KeyboardInterrupt")
            break
    
    cv2.destroyAllWindows()

 

 

 

6. 映像切断時の再接続処理を追加

ここまでのプログラムは、カメラとの接続が30秒以上切断すると接続が復活しませんでした。
OpenCV の read() 関数のタイムアウトは30秒となっているようです。30秒以内に接続が復活していれば自動的に再接続してくれるのですが、30秒を超えると自動的には復活しません。

"connect_with_rtsp_2.py" を元に再接続処理を追加してこの問題を解決してみたいと思います。

 

ポイント

  • read() の戻り値が False であったら再接続を行います。
  • release() で切断後、cv2.VideoCapture() で再接続します。

 

NOTE

  • 切断の試験はカメラの電源をOff/Onする、通信をOff/Onする、などで試験してください。
  • カメラとの接続が正常状態に戻ってから再接続までは最長30秒かかります。気長にお待ちください。
  • read() のタイムアウト時間を変更する API を確認できませんでした。おそらく固定値で変更できないと思われます。

 

 

[評価環境]

言語 : Python, 3.10.4
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

 

[プログラムソース "connect_with_mjpeg_5.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with RTSP.
    RTSP で i-PRO カメラと接続してみる。

[Details]
    Add reconnection when video is disconnected to "connect_with_mjpeg_2.py".
    映像切断時の再接続処理を"connect_with_mjpeg_2.py"へ追加する。

[Library install]
    cv2:    pip install opencv-python
'''

from nturl2path import url2pathname
import cv2

user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  15               # Frame rate
url         = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"

#
windowInitialized = False

# Exception definition.
BackendError = type('BackendError', (Exception,), {})

def IsWindowVisible(winname):
    '''
    [Abstract]
        Check if the target window exists.
        対象ウィンドウが存在するかを確認する。
    [Param]
        winname :       Window title
    [Return]
        True :          exist
                        存在する
        False :         not exist
                        存在しない
    [Exception]
        BackendError :
    '''
    try:
        ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE)
        if ret == -1:
            raise BackendError('Use Qt as backend to check whether window is visible or not.')

        return bool(ret)

    except cv2.error:
        return False


if __name__ == '__main__':
    '''
    [Abstract]
        main function.
    '''
    cap = cv2.VideoCapture(url)

    while True:
        try:
            ret, frame = cap.read()
            if ret == True:
                # Please modify the value to fit your PC screen size.
                frame2 = cv2.resize(frame, (1280, 720))
                # Display video.
                cv2.imshow(winname, frame2)

                if windowInitialized==False:
                    # Specify window position only once at startup.
                    cv2.moveWindow(winname, 100, 100)
                    windowInitialized = True
            else:
                print("cap.read() return False.")
                # The timeout period seems to be 30 seconds.
                # And there seems to be no API to change the timeout value.

                # Reconnect
                cap.release()
                cap = cv2.VideoCapture(url)

            # Press the "q" key to finish.
            k = cv2.waitKey(1) & 0xff   # necessary to display the video by imshow ()
            if k == ord("q"):
                break
            
            # Exit the program if there is no specified window.
            if not IsWindowVisible(winname):
                break
        
        except KeyboardInterrupt:
            # Press'[ctrl] + [c]' on the console to exit the program.
            print("KeyboardInterrupt")
            break

    cap.release()
    cv2.destroyAllWindows()

 

 

 

7. GUIで映像表示してみる(tkinter)

ここまでのプログラムは全てOpenCVが作成するウィンドウ表示でした。
ここでは独自の GUI を作成してここに映像表示する例を示します。

GUI 表示の実現方法もいろいろありますが、ここでは Python 標準の tkinter を使用してみます。

 

tkinter のインストール方法は環境により異なるようです。各人の環境にあった方法をインターネットで調べて実施してください。

 

ポイント

  • tkinter で動画を表示するときは、after() 関数で繰り返し処理を行います。
  • tkinter で表示するために ImageTk.PhotoImage という型に変換する必要があります。
    numpy.ndarray → PIL.Image → ImageTk.PhotoImage という順に変換して tkinter で表示します。
  • OpenCVの画像は BGR の並び順になっています。tkinker で表示するために BGR データを RGB へ並び替える必要があります。

 

RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で既に GUI 版を作成済みなので、プログラム "connect_with_rtsp_6_3.py" をベースに変更箇所のみをわかるように以下で記載します。ほとんど同じ内容で実現できます。

 

[評価環境]

言語 : Python, 3.10.4
  Tcl/Tk, 8.6
OS : Windows 11 home, 21H2
Windows 10 Pro, 21H1

 

[プログラムソース "connect_with_mjpeg_6.py"]

'''
[Abstract]
    Try connecting to an i-PRO camera with MJPEG.
    MJPEG で i-PRO カメラと接続してみる。

[Details]
    Display the video with GUI using tkinter.
    Add menus and buttons to make it look like a GUI app.
    
    tkinter を使ったGUIで映像を表示します。
    メニューとボタンを追加してGUIアプリらしくします。

    BGR → RGB
    numpy.ndarray → PIL.Image → ImageTk.PhotoImage
    (1) BGR → RGB
    (2) numpy.ndarray → PIL.Image
    (3) PIL.Image → ImageTk.PhotoImage

[Library install]
    cv2:    pip install opencv-python
    PIL :   pip install pillow
'''

import cv2
import time
import tkinter as tk
from tkinter import messagebox
from PIL import Image, ImageTk, ImageOps
import multiprocessing as mp


user_id     = "user-id"         # Change to match your camera setting
user_pw     = "password"        # Change to match your camera setting
host        = "192.168.0.10"    # Change to match your camera setting
winname     = "VIDEO"           # Window title
resolution  = "1920x1080"       # Resolution
framerate   =  15               # Frame rate
url         = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}"


class Application(tk.Frame):
    def __init__(self, master = None):
        super().__init__(master)
        self.pack()

        # Window settings.
        self.master.title("Display i-PRO camera with tkinter")      # Window title
        self.master.geometry("800x600+100+100")                     # Window size, position

        # Event registration for window termination.
        self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window)

        # Create menu.
        menubar = tk.Menu(self.master)
        self.master.configure(menu=menubar)
        filemenu = tk.Menu(menubar)
        menubar.add_cascade(label='File', menu=filemenu)
        filemenu.add_command(label='Quit', command = self.on_closing_window)

        # Create button_frame
        self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2)
        self.button_frame.pack(side = tk.BOTTOM, fill=tk.X)

        # Create quit_button
        self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window)
        self.quit_button.pack(side=tk.RIGHT)
        
        # Create canvas.
        self.canvas = tk.Canvas(self.master)

        # Add mouse click event to canvas.
        self.canvas.bind('<Button-1>', self.canvas_click)

        # Place canvas.
        self.canvas.pack(expand = True, fill = tk.BOTH)

        # Create image receiving process and queue
        self.imageQueue = mp.Queue()
        self.request = mp.Value('i', 0)     # -1 : Exit ReceiveImageProcess.
                                            #  0 : Normal.
                                            #  1 : Connect camera.
                                            #  2 : Release camera.
        self.p = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.request))
        self.p.start()

        # Raise a video display event (disp_image) after 500m
        self.disp_id = self.after(500, self.disp_image)

    def on_closing_window(self):
        ''' Window closing event. '''

        if messagebox.askokcancel("QUIT", "Do you want to quit?"):
            # Request terminate process self.p.
            self.request.value = -1

            # Waiting for process p to finish
            time.sleep(1)

            # Flash buffer.
            # The program cannot complete p.join() unless the imageQueue is emptied.
            for i in range(self.imageQueue.qsize()):
                pil_image = self.imageQueue.get()

            # Wait for process p to be terminated.
            self.p.join()
            self.master.destroy()
            print("Finish Application.")

    def canvas_click(self, event):
        ''' Event handling with mouse clicks on canvas '''

        if self.disp_id is None:
            # Connect camera.
            self.request.value = 1
            # Display image.
            self.disp_image()

        else:
            # Release camera.
            self.request.value = 2
            # Cancel scheduling
            self.after_cancel(self.disp_id)
            self.disp_id = None

    def disp_image(self):
        ''' Display image on Canvas '''

        # If there is data in the imageQueue, the program receives the data and displays the video.
        num = self.imageQueue.qsize()
        if num > 0:
            if (num > 5):
                num -= 1
            for i in range(num):
                cv_image = self.imageQueue.get()

            # (2) Convert image from ndarray to PIL.Image.
            pil_image = Image.fromarray(cv_image)

            # Get canvas size.
            canvas_width = self.canvas.winfo_width()
            canvas_height = self.canvas.winfo_height()

            # Resize the image to the size of the canvas without changing the aspect ratio.
            # アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ
            pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height))

            # (3) Convert image from PIL.Image to PhotoImage
            # PIL.Image から PhotoImage へ変換する
            self.photo_image = ImageTk.PhotoImage(image=pil_image)

            # Display image on the canvas.
            self.canvas.create_image(
                canvas_width / 2,       # Image display position (center of the canvas)
                canvas_height / 2,                   
                image=self.photo_image  # image data
                )
            
        else:
            pass

        # Raise a video display event (disp_image) after 1ms.
        self.disp_id = self.after(1, self.disp_image)


def ReceiveImageProcess(imageQueue, request):
    '''
    Receive Image Process.

    Args:
        imageQueue      [o] This process stores the received image data in the imageQueue.
        request         [i] Shared memory for receiving requests from the main process.
                            -1: Terminate process.
                             0: Nothing.
                             1: Connect camera.
                             2: Release camera connection.
    Returns:
        None
    Raises
        None
    '''

    # Connect camera.
    cap = cv2.VideoCapture(url)

    while True:
        if cap != None:
            # Get frame.
            ret, frame = cap.read()

            if ret == True:
                # (1) Convert image from BGR to RGB.
                cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                if imageQueue.qsize() < 10:
                    imageQueue.put(cv_image)

            else:
                print("cap.read() return False.")
                # The timeout period seems to be 30 seconds.
                # And there seems to be no API to change the timeout value.
                time.sleep(1)

                # Reconnect
                cap.release()
                cap = cv2.VideoCapture(url)
        else:
            time.sleep(0.1)
                
        # Check process termination request.
        if request.value == -1:
            # Terminate process.
            cap.release()
            request.value = 0
            break

        # Check connect request.
        if request.value == 1:
            cap = cv2.VideoCapture(url)
            request.value = 0

        # Check release request.
        if request.value == 2:
            cap.release()
            cap = None
            request.value = 0

    print("Terminate SaveImageProcess().")


if __name__ == "__main__":
    root = tk.Tk()
    app = Application(master = root)
    app.mainloop()

 

[動画] tkinter で作成した GUI アプリ

 

 

 

ソースコード所在

本ページで紹介のソースコードは、下記 github より取得できます。

下記 github のソースコードと本ページの内容は差異がある場合があります。

i-pro-corp/python-examples: Examples for i-PRO cameras. (github.com)

 


ライセンス

本ページの情報は、特記無い限り下記ライセンスで提供されます。


Copyright 2022 i-PRO Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

 

 

参考

 


 

変更履歴

2023/10/20 - IP簡単設定ソフトウェア、IP Setting Software リンク先を更新, 木下英俊
2023/3/1 - 説明および表現を一部更新, 木下英俊
2022/7/20 - 微修正, 木下英俊
2022/5/26 - 新規作成, 木下英俊

 

i-PRO - Programming Items トップページ

プライバシーポリシー