Raspberry Pi 400でスイッチのプレイ動画を保存するための準備(音声編)

最終更新日:2023年2月13日

最近ハマっている(はず)のスプラ3のウデマエ上達のため、地道な反復練習も必要だが、振り返りのためのプレイ動画も必要なのでは。
また、手元で暇そうにしているラズパイを活用できないか、あと動画だけだと味気ないので音声も付けたいという企画。

最終目標はラズパイで動作するHDMI入力動画の録画ソフト/スクリプトとなります。

目次

  1. 全体構想(機材)

  2. 実装方式

  3. サンプルソース

  4. 実験結果

1.全体構想

Raspberry Pi 400に対するHDMI入力の情報を保存するようにします。
本体にはじめからついているHDMI端子は出力用のため、別にインターフェースを用意する必要があります。
今回は予算等々の兼ね合いでUSB入力のHDMIキャプチャボードを使うこととしました。

2.実装方式

最近はC++のアプリを開発する体力がないため?、実験しやすいPythonを利用することとします。

構成・機材はHDMIスプリッターとUSBキャプチャボードで、動画編と同じです。

3.サンプルソース

現時点のサンプルソースを記載します。サンプルと言いつつ、他サイト様の内容をかなーり参考にさせていただいております
(引用元はソース内に記載)
ソースは無保証です。少し見るとわかりますが、加筆部分はかなり雑です
あと、まともに動作しないこともあります

#SoundRecorder.py
"""Pass input directly to output.
#
# 処理内容
#   1.入力音源をそのまま出力音源へリダイレクトする
#   2.入力音源をWAVファイルへ保存する
#
# 使い方
#    args = [-ws] [-wv] [-wo Wavファイル名]
#
#    oSR = SoundRecorder(args)
#    oSR.start()
#
#    何らかの待ち処理
#
#    oSR.end()
#
# 引用元 
#   https://python-sounddevice.readthedocs.io/en/0.4.5/examples.html#input-to-output-pass-through
#
"""
import argparse
import time
import sounddevice as sd
import threading
import numpy as np # Make sure NumPy is loaded before it is used in the callback
#assert numpy # avoid "imported but unused" message (W0611)

import soundfile as sf
import resampy
import sys
import wave


def int_or_str(text):
    """Helper function for argument parsing."""
    try:
        return int(text)
    except ValueError:
        return text


#sd.default.device = [1, 10] # Input, Outputデバイス指定

class SoundRecorder(threading.Thread):
    mArgs = ""       #引数指定用
    mLoopFlag = True #スレッド継続フラグ
    mVolumeratio = 1.0
    mRecordFlag = False  #録音制御フラグ 
    #コンストラクタ
    def __init__(self, args):
        super(SoundRecorder, self).__init__()
        self.mArgs = args
        if self.mArgs.wave_volume != None:
            self.mVolumeratio = self.mArgs.wave_volume

    #デストラクタ
    def __del__(self):
        self.recordstop()
        self.mLoopFlag = False

    #外部からの処理終了指示
    def end(self):
        self.mLoopFlag = False

    def callback(self, indata, outdata, frames, time, status):
        if status:
            print(status)
        if self.mArgs.wave_silentmode != 1:
            outdata[:] = self.filter(indata)
        t1 = threading.Thread(target=self.waveout, args=(indata,))
        t1.start()
        t1.join()

    #音声加工処理
    def filter(self, indata):
        outdata = indata
        return outdata

    def recordstart(self):
        self.mRecordFlag = True

    def recordstop(self):
        self.mRecordFlag = False

    def waveopen(self):
        if self.mArgs.wave_outputfile != None:
            self.wb = wave.open(self.mArgs.wave_outputfile, mode='wb')
            self.wb.setnchannels(2) #初期値は2にしておく
            if self.mArgs.channels != None:
                self.wb.setnchannels(self.mArgs.channels)
            self.wb.setsampwidth(2)  # 16bit=2byte
            self.wb.setframerate(44100) #初期値は44100にしておく
            if self.mArgs.wave_samplerate != None:
                self.wb.setframerate(self.mArgs.wave_samplerate)

    def waveout(self, buf):
        if self.mArgs.wave_outputfile == None:
            return

        if self.mRecordFlag == True:
            # リサンプリング
            #soundbuf = resampy.resample(buf, self.mArgs.wave_samplerate, self.mArgs.wave_samplerate)
            soundbuf = buf

            # 音量正規化
            # これにより微小なノイズが猛烈に拡大されるときがある
            #if  soundbuf.max() != 0:
            #    soundbuf = soundbuf / soundbuf.max() * np.iinfo(np.int16).max

            # 音量補正
            soundbuf = soundbuf * (np.iinfo(np.int16).max - 1) * self.mVolumeratio

            #print("max=", soundbuf.max())
            # float -> int
            soundbuf = soundbuf.astype(np.int16)
            self.wb.writeframes(soundbuf.tobytes())

    def waveclose(self):
        if self.mArgs.wave_outputfile != None:
            self.wb.close()

    #メインスレッド
    def run(self):
        self.event = threading.Event()

        print(sd.query_devices())
        print("default.device=",       sd.default.device,
              ", default.dtype=",      sd.default.dtype,
              ", default.channels=",   sd.default.channels,
              ", default.samplerate=", sd.default.samplerate)
        print("inputdev=", self.mArgs.input_device,
              ", outputdev=", self.mArgs.output_device,
              ", channels=", self.mArgs.channels,
              ", wavesamplerate=", self.mArgs.wave_samplerate,
              ", wavevolumeratio=", self.mVolumeratio,
              ", waveoutputfile=", self.mArgs.wave_outputfile)

        self.waveopen()
        try:
            with sd.Stream(device=(self.mArgs.input_device, self.mArgs.output_device),
                   samplerate=self.mArgs.wave_samplerate, blocksize=self.mArgs.wave_blocksize,
                   dtype=self.mArgs.wave_dtype, latency=self.mArgs.wave_latency,
                   channels=self.mArgs.channels, callback=self.callback,
                   finished_callback=self.event.set):                 
                print('#' * 80)
                while self.mLoopFlag:
                    time.sleep(0.01)
        except KeyboardInterrupt:
            parser.exit('')
        except Exception as e:
            parser.exit(type(e).__name__ + ': ' + str(e))
        self.waveclose()
        print("Thread End")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument(
        '-l', '--list-devices', action='store_true',
        help='show list of audio devices and exit')
    args, remaining = parser.parse_known_args()
    if args.list_devices:
        print(sd.query_devices())
        parser.exit(0)
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        parents=[parser])
    parser.add_argument('-i', '--input-device', type=int_or_str, help='input device (numeric ID or substring)')
    parser.add_argument('-o', '--output-device', type=int_or_str, help='output device (numeric ID or substring)')
    parser.add_argument('-c', '--channels', type=int, default=2, help='number of channels')
    parser.add_argument('-wd', '--wave-dtype', help='audio data type')
    parser.add_argument('-wr', '--wave-samplerate', type=float, help='sampling rate')
    parser.add_argument('-wb', '--wave-blocksize', type=int, help='block size')
    parser.add_argument('-wl', '--wave-latency', type=float, help='latency in seconds')
    parser.add_argument('-wv', '--wave-volume', type=float, default=1.0, help='wave volume ratio')
    parser.add_argument('-ws', '--wave-silentmode', action='store_true', help='silent mode')
    parser.add_argument('-wo', '--wave-outputfile', help='output WAV file')
    args = parser.parse_args(remaining)

    oSR = SoundRecorder(args)
    oSR.start()

    input()
    oSR.recordstart()

    input()
    oSR.end()


4.実験結果

キャプチャしたサンプルです(動画編と同じものです


この記事が気に入ったらサポートをしてみませんか?