Raspberry Pi 400でスイッチのプレイ動画を保存するための準備(動画編)

最終更新日:2023年3月4日

最近ハマっている(はず)のスプラ3のウデマエ上達のため、地道な反復練習も必要だが、振り返りのためのプレイ動画も必要なのでは。
また、手元で暇そうにしているラズパイを活用できないかという企画。

最終目標はラズパイで動作するHDMI入力動画の録画ソフト/スクリプトとなります。

目次

  1. 全体構想(機材)

  2. 実装方式

  3. サンプルソース(4/2 FPS補正を追加)

  4. 実験結果

1.全体構想

Raspberry Pi 400に対するHDMI入力の情報を保存するようにします。
本体にはじめからついているHDMI端子は出力用のため、別にインターフェースを用意する必要があります。
今回は予算等々の兼ね合いでUSB入力のHDMIキャプチャボードを使うこととしました。
また、プレイしながら裏で録画、を予定しているためHDMIスプリッターも利用することを考えます。

2.実装方式

最近はC++のアプリを開発する体力がないため?、実験しやすいPythonを利用することとします。

概要図

HDMI出力をスプリッターで分岐させモニタ(プレイ用)とRPi400(キャプチャ用)へ入力


使用した機材はスプリッターとUSBキャプチャボードです。


3.サンプルソース

現時点のサンプルソースを記載します。サンプルと言いつつ、他サイト様の内容をかなーり参考にさせていただいております
(引用元はソース内に記載)
ソースは無保証です。少し見るとわかりますが、加筆部分はかなり雑です

4.実験結果

#VideoRecorder.py
"""Pass input directly to output.
#
# 処理内容
#   1.入力映像をそのまま画面へリダイレクトする
#   2.入力映像をmp4ファイルへ保存する
#
# 使い方
#    args = [-vh] [-vo mp4ファイル名]
#
#    oVR = VideoRecorder(args)
#    oVR.start()
#
#    何らかの待ち処理
#
#    oVR.end()
#
# Todo
#   処理落ちによるフレーム遅延対策のテスト版
#
# 引用元 
#   https://qiita.com/kakinaguru_zo/items/eda129635816ad871e9d
#
"""

import cv2
import threading
import multiprocessing
import multiprocessing.sharedctypes
import time
import numpy
from datetime import datetime
import argparse

import numpy  # Make sure NumPy is loaded before it is used in the callback
assert numpy  # avoid "imported but unused" message (W0611)


def int_or_str(text):
    """Helper function for argument parsing."""
    try:
        return int(text)
    except ValueError:
        return text


"""
 動画保存クラス
"""
class VideoRecorder(threading.Thread):
    mArgs = ""       #引数指定用
    mLoopFlag = True #スレッド継続フラグ

    #カメラループ制御のため共有化
    mLoopFlagCamera = multiprocessing.Value('i', True) #カメラ継続フラグ

    #録画制御のため共有化
    mRecordFlag = multiprocessing.Value('i', False) #録画制御フラグ

    mWidth      = 640
    mHeight     = 480
    mFramerate  = 30
    mBuffersize = 4 #適時補正する値

    #保存サイズ(SWITCHに合わせている)
    mWidth2     = 720
    mHeight2    = 404
    #mHeight2    = 480

    mConvert=1
    mDebugPrint=0

    #コンストラクタ
    def __init__(self, args):
        #print(cv2.getBuildInformation())
        super(VideoRecorder, self).__init__()
        self.mArgs = args
        if self.mArgs.video_framerate != None:
            self.mFramerate = self.mArgs.video_framerate
        if self.mArgs.video_buffersize != None:
            self.mBuffersize = self.mArgs.video_buffersize
        if self.mArgs.video_convert != None:
            self.mConvert = self.mArgs.video_convert

    #デストラクタ
    def __del__(self):
        self.recordstop()
        self.mLoopFlag = False

    def end(self):
        self.mLoopFlagCamera.value = False
        self.mLoopFlag = False
        time.sleep(3)

    def recordstart(self):
        self.mRecordFlag.value = True

    def recordstop(self):
        self.mRecordFlag.value = False

    #別プロセスからのコールのためcv2関連オブジェクトはこの中で完結させる
    def cameracapture(self, outbuf, outbuf_ready, LoopFlagCamera, video_outputfile, RecordFlag):
        print("cameracapture start")

        try:
            #前回終了時に変な状態のことがあるので一旦開放してから再度確保させる
            if self.mConvert == 0:
                 #カメラデバイスの ID。 カメラが1台しか接続されていない場合は、0を指定します。
                capture = cv2.VideoCapture(0, cv2.CAP_V4L2)
                capture.release()
                capture = cv2.VideoCapture(0, cv2.CAP_V4L2)
            else:
                capture = cv2.VideoCapture(0)
                capture.release()
                capture = cv2.VideoCapture(0)
        except TypeError:
            capture = cv2.VideoCapture(0)

        if capture.isOpened() is False:
            raise IOError

        #無圧縮キャプチャ時のみ実施
        if self.mConvert == 0:
            if isinstance(capture.get(cv2.CAP_PROP_CONVERT_RGB), float):
                capture.set(cv2.CAP_PROP_CONVERT_RGB, 0.0)
            else:
                capture.set(cv2.CAP_PROP_CONVERT_RGB, False)

        capture.set(cv2.CAP_PROP_BUFFERSIZE, self.mBuffersize)
        if self.mConvert == 0:
            capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'YUYV'))
        else:
            #pass
            capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'mp4v'))
            #capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))

        #性能優先のため、キャプチャは低解像度
        capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.mWidth)
        capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.mHeight)
        capture.set(cv2.CAP_PROP_FPS, self.mFramerate)

        #別プロセス制御のため個別にファイル出力をする
        if video_outputfile != None:
            #mp4保存
            #date = datetime.now().strftime("_%Y%m%d_%H%M%S")
            #path = self.mArgs.video_outputfile + date + ".mp4"
            path = video_outputfile

            #出力ファイル設定
            if self.mConvert == 0:
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                #fourcc = cv2.VideoWriter_fourcc(*'H264') NG
            else:
                #fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                #fourcc = cv2.VideoWriter_fourcc(*'H264')
                fourcc = cv2.VideoWriter_fourcc(*'mjpg')
            mFout = cv2.VideoWriter(path, fourcc, self.mFramerate, (self.mWidth2, self.mHeight2))

        print("Target FPS = ", self.mFramerate)

        fps_check_start_time = time.time()
        frames = 0
        frames_offset = 0
        while(LoopFlagCamera.value):
            try:
                capture_start_time = time.time()
                ret, frame = capture.read()

                elapsed_time = time.time() - fps_check_start_time
                frames = frames + 1
                if frames_offset == -1:
                    frames_offset = int(self.mFramerate * elapsed_time - frames)

                #print("current frames = ", frames,
                # ", logical frames = ", int(self.mFramerate * elapsed_time),
                # ", frames offset = ", frames_offset,
                # ", frames diff = ", int(self.mFramerate * elapsed_time - frames))

                #フレームが指定より低い場合は処理せずループ先頭へ
                #このときフレームを1つ進める
                if frames + frames_offset < int(self.mFramerate * elapsed_time):
                    frames = frames + 1
                    print("SKIPPED")
                    continue
                else:
                    pass

                if ret is False:
                    raise IOError
                if self.mDebugPrint == 1:
                    #print("Capture FPS = ", 1.0 / (time.time() - capture_start_time))
                    pass
                if self.mConvert == 0:
                    bgr_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_YUYV)
                else:
                    bgr_frame = frame

                # for SWITCH SIZE
                bgr_frame = cv2.resize(bgr_frame, dsize=(self.mWidth2, self.mHeight2))

                # for DEBUG
                #cv2.imshow('frame2', bgr_frame)
                #cv2.waitKey(1)

                if video_outputfile != None:
                    if RecordFlag.value == True:
                        mFout.write(bgr_frame)

                outbuf_ready.clear()

                #画面表示しない場合はメモリ変換は不要
                if self.mArgs.video_hidden != True:
                    memoryview(outbuf).cast('B')[:] = memoryview(bgr_frame).cast('B')[:]

                outbuf_ready.set()
                if self.mDebugPrint == 1:
                    print("Capture+Conversion+Copy FPS = ", 1.0 / (time.time() - capture_start_time))

            except KeyboardInterrupt:
                # 終わるときは CTRL + C を押す
                break

        if video_outputfile != None:
            mFout.release()
        capture.release()
        print("cameracapture end")

    #メインスレッド
    def run(self):
        print("framerate=", self.mFramerate,
              ", width=", self.mWidth,
              ", height=", self.mHeight,
              ", Buffersize=", self.mBuffersize)

        #buf1 = multiprocessing.sharedctypes.RawArray('B', HEIGHT*WIDTH*3)
        outbuf = multiprocessing.sharedctypes.RawArray('B', self.mHeight2 * self.mWidth2 * 3)
        outbuf_ready = multiprocessing.Event()
        outbuf_ready.clear()

        p1 = multiprocessing.Process(
            target=self.cameracapture,
            args=(outbuf, outbuf_ready, self.mLoopFlagCamera, self.mArgs.video_outputfile, self.mRecordFlag),
            daemon=True)
        p1.start()
        #mCaptured_bgr_image = numpy.empty((HEIGHT, WIDTH, 3), dtype=numpy.uint8)
        self.mCaptured_bgr_image = numpy.empty((self.mHeight2, self.mWidth2, 3), dtype=numpy.uint8)

        while self.mLoopFlag:
            try:
                display_start_time = time.time()
                outbuf_ready.wait()
                self.mCaptured_bgr_image[:,:,:] = numpy.reshape(outbuf, (self.mHeight2, self.mWidth2, 3))
                outbuf_ready.clear()
                if self.mArgs.video_hidden != True:
                    cv2.imshow('frame', self.mCaptured_bgr_image)
                    cv2.waitKey(1)
                if self.mDebugPrint == 1:
                    print("Display FPS = ", 1.0 / (time.time() - display_start_time))
            except KeyboardInterrupt:
                # 終わるときは CTRL + C を押す
                print("Waiting camera reader to finish.")
                p1.join(10)
                break
        if self.mArgs.video_hidden != True:
            cv2.destroyAllWindows()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(add_help=False)
    args, remaining = parser.parse_known_args()
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        parents=[parser])
    parser.add_argument('-vf', '--video-framerate', type=float, help='Video frame rate')
    parser.add_argument('-vb', '--video-buffersize', type=int, help='Video buffersize')
    parser.add_argument('-vc', '--video-convert', type=int, help='Video NON-Convert')
    parser.add_argument('-vh', '--video-hidden', action='store_true', help='hidden mode')
    parser.add_argument('-vo', '--video-outputfile', help='output Video file')
    args = parser.parse_args(remaining)

    oVR = VideoRecorder(args)
    oVR.start()

    input()
    oVR.recordstart()

    input()
    oVR.end()

キャプチャしたサンプルです。


この記事が気に入ったらサポートをしてみませんか?