本ページでは、i-PRO カメラとPCを MJPEG により接続してPC画面へ表示するプログラムを Python で作成する例を紹介します。とても短いプログラムで i-PRO カメラの映像を見ることができます。動作確認は i-PRO mini (WV-S7130)、モジュールカメラ(AIスターターキット)を使って行いましたが、ほとんどの i-PRO カメラでそのまま利用できるはずです。ぜひお試しください。
[動画] MJPEG でカメラと接続して映像表示した様子
"i-PRO mini" 紹介:
"モジュールカメラ" 紹介:
MJPEG で接続するための表記を以下に記載します。
「ネットワークカメラCGIコマンドインターフェース仕様書 統合版」[1] で下記に記載されている情報を元に加筆しています。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
言語 : | Python, | 3.8.10 |
OS : | Ubuntu(WSL), | 20.04 |
まずは簡単な方法から。RTSP のコードとほとんど同じ内容で実現できました。
プログラムを終了する方法を実装していません。コンソール上で [ctrl]+[c] して終了してください。
[プログラムソース "connect_with_mjpeg_1_1.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's try first. (Method 1) まずはやってみる (方式1) [library install] cv2: pip install opencv-python ''' import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" cap = cv2.VideoCapture(url) while True: try: ret, frame = cap.read() if ret == True: # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) cv2.waitKey(1) # necessary to display the video by imshow () except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
Windows ではこんな感じで実行します。
$ python connect_with_mjpeg_1.py
Linux はこんな感じで実行します。
$ python3 connect_with_mjpeg_1.py
Windows環境で複数の Python バージョンをインストールしている場合、下図のような感じで実行バージョンを指定することもできます。
こちらはバージョン 3.10 の Python で実行する例です。
$ py -3.10 connect_with_mjpeg_1.py
[注意] 上記でも記載しましたが、カメラ側の設定でストリーム(1)~(4)を Off にすることで滑らかな映像表示を実現できました。On のままでもプログラム自体は動作しますが、5fps 程度の映像となりました。
[動画] MJPEG でカメラと接続して映像表示した様子 (30fps) (注意:ストリーム1~4 を全て Off に設定しています)
下記方法でも MJPEG で接続して映像表示できます。記事[2]を参考に作成してみました。
[プログラムソース "connect_with_mjpeg_1_2.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's try first. (Method 2) まずはやってみる(方式2) [library install] cv2: pip install opencv-python ''' import cv2 import numpy as np import urllib.request as rq user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" def set_digest_auth(uri, user, passwd): ''' [abstract] Authenticate with the IP camera. [params] uri: CGI command for start mjpeg stream. user: user-id for camera. passwd: user-password for camera. ''' pass_mgr = rq.HTTPPasswordMgrWithDefaultRealm() pass_mgr.add_password(realm=None, uri=uri, user=user, passwd=passwd) auth_handler = rq.HTTPDigestAuthHandler(pass_mgr) opener = rq.build_opener(auth_handler) rq.install_opener(opener) set_digest_auth(url, user_id, user_pw) stream = rq.urlopen(url) bytes = bytes() while True: try: bytes += stream.read(1024) a = bytes.find(b'\xff\xd8') # SOI (Start of Image) 0xFFD8 b = bytes.find(b'\xff\xd9') # EOI (End of Image) 0xFFD9 if a != -1 and b != -1: jpg = bytes[a:b+2] bytes = bytes[b+2:] # Convert binary data to ndarray type. img_buf = np.frombuffer(jpg, dtype=np.uint8) # Decode ndarray data to OpenCV format image data. frame = cv2.imdecode(img_buf, cv2.IMREAD_UNCHANGED) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) cv2.waitKey(1) # necessary to display the video by imshow () except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cv2.destroyAllWindows()
⇨ 指定する場所にウィンドウを表示するようにします。
⇨ "q" キー押下でプログラムを終了できるようにします。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
言語 : | Python, | 3.8.10 |
OS : | Ubuntu(WSL), | 20.04 |
[プログラムソース "connect_with_mjpeg_2.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's improve the three issues of "connect_with_mjpeg_1_1.py". "connect_with_mjpeg_1_1.py" で確認した下記3つの課題を改善してみます。 [Issues 1] Specifies the position where the window is displayed. ウィンドウを指定する場所に表示するようにします。 [Issues 2] Modify the program so that you can exit the program by clicking the [x] button. ウィンドウ右上の[x]ボタンでプログラムを終了できるようにします。 [Issues 3] Modify the program so that you can exit the program by pressing the [q] key. "q" キー押下でプログラムを終了できるようにします。 [Library install] cv2: pip install opencv-python ''' import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" cap = cv2.VideoCapture(url) # windowInitialized = False # Exception definition BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False while True: try: ret, frame = cap.read() if ret == True: # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
MJPEG の実装でも OpenCV による顔検知を実装してみます。
MJPEG 接続では映像情報は受け身です。このため高解像度、高フレームレートの映像を処理したとき、OpenCV の処理が追いつくかが心配な部分です。
下記 URL からファイル "haarcascade_frontalface_alt2.xml" を入手してプログラムと同じ場所に保存する必要があります。
xml ファイル取得方法は こちら を参照ください
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
言語 : | Python, | 3.8.10 |
OS : | Ubuntu(WSL), | 20.04 |
映像を受信するたびの OpenCV で毎回認識処理を行ってみます。
[プログラムソース "connect_with_mjpeg_3_1.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's add face detection using OpenCV. OpenCV を使って顔検知を追加してみます [Library install] cv2: pip install opencv-python [OpenCV] Get the file "haarcascade_frontalface_alt2.xml" from the URL below. 下記URLからファイル "haarcascade_frontalface_alt2.xml" を入手するしてください。 https://github.com/opencv/opencv/tree/master/data/haarcascades ''' import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def DetectFaces(cascade, image): ''' [Abstract] Detect faces and return recognition result. 顔検知して認識結果を返す [Param] cascade : CascadeClassifier object in OpenCV format. image : Image in OpenCV format. [Return] Detection result. ''' # Convert to grayscale image for face detection. img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Detect faces from image. face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100)) # Return result. return face_list def DrawFaceRectangles(image, face_list): ''' [Abstract] Draw red frames on the image according to the detection result face_list. 検出結果 face_list にしたがって、image 上に赤枠を描画する。 [Param] image : Image in OpenCV format. face_list : List of detected face frames. [Return] None ''' # Draw red frames for the number of detected faces. if len(face_list) != 0: for (pos_x, pos_y, w, h) in face_list: print(f"pos_x = {pos_x}, pos_y = {pos_y}, w = {w}, h = {h}") cv2.rectangle(image, (pos_x, pos_y), (pos_x + w, pos_y + h), (0,0,255), thickness=5) ''' [Abstract] main 関数 ''' if __name__ == '__main__': cap = cv2.VideoCapture(url) # windowInitialized = False # haarcascade file for opencv cascade classification. cascade_file = "haarcascade_frontalface_alt2.xml" # face #cascade_file = "haarcascade_eye.xml" # eye ? #cascade_file = "haarcascade_eye_tree_eyeglasses.xml" # eye ? cascade = cv2.CascadeClassifier(cascade_file) while True: try: ret, frame = cap.read() if ret == True: # Face detection. face_list = DetectFaces(cascade, frame) # Draw face rectangles. DrawFaceRectangles(frame, face_list) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
やはり顔検知処理は PC にとって結構重たい処理のようです。
別タスクというと一般的なプログラムでは "スレッド" というテクニックを使いますが、どうやら CPython と呼ばれるプラットフォームの場合はスレッドは複数の処理を同時に実行してくれないらしいです。そこで、ここでは別プロセスを起動し、キューと呼ばれるIOで情報をやり取りしてみます。
[プログラムソース "connect_with_mjpeg_3_2.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Let's add face detection using OpenCV. Improve performace of "connect_with_mjpeg_3_1.py" by creating a face detection process. OpenCV を使って顔検知を追加してみます "connect_with_mjpeg_3_1.py" で確認したパフォーマンス問題を、顔検知処理を別プロセスにすることで改善してみます。 [Library install] cv2: pip install opencv-python [OpenCV] Get the file "haarcascade_frontalface_alt2.xml" from the URL below. 下記URLからファイル "haarcascade_frontalface_alt2.xml" を入手するしてください。 https://github.com/opencv/opencv/tree/master/data/haarcascades ''' import cv2 import multiprocessing as mp from queue import Empty user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 30 # Frame rate # URL url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # haarcascade file for opencv cascade classification. cascade_file = "haarcascade_frontalface_alt2.xml" # face #cascade_file = "haarcascade_eye.xml" # eye ? #cascade_file = "haarcascade_eye_tree_eyeglasses.xml" # eye ? cascade = cv2.CascadeClassifier(cascade_file) # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def DetectFaces(cascade, image): ''' [Abstract] Detect faces and return recognition result. 顔検知して認識結果を返す [Param] cascade : CascadeClassifier object in OpenCV format. image : Image in OpenCV format. [Return] Detection result. ''' # Convert to grayscale image for face detection. img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Detect faces from image. face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100)) # Return result. return face_list def DetectFacesProcess(q1, q2): ''' [Abstract] Face detection process. [Param] q1 : [i] Queue to save the image to detect the face. q2 : [o] Queue to save the result of face detection. [Return] 無し ''' while True: try: image = q1.get(True, 10) # Termination check: If type(image) is "int" and the value is -1, it ends. if type(image) == int: if image == -1: break # Do face detection. face_list = DetectFaces(cascade, image) q2.put(face_list) except Empty: # timeout of q1.get() print("Timeout happen.(3)") print("Finish DetectFacesProcess()") def DrawFaceRectangles(image, face_list): ''' [Abstract] Draw red frames on the image according to the detection result face_list. 検出結果 face_list にしたがって、image 上に赤枠を描画する。 [Param] image : Image in OpenCV format. face_list : List of detected face frames. [Return] None ''' # Draw red frames for the number of detected faces. if len(face_list) != 0: for (pos_x, pos_y, w, h) in face_list: print(f"pos_x = {pos_x}, pos_y = {pos_y}, w = {w}, h = {h}") cv2.rectangle(image, (pos_x, pos_y), (pos_x + w, pos_y + h), (0,0,255), thickness=5) if __name__ == '__main__': ''' [Abstract] main function ''' cap = cv2.VideoCapture(url) # windowInitialized = False q1 = mp.Queue() q2 = mp.Queue() p = mp.Process(target=DetectFacesProcess, args=(q1, q2)) p.start() init = False while True: try: ret, frame = cap.read() if ret == True: # Pass the image to the face detection process. if (q1.qsize() <= 1) and (q2.qsize() <= 1): q1.put(frame) # Receive results from face detection processing. if q2.qsize() != 0: face_list = q2.get() init = True if init == True: # Draw face rectangles. DrawFaceRectangles(frame, face_list) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break # Terminate process p q1.put(-1) # Waiting for process p to finish p.join() print("Finish main()") cap.release() cv2.destroyAllWindows()
プロセス起動の引数として cascade を一緒に渡したかったのですが、"cannot pickle object"
というエラーを発生して実現できませんでした。残念ながら cascade をグローバル変数へ変更することで問題を回避しています。
[動画] OpenCV で顔検知してみた様子 (30fps) (注意:ストリーム1~4 を全て Off に設定しています)
受信した画像を 1 から始まる連番のファイル名 (image_NNNNNN.jpg) で JPEG ファイルとして保存してみます。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "connect_with_mjpeg_4.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG(Motion JPEG). MJPEG(Motion JPEG) で i-PRO カメラと接続します [Details] Save the received JPEG image as a file. Add a 6-digit number to the end of the file name and save it as a serial number. 受信した JPEG 画像をファイル保存します。 ファイル名の末尾に6ケタの番号を付けて連番で保存します。 [Library install] cv2: pip install opencv-python ''' import cv2 import numpy as np import os import urllib.request as rq user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 5 # Frame rate pathOut = 'image' # Image file save folder name # URL url = f"http://{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # Exception definition BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False def set_digest_auth(uri, user, passwd): ''' [abstract] Authenticate with the IP camera. [params] uri: CGI command for start mjpeg stream. user: user-id for camera. passwd: user-password for camera. ''' pass_mgr = rq.HTTPPasswordMgrWithDefaultRealm() pass_mgr.add_password(realm=None, uri=uri, user=user, passwd=passwd) auth_handler = rq.HTTPDigestAuthHandler(pass_mgr) opener = rq.build_opener(auth_handler) rq.install_opener(opener) def SaveBinaryData(data, filename): ''' [Abstract] Save the binary data with the specified file name. [Param] data : binary data. filename : filename. ''' fout = open(filename, 'wb') fout.write(data) fout.close() if __name__ == '__main__': ''' [Abstract] main function. ''' windowInitialized = False count = 0 if not os.path.exists(pathOut): os.mkdir(pathOut) set_digest_auth(url, user_id, user_pw) stream = rq.urlopen(url) bytes = bytes() while True: try: bytes += stream.read(1024) a = bytes.find(b'\xff\xd8') # SOI (Start of Image) 0xFFD8 b = bytes.find(b'\xff\xd9') # EOI (End of Image) 0xFFD9 if a != -1 and b != -1: jpg = bytes[a:b+2] bytes = bytes[b+2:] # Save jpeg file. count += 1 filename = os.path.join(pathOut, 'image_{:06d}.jpg'.format(count)) SaveBinaryData(jpg, filename) # Convert binary data to ndarray type. img_buf = np.frombuffer(jpg, dtype=np.uint8) # Decode ndarray data to OpenCV format image data. frame = cv2.imdecode(img_buf, cv2.IMREAD_UNCHANGED) # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press '[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cv2.destroyAllWindows()
OpenCV の read()
"connect_with_rtsp_2.py" を元に再接続処理を追加してこの問題を解決してみたいと思います。
言語 : | Python, | 3.10.4 |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "connect_with_mjpeg_5.py"]
''' [Abstract] Try connecting to an i-PRO camera with RTSP. RTSP で i-PRO カメラと接続してみる。 [Details] Add reconnection when video is disconnected to "connect_with_mjpeg_2.py". 映像切断時の再接続処理を"connect_with_mjpeg_2.py"へ追加する。 [Library install] cv2: pip install opencv-python ''' from nturl2path import url2pathname import cv2 user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" # windowInitialized = False # Exception definition. BackendError = type('BackendError', (Exception,), {}) def IsWindowVisible(winname): ''' [Abstract] Check if the target window exists. 対象ウィンドウが存在するかを確認する。 [Param] winname : Window title [Return] True : exist 存在する False : not exist 存在しない [Exception] BackendError : ''' try: ret = cv2.getWindowProperty(winname, cv2.WND_PROP_VISIBLE) if ret == -1: raise BackendError('Use Qt as backend to check whether window is visible or not.') return bool(ret) except cv2.error: return False if __name__ == '__main__': ''' [Abstract] main function. ''' cap = cv2.VideoCapture(url) while True: try: ret, frame = cap.read() if ret == True: # Please modify the value to fit your PC screen size. frame2 = cv2.resize(frame, (1280, 720)) # Display video. cv2.imshow(winname, frame2) if windowInitialized==False: # Specify window position only once at startup. cv2.moveWindow(winname, 100, 100) windowInitialized = True else: print("cap.read() return False.") # The timeout period seems to be 30 seconds. # And there seems to be no API to change the timeout value. # Reconnect cap.release() cap = cv2.VideoCapture(url) # Press the "q" key to finish. k = cv2.waitKey(1) & 0xff # necessary to display the video by imshow () if k == ord("q"): break # Exit the program if there is no specified window. if not IsWindowVisible(winname): break except KeyboardInterrupt: # Press'[ctrl] + [c]' on the console to exit the program. print("KeyboardInterrupt") break cap.release() cv2.destroyAllWindows()
ここでは独自の GUI を作成してここに映像表示する例を示します。
GUI 表示の実現方法もいろいろありますが、ここでは Python 標準の tkinter を使用してみます。
tkinter のインストール方法は環境により異なるようです。各人の環境にあった方法をインターネットで調べて実施してください。
「RTSP で画像を取得する : 7-3. メニュー・ボタンを追加して GUI アプリらしくしてみる」で既に GUI 版を作成済みなので、プログラム "connect_with_rtsp_6_3.py" をベースに変更箇所のみをわかるように以下で記載します。ほとんど同じ内容で実現できます。
言語 : | Python, | 3.10.4 |
Tcl/Tk, | 8.6 | |
OS : | Windows 11 home, | 21H2 |
Windows 10 Pro, | 21H1 | |
[プログラムソース "connect_with_mjpeg_6.py"]
''' [Abstract] Try connecting to an i-PRO camera with MJPEG. MJPEG で i-PRO カメラと接続してみる。 [Details] Display the video with GUI using tkinter. Add menus and buttons to make it look like a GUI app. tkinter を使ったGUIで映像を表示します。 メニューとボタンを追加してGUIアプリらしくします。 BGR → RGB numpy.ndarray → PIL.Image → ImageTk.PhotoImage (1) BGR → RGB (2) numpy.ndarray → PIL.Image (3) PIL.Image → ImageTk.PhotoImage [Library install] cv2: pip install opencv-python PIL : pip install pillow ''' import cv2 import time import tkinter as tk from tkinter import messagebox from PIL import Image, ImageTk, ImageOps import multiprocessing as mp user_id = "user-id" # Change to match your camera setting user_pw = "password" # Change to match your camera setting host = "" # Change to match your camera setting winname = "VIDEO" # Window title resolution = "1920x1080" # Resolution framerate = 15 # Frame rate url = f"http://{user_id}:{user_pw}@{host}/cgi-bin/nphMotionJpeg?Resolution={resolution}&Quality=Standard&Framerate={framerate}" class Application(tk.Frame): def __init__(self, master = None): super().__init__(master) self.pack() # Window settings. self.master.title("Display i-PRO camera with tkinter") # Window title self.master.geometry("800x600+100+100") # Window size, position # Event registration for window termination. self.master.protocol("WM_DELETE_WINDOW", self.on_closing_window) # Create menu. menubar = tk.Menu(self.master) self.master.configure(menu=menubar) filemenu = tk.Menu(menubar) menubar.add_cascade(label='File', menu=filemenu) filemenu.add_command(label='Quit', command = self.on_closing_window) # Create button_frame self.button_frame = tk.Frame(self.master, padx=10, pady=10, relief=tk.RAISED, bd=2) self.button_frame.pack(side = tk.BOTTOM, fill=tk.X) # Create quit_button self.quit_button = tk.Button(self.button_frame, text='Quit', width=10, command = self.on_closing_window) self.quit_button.pack(side=tk.RIGHT) # Create canvas. self.canvas = tk.Canvas(self.master) # Add mouse click event to canvas. self.canvas.bind('<Button-1>', self.canvas_click) # Place canvas. self.canvas.pack(expand = True, fill = tk.BOTH) # Create image receiving process and queue self.imageQueue = mp.Queue() self.request = mp.Value('i', 0) # -1 : Exit ReceiveImageProcess. # 0 : Normal. # 1 : Connect camera. # 2 : Release camera. self.p = mp.Process(target=ReceiveImageProcess, args=(self.imageQueue, self.request)) self.p.start() # Raise a video display event (disp_image) after 500m self.disp_id = self.after(500, self.disp_image) def on_closing_window(self): ''' Window closing event. ''' if messagebox.askokcancel("QUIT", "Do you want to quit?"): # Request terminate process self.p. self.request.value = -1 # Waiting for process p to finish time.sleep(1) # Flash buffer. # The program cannot complete p.join() unless the imageQueue is emptied. for i in range(self.imageQueue.qsize()): pil_image = self.imageQueue.get() # Wait for process p to be terminated. self.p.join() self.master.destroy() print("Finish Application.") def canvas_click(self, event): ''' Event handling with mouse clicks on canvas ''' if self.disp_id is None: # Connect camera. self.request.value = 1 # Display image. self.disp_image() else: # Release camera. self.request.value = 2 # Cancel scheduling self.after_cancel(self.disp_id) self.disp_id = None def disp_image(self): ''' Display image on Canvas ''' # If there is data in the imageQueue, the program receives the data and displays the video. num = self.imageQueue.qsize() if num > 0: if (num > 5): num -= 1 for i in range(num): cv_image = self.imageQueue.get() # (2) Convert image from ndarray to PIL.Image. pil_image = Image.fromarray(cv_image) # Get canvas size. canvas_width = self.canvas.winfo_width() canvas_height = self.canvas.winfo_height() # Resize the image to the size of the canvas without changing the aspect ratio. # アスペクトを維持したまま画像を Canvas と同じサイズにリサイズ pil_image = ImageOps.pad(pil_image, (canvas_width, canvas_height)) # (3) Convert image from PIL.Image to PhotoImage # PIL.Image から PhotoImage へ変換する self.photo_image = ImageTk.PhotoImage(image=pil_image) # Display image on the canvas. self.canvas.create_image( canvas_width / 2, # Image display position (center of the canvas) canvas_height / 2, image=self.photo_image # image data ) else: pass # Raise a video display event (disp_image) after 1ms. self.disp_id = self.after(1, self.disp_image) def ReceiveImageProcess(imageQueue, request): ''' Receive Image Process. Args: imageQueue [o] This process stores the received image data in the imageQueue. request [i] Shared memory for receiving requests from the main process. -1: Terminate process. 0: Nothing. 1: Connect camera. 2: Release camera connection. Returns: None Raises None ''' # Connect camera. cap = cv2.VideoCapture(url) while True: if cap != None: # Get frame. ret, frame = cap.read() if ret == True: # (1) Convert image from BGR to RGB. cv_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if imageQueue.qsize() < 10: imageQueue.put(cv_image) else: print("cap.read() return False.") # The timeout period seems to be 30 seconds. # And there seems to be no API to change the timeout value. time.sleep(1) # Reconnect cap.release() cap = cv2.VideoCapture(url) else: time.sleep(0.1) # Check process termination request. if request.value == -1: # Terminate process. cap.release() request.value = 0 break # Check connect request. if request.value == 1: cap = cv2.VideoCapture(url) request.value = 0 # Check release request. if request.value == 2: cap.release() cap = None request.value = 0 print("Terminate SaveImageProcess().") if __name__ == "__main__": root = tk.Tk() app = Application(master = root) app.mainloop()
[動画] tkinter で作成した GUI アプリ
本ページで紹介のソースコードは、下記 github より取得できます。
下記 github のソースコードと本ページの内容は差異がある場合があります。
i-pro-corp/python-examples: Examples for i-PRO cameras. (github.com)
