本ページは i-PRO株式会社 の有志メンバーにより記載されたものです。
本ページの情報は ライセンス に記載の条件で提供されます。
本ページでは、i-PRO カメラとPCを MJPEG により接続してPC画面へ表示するプログラムを Python で作成する例を紹介します。とても短いプログラムで i-PRO カメラの映像を見ることができます。動作確認は i-PRO mini (WV-S7130)、モジュールカメラ(AIスターターキット)を使って行いましたが、ほとんどの i-PRO カメラでそのまま利用できるはずです。ぜひお試しください。
[動画] MJPEG でカメラと接続して映像表示した様子
"i-PRO mini" 紹介:
"モジュールカメラ" 紹介:
カメラ初期設定についてはカメラ毎の取扱説明書をご確認ください。
カメラのIPアドレスを確認・設定できる下記ツールを事前に入手しておくと便利です。
MJPEG で接続するための表記を以下に記載します。
「ネットワークカメラCGIコマンドインターフェース仕様書 統合版」[1] で下記に記載されている情報を元に加筆しています。
http://<user-id>:<user-password>@<カメラのIPアドレス>/nphMotionJpeg?Resolution=<解像度>&Quality=<品質>&Framerate=<フレームレート>
(具体例)
http://admin:password@192.168.0.10/nphMotionJpeg?Resolution=1920x1080&Quality=Standard&Framerate=15
注意事項:
とりあえず映像を取得してPC画面に表示するまでをやってみます。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
言語 : | Python, | 3.8.10 |
OS : | Ubuntu(WSL), | 20.04 |
まずは簡単な方法から。RTSP のコードとほとんど同じ内容で実現できました。
プログラムを終了する方法を実装していません。コンソール上で [ctrl]+[c] して終了してください。
[プログラムソース "connect_with_mjpeg_1_1.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's try first. (Method 1) まずはやってみる (方式1) [library install] cv2: pip install opencv-python ''' import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" cap = cv2.VideoCapture(url) while True: try: ret, frame = cap.read() if ret == True: # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) cv2.waitKey(1) # necessary to display the video by imshow () except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
上記プログラムを動かしてみます。
Windows ではこんな感じで実行します。
$ python connect_with_mjpeg_1.py
Linux はこんな感じで実行します。
$ python3 connect_with_mjpeg_1.py
Windows環境で複数の Python バージョンをインストールしている場合、下図のような感じで実行バージョンを指定することもできます。
こちらはバージョン 3.10 の Python で実行する例です。
$ py -3.10 connect_with_mjpeg_1.py
上記プログラムを動かした様子を動画で示します。
こんなに簡単なプログラムでとても快適な映像表示を実現することができました。
[注意] 上記でも記載しましたが、カメラ側の設定でストリーム(1)~(4)を Off にすることで滑らかな映像表示を実現できました。On のままでもプログラム自体は動作しますが、5fps 程度の映像となりました。
[動画] MJPEG でカメラと接続して映像表示した様子 (30fps) (注意:ストリーム1~4 を全て Off に設定しています)
下記方法でも MJPEG で接続して映像表示できます。記事[2]を参考に作成してみました。
[プログラムソース "connect_with_mjpeg_1_2.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's try first. (Method 2) まずはやってみる(方式2) [library install] cv2: pip install opencv-python ''' import cv2 import numpy as np import urllib.request as rq user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" def set_digest_auth(uri, user, passwd): ''' [abstract] Authenticate with the IP camera. [params] uri: CGI command for start mjpeg stream. user: user-id for camera. passwd: user-password for camera. ''' pass_mgr = rq.HTTPPasswordMgrWithDefaultRealm() pass_mgr.add_password(realm=None, uri=uri, user=user, passwd=passwd) auth_handler = rq.HTTPDigestAuthHandler(pass_mgr) opener = rq.build_opener(auth_handler) rq.install_opener(opener) set_digest_auth(url, user_id, user_pw) stream = rq.urlopen(url) bytes = bytes() while True: try: bytes += stream.read(1024) a = bytes.find(b'\xff\xd8') # SOI (Start of Image) 0xFFD8 b = bytes.find(b'\xff\xd9') # EOI (End of Image) 0xFFD9 if a != -1 and b != -1: jpg = bytes[a:b+2] bytes = bytes[b+2:] # Convert binary data to ndarray type. img_buf = np.frombuffer(jpg, dtype=np.uint8) # Decode ndarray data to OpenCV format image data. frame = cv2.imdecode(img_buf, cv2.IMREAD_UNCHANGED) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) cv2.waitKey(1) # necessary to display the video by imshow () except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cv2.destroyAllWindows()
前章で作成したプログラムはとても簡単に作成できましたが、いろいろと課題がありました。
とりあえず下記3つの課題を解決してみます。
課題1
プログラムを起動するたびにウィンドウ位置が変わる。場合によっては画面外へ表示する場合もあって不便。
適当に画面内に収まる場所に表示してほしい。
⇨ 指定する場所にウィンドウを表示するようにします。
課題2
プログラムを終了するのが大変。
ウィンドウ右上の[x]を押すとウィンドウがいったん消えるが、すぐに再表示されて終われない。
⇨
ウィンドウ右上の[x]ボタンでプログラムを終了できるようにします。
課題3
同様に、任意のキー入力でプログラムを終了できるとうれしい。
⇨ "q" キー押下でプログラムを終了できるようにします。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
言語 : | Python, | 3.8.10 |
OS : | Ubuntu(WSL), | 20.04 |
[プログラムソース "connect_with_mjpeg_2.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's improve the three issues of "connect_with_mjpeg_1_1.py". "connect_with_mjpeg_1_1.py" で確認した下記3つの課題を改善してみます。 [Issues 1] Specifies the position where the window is displayed. ウィンドウを指定する場所に表示するようにします。 [Issues 2] Modify the program so that you can exit the program by clicking the [x] button. ウィンドウ右上の[x]ボタンでプログラムを終了できるようにします。 [Issues 3] Modify the program so that you can exit the program by pressing the [q] key. "q" キー押下でプログラムを終了できるようにします。 [Library install] cv2: pip install opencv-python ''' import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" cap = cv2.VideoCapture(url) # windowInitialized = False # Exception definition BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False while True: try: ret, frame = cap.read() if ret == True: # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
MJPEG の実装でも OpenCV による顔検知を実装してみます。
MJPEG 接続では映像情報は受け身です。このため高解像度、高フレームレートの映像を処理したとき、OpenCV の処理が追いつくかが心配な部分です。
下記 URL からファイル "haarcascade_frontalface_alt2.xml" を入手してプログラムと同じ場所に保存する必要があります。
https://github.com/opencv/opencv/tree/master/data/haarcascades
xml ファイル取得方法は こちら を参照ください
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
言語 : | Python, | 3.8.10 |
OS : | Ubuntu(WSL), | 20.04 |
とにかくまずはやってみます。
映像を受信するたびの OpenCV で毎回認識処理を行ってみます。
[プログラムソース "connect_with_mjpeg_3_1.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's add face detection using OpenCV. OpenCV を使って顔検知を追加してみます [Library install] cv2: pip install opencv-python [OpenCV] Get the file "haarcascade_frontalface_alt2.xml" from the URL below. 下記URLからファイル "haarcascade_frontalface_alt2.xml" を入手するしてください。 https://github.com/opencv/opencv/tree/master/data/haarcascades ''' import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def DetectFaces(cascade, image): ''' [Abstract] Detect faces and return recognition result. 顔検知して認識結果を返す [Param] cascade : CascadeClassifier object in OpenCV format. image : Image in OpenCV format. [Return] Detection result. ''' # Convert to grayscale image for face detection. img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Detect faces from image. face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100)) # Return result. return face_list def DrawFaceRectangles(image, face_list): ''' [Abstract] Draw red frames on the image according to the detection result face_list. 検出結果 face_list にしたがって、image 上に赤枠を描画する。 [Param] image : Image in OpenCV format. face_list : List of detected face frames. [Return] None ''' # Draw red frames for the number of detected faces. if len(face_list) != 0: for (pos_x, pos_y, w, h) in face_list: print(f"pos_x = {pos_x}, pos_y = {pos_y}, w = {w}, h = {h}") cv2.rectangle(image, (pos_x, pos_y), (pos_x + w, pos_y + h), (0,0,255), thickness=5) ''' [Abstract] main 関数 ''' if __name__ == '__main__': cap = cv2.VideoCapture(url) # windowInitialized = False # haarcascade file for opencv cascade classification. cascade_file = "haarcascade_frontalface_alt2.xml" # face #cascade_file = "haarcascade_eye.xml" # eye ? #cascade_file = "haarcascade_eye_tree_eyeglasses.xml" # eye ? cascade = cv2.CascadeClassifier(cascade_file) while True: try: ret, frame = cap.read() if ret == True: # Face detection. face_list = DetectFaces(cascade, frame) # Draw face rectangles. DrawFaceRectangles(frame, face_list) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
結果:
私のゲーミングPCではこれでもそこそこ動作しました。思ったより動く、という感想です。
が、それでもだんだん映像が遅れていきます。
顔検知処理と描画の部分をコメントアウトすると、映像表示の遅れはなくなります。
やはり顔検知処理は PC にとって結構重たい処理のようです。
ちょっと残念。何か改善策を考えてみたいところです。
そこで、顔検知部分を別タスクに分離することで、映像受信と映像デコード処理を止めずにできるだけ顔検知をやってみる、という感じにプログラムを修正してみます。
別タスクというと一般的なプログラムでは "スレッド" というテクニックを使いますが、どうやら CPython と呼ばれるプラットフォームの場合はスレッドは複数の処理を同時に実行してくれないらしいです。そこで、ここでは別プロセスを起動し、キューと呼ばれるIOで情報をやり取りしてみます。
[プログラムソース "connect_with_mjpeg_3_2.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's add face detection using OpenCV. Improve performace of "connect_with_mjpeg_3_1.py" by creating a face detection process. OpenCV を使って顔検知を追加してみます "connect_with_mjpeg_3_1.py" で確認したパフォーマンス問題を、顔検知処理を別プロセスにすることで改善してみます。 [Library install] cv2: pip install opencv-python [OpenCV] Get the file "haarcascade_frontalface_alt2.xml" from the URL below. 下記URLからファイル "haarcascade_frontalface_alt2.xml" を入手するしてください。 https://github.com/opencv/opencv/tree/master/data/haarcascades ''' import cv2 import multiprocessing as mp from queue import Empty user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 30 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # haarcascade file for opencv cascade classification. cascade_file = "haarcascade_frontalface_alt2.xml" # face #cascade_file = "haarcascade_eye.xml" # eye ? #cascade_file = "haarcascade_eye_tree_eyeglasses.xml" # eye ? cascade = cv2.CascadeClassifier(cascade_file) # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def DetectFaces(cascade, image): ''' [Abstract] Detect faces and return recognition result. 顔検知して認識結果を返す [Param] cascade : CascadeClassifier object in OpenCV format. image : Image in OpenCV format. [Return] Detection result. ''' # Convert to grayscale image for face detection. img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Detect faces from image. face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100)) # Return result. return face_list def DetectFacesProcess(q1, q2): ''' [Abstract] Face detection process. [Param] q1 : [i] Queue to save the image to detect the face. q2 : [o] Queue to save the result of face detection. [Return] 無し ''' while True: try: image = q1.get(True, 10) # Termination check: If type(image) is "int" and the value is -1, it ends. if type(image) == int: if image == -1: break # Do face detection. face_list = DetectFaces(cascade, image) q2.put(face_list) except Empty: # timeout of q1.get() print("Timeout happen.(3)") print("Finish DetectFacesProcess()") def DrawFaceRectangles(image, face_list): ''' [Abstract] Draw red frames on the image according to the detection result face_list. 検出結果 face_list にしたがって、image 上に赤枠を描画する。 [Param] image : Image in OpenCV format. face_list : List of detected face frames. [Return] None ''' # Draw red frames for the number of detected faces. if len(face_list) != 0: for (pos_x, pos_y, w, h) in face_list: print(f"pos_x = {pos_x}, pos_y = {pos_y}, w = {w}, h = {h}") cv2.rectangle(image, (pos_x, pos_y), (pos_x + w, pos_y + h), (0,0,255), thickness=5) if __name__ == '__main__': ''' [Abstract] main function ''' cap = cv2.VideoCapture(url) # windowInitialized = False q1 = mp.Queue() q2 = mp.Queue() p = mp.Process(target=DetectFacesProcess, args=(q1, q2)) p.start() init = False while True: try: ret, frame = cap.read() if ret == True: # Pass the image to the face detection process. if (q1.qsize() <= 1) and (q2.qsize() <= 1): q1.put(frame) # Receive results from face detection processing. if q2.qsize() != 0: face_list = q2.get() init = True if init == True: # Draw face rectangles. DrawFaceRectangles(frame, face_list) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break # Terminate process p q1.put(-1) # Waiting for process p to finish p.join() print("Finish main()") cap.release() cv2.destroyAllWindows()
結果:
期待する動作をしてくれるようになりました。
プロセス起動の引数として cascade を一緒に渡したかったのですが、"cannot pickle object"
というエラーを発生して実現できませんでした。残念ながら cascade をグローバル変数へ変更することで問題を回避しています。
対応策がわかったら記事をアップデートしたいと思います。
[動画] OpenCV で顔検知してみた様子 (30fps) (注意:ストリーム1~4 を全て Off に設定しています)
受信した画像を 1 から始まる連番のファイル名 (image_NNNNNN.jpg) で JPEG ファイルとして保存してみます。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "connect_with_mjpeg_4.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Save the received JPEG image as a file. Add a 6-digit number to the end of the file name and save it as a serial number. 受信した JPEG 画像をファイル保存します。 ファイル名の末尾に6ケタの番号を付けて連番で保存します。 [Library install] cv2: pip install opencv-python ''' import cv2 import numpy as np import os import urllib.request as rq user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 5 # Frame rate pathOut = 'image' # Image file save folder name # URL url = f"http://{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # Exception definition BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def set_digest_auth(uri, user, passwd): ''' [abstract] Authenticate with the IP camera. [params] uri: CGI command for start mjpeg stream. user: user-id for camera. passwd: user-password for camera. ''' pass_mgr = rq.HTTPPasswordMgrWithDefaultRealm() pass_mgr.add_password(realm=None, uri=uri, user=user, passwd=passwd) auth_handler = rq.HTTPDigestAuthHandler(pass_mgr) opener = rq.build_opener(auth_handler) rq.install_opener(opener) def SaveBinaryData(data, filename): ''' [Abstract] Save the binary data with the specified file name. [Param] data : binary data. filename : filename. ''' fout = open(filename, 'wb') fout.write(data) fout.close() if __name__ == '__main__': ''' [Abstract] main function. ''' windowInitialized = False count = 0 if not os.path.exists(pathOut): os.mkdir(pathOut) set_digest_auth(url, user_id, user_pw) stream = rq.urlopen(url) bytes = bytes() while True: try: bytes += stream.read(1024) a = bytes.find(b'\xff\xd8') # SOI (Start of Image) 0xFFD8 b = bytes.find(b'\xff\xd9') # EOI (End of Image) 0xFFD9 if a != -1 and b != -1: jpg = bytes[a:b+2] bytes = bytes[b+2:] # Save jpeg file. count += 1 filename = os.path.join(pathOut, 'image_{:06d}.jpg'.format(count)) SaveBinaryData(jpg, filename) # Convert binary data to ndarray type. img_buf = np.frombuffer(jpg, dtype=np.uint8) # Decode ndarray data to OpenCV format image data. frame = cv2.imdecode(img_buf, cv2.IMREAD_UNCHANGED) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cv2.destroyAllWindows()
ここまでのプログラムは、カメラとの接続が30秒以上切断すると接続が復活しませんでした。
OpenCV の read()
関数のタイムアウトは30秒となっているようです。30秒以内に接続が復活していれば自動的に再接続してくれるのですが、30秒を超えると自動的には復活しません。
"connect_with_rtsp_2.py" を元に再接続処理を追加してこの問題を解決してみたいと思います。
ポイント
NOTE
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "connect_with_mjpeg_5.py"]
''' [Abstract] Try connecting to an i-PRO camera with RTSP. RTSP で i-PRO カメラと接続してみる。 [Details] Add reconnection when video is disconnected to "connect_with_mjpeg_2.py". 映像切断時の再接続処理を"connect_with_mjpeg_2.py"へ追加する。 [Library install] cv2: pip install opencv-python ''' from nturl2path import url2pathname import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # windowInitialized = False # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False if __name__ == '__main__': ''' [Abstract] main function. ''' cap = cv2.VideoCapture(url) while True: try: ret, frame = cap.read() if ret == True: # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True else: print("cap.read() return False.") # The timeout period seems to be 30 seconds. # And there seems to be no API to change the timeout value. # Reconnect cap.release() cap = cv2.VideoCapture(url) # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press'[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
ここまでのプログラムは全てOpenCVが作成するウィンドウ表示でした。
ここでは独自の GUI を作成してここに映像表示する例を示します。
GUI 表示の実現方法もいろいろありますが、ここでは Python 標準の tkinter を使用してみます。
tkinter のインストール方法は環境により異なるようです。各人の環境にあった方法をインターネットで調べて実施してください。
ポイント
「RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で既に GUI 版を作成済みなので、プログラム "connect_with_rtsp_6_3.py" をベースに変更箇所のみをわかるように以下で記載します。ほとんど同じ内容で実現できます。
言語 : | Python, | 3.10.4 |
Tcl/Tk, | 8.6 | |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "connect_with_mjpeg_6.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG. MJPEG で i-PRO カメラと接続してみる。 [Details] Display the video with GUI using tkinter. Add menus and buttons to make it look like a GUI app. tkinter を使ったGUIで映像を表示します。 メニューとボタンを追加してGUIアプリらしくします。 BGR → RGB numpy.ndarray → PIL.Image → ImageTk.PhotoImage (1) BGR → RGB (2) numpy.ndarray → PIL.Image (3) PIL.Image → ImageTk.PhotoImage [Library install] cv2: pip install opencv-python PIL : pip install pillow ''' import cv2 import time import tkinter as tk from tkinter import messagebox from PIL import Image, ImageTk, ImageOps import multiprocessing as mp user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "192.168.0.10" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" class Application(tk.Frame): def __init__(self, master = None): super().__init__(master) self.pack() # Window settings. self.master.title("Display i-PRO camera with tkinter") # Window title self.master.geometry("800x600+100+100") # Window size, position # Event registration for window termination. self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window) # Create menu. menubar = tk.Menu(self.master) self.master.configure(menu=menubar) filemenu = tk.Menu(menubar) menubar.add_cascade(label='File', menu=filemenu) filemenu.add_command(label='Quit', command = self.on_closing_window) # Create button_frame self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2) self.button_frame.pack(side = tk.BOTTOM, fill=tk.X) # Create quit_button self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window) self.quit_button.pack(side=tk.RIGHT) # Create canvas. self.canvas = tk.Canvas(self.master) # Add mouse click event to canvas. self.canvas.bind('<Button-1>', self.canvas_click) # Place canvas. self.canvas.pack(expand = True, fill = tk.BOTH) # Create image receiving process and queue self.imageQueue = mp.Queue() self.request = mp.Value('i', 0) # -1 : Exit ReceiveImageProcess. # 0 : Normal. # 1 : Connect camera. # 2 : Release camera. self.p = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.request)) self.p.start() # Raise a video display event (disp_image) after 500m self.disp_id = self.after(500, self.disp_image) def on_closing_window(self): ''' Window closing event. ''' if messagebox.askokcancel("QUIT", "Do you want to quit?"): # Request terminate process self.p. self.request.value = -1 # Waiting for process p to finish time.sleep(1) # Flash buffer. # The program cannot complete p.join() unless the imageQueue is emptied. for i in range(self.imageQueue.qsize()): pil_image = self.imageQueue.get() # Wait for process p to be terminated. self.p.join() self.master.destroy() print("Finish Application.") def canvas_click(self, event): ''' Event handling with mouse clicks on canvas ''' if self.disp_id is None: # Connect camera. self.request.value = 1 # Display image. self.disp_image() else: # Release camera. self.request.value = 2 # Cancel scheduling self.after_cancel(self.disp_id) self.disp_id = None def disp_image(self): ''' Display image on Canvas ''' # If there is data in the imageQueue, the program receives the data and displays the video. num = self.imageQueue.qsize() if num > 0: if (num > 5): num -= 1 for i in range(num): cv_image = self.imageQueue.get() # (2) Convert image from ndarray to PIL.Image. pil_image = Image.fromarray(cv_image) # Get canvas size. canvas_width = self.canvas.winfo_width() canvas_height = self.canvas.winfo_height() # Resize the image to the size of the canvas without changing the aspect ratio. # アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height)) # (3) Convert image from PIL.Image to PhotoImage # PIL.Image から PhotoImage へ変換する self.photo_image = ImageTk.PhotoImage(image=pil_image) # Display image on the canvas. self.canvas.create_image( canvas_width / 2, # Image display position (center of the canvas) canvas_height / 2, image=self.photo_image # image data ) else: pass # Raise a video display event (disp_image) after 1ms. self.disp_id = self.after(1, self.disp_image) def ReceiveImageProcess(imageQueue, request): ''' Receive Image Process. Args: imageQueue [o] This process stores the received image data in the imageQueue. request [i] Shared memory for receiving requests from the main process. -1: Terminate process. 0: Nothing. 1: Connect camera. 2: Release camera connection. Returns: None Raises None ''' # Connect camera. cap = cv2.VideoCapture(url) while True: if cap != None: # Get frame. ret, frame = cap.read() if ret == True: # (1) Convert image from BGR to RGB. cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if imageQueue.qsize() < 10: imageQueue.put(cv_image) else: print("cap.read() return False.") # The timeout period seems to be 30 seconds. # And there seems to be no API to change the timeout value. time.sleep(1) # Reconnect cap.release() cap = cv2.VideoCapture(url) else: time.sleep(0.1) # Check process termination request. if request.value == -1: # Terminate process. cap.release() request.value = 0 break # Check connect request. if request.value == 1: cap = cv2.VideoCapture(url) request.value = 0 # Check release request. if request.value == 2: cap.release() cap = None request.value = 0 print("Terminate SaveImageProcess().") if __name__ == "__main__": root = tk.Tk() app = Application(master = root) app.mainloop()
[動画] tkinter で作成した GUI アプリ
本ページで紹介のソースコードは、下記 github より取得できます。
下記 github のソースコードと本ページの内容は差異がある場合があります。
i-pro-corp/python-examples: Examples for i-PRO cameras. (github.com)
本ページの情報は、特記無い限り下記ライセンスで提供されます。
2023/10/20 | - | IP簡単設定ソフトウェア、IP Setting Software リンク先を更新, | 木下英俊 |
2023/3/1 | - | 説明および表現を一部更新, | 木下英俊 |
2022/7/20 | - | 微修正, | 木下英俊 |
2022/5/26 | - | 新規作成, | 木下英俊 |
i-PRO - Programming Items トップページ