見出し画像

youtubeAPI+pythonを駆使して「くしゃみシーン検出システム」が完成した【運営レポート2週目】

youtubeの切り抜きチャンネルにチャレンジしておりまして、毎週運営レポートを公開しております。まとめマガジンはこちら↓↓

運営と言いましたが実際の所は、開設から2週間経過した今週も切り抜きコンテンツを自動生成するための仕組み作りをしておりました。ようやく「くしゃみ」自動検出システムが完成したのでそろそろ動画を投稿していきたい。

記事が面白かったらフォロー&♥よろしくお願いしますm(_ _)m



日次レポート

先週に引き続きyoutubeのアーカイブ配信から「くしゃみ」シーンを特定するスクリプトをチマチマと作っておりました。

作業時間や精度の兼ね合いでアルゴリズムがフラフラと定まらず遠回りしましたが、今週でようやく目途が立ちましてそろそろ動画投稿を始められるかなぁという段階です。

先週のレポートはこちら。


2月8日

うーーん、自作モデルで3時間程度のアーカイブからくしゃみ検出処理をやってみたんだけど、1200個くらいシーン検出されてほぼ間違い。なんでや??

くしゃみ検出モデルの計算方法を変えてみる、今まではMFCCで学習させてたけどスペクトログラム+CNNのコンボが良いみたいな記事を見つけた。

chatGPTと相談しながらスクリプトを書く。

'''
tensorflow.kerasはPylanceで補完されないっぽい→インポートエラーでるけど動作は問題ない
'''
import os
import librosa
import numpy as np
from skimage.transform import resize
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

def load_and_preprocess_data(directory, target_size=(128, 128)):
    images = []
    labels = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.wav') or file.endswith('.mp3'):
                file_path = os.path.join(root, file)
                y, sr = librosa.load(file_path, sr=None)
                S = librosa.feature.melspectrogram(y=y, sr=sr)
                S_DB = librosa.power_to_db(S, ref=np.max)
                # スペクトログラムを目標サイズにリサイズ
                S_DB_resized = resize(S_DB, target_size)
                images.append(S_DB_resized)
                label = 1 if 'sneeze' in file.lower() else 0
                labels.append(label)
    return np.array(images), np.array(labels)

def create_cnn(input_shape):
    model = Sequential([
        Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])
    return model

if __name__ == "__main__":
    directory = 'sneeze_clip_audio'
    target_size = (128, 128, 1)  # スペクトログラムのサイズとチャネル数
    images, labels = load_and_preprocess_data(directory, target_size=target_size[:-1])
    images = images.reshape((-1, *target_size))  # CNNに入力するための形状に変換

    # データをトレーニングセットとテストセットに分割
    X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

    # トレーニングセットをさらにトレーニングセットとバリデーションセットに分割
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)  # 0.25 x 0.8 = 0.2

    # モデルの作成とトレーニング
    model = create_cnn(target_size)
    model.fit(X_train, y_train, epochs=10, validation_data=(X_val, y_val))

    # モデルの評価
    score = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test loss: {score[0]}\nTest accuracy: {score[1]}")

    # モデルの保存先ディレクトリ
    model_dir = 'models'
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)

    # モデルのファイル名にテスト精度とデータセット件数を追加
    accuracy = score[1]  # テスト精度
    dataset_size = len(labels)  # 使用したデータセットの総件数
    model_filename = f"sneeze_detection_model_acc{accuracy:.4f}_size{dataset_size}.h5"

    # モデルの保存
    model_path = os.path.join(model_dir, model_filename)
    model.save(model_path)
    print(f"Model saved to {model_path}")

結果はこんな感じ。

14/14 [==============================] - 3s 131ms/step - loss: 20.5587 - accuracy: 0.5882 - val_loss: 0.4502 - val_accuracy: 0.7394
Epoch 2/10
14/14 [==============================] - 2s 118ms/step - loss: 0.3733 - accuracy: 0.8235 - val_loss: 0.3596 - val_accuracy: 0.8239
Epoch 3/10
14/14 [==============================] - 2s 115ms/step - loss: 0.2916 - accuracy: 0.8847 - val_loss: 0.3454 - val_accuracy: 0.8380
acy: 0.9176 - val_loss: 0.3232 - val_accuracy: 0.8592
Epoch 5/10
14/14 [==============================] - 2s 113ms/step - loss: 0.1880 - accuracy: 0.9365 - val_loss: 0.2977 - val_accuracy: 0.8732
Epoch 6/10
14/14 [==============================] - 2s 113ms/step - loss: 0.1205 - accuracy: 0.9694 - val_loss: 0.3373 - val_accuracy: 0.8944
Epoch 7/10
14/14 [==============================] - 2s 117ms/step - loss: 0.2803 - accuracy: 0.9435 - val_loss: 0.3773 - val_accuracy: 0.8803
Epoch 8/10
14/14 [==============================] - 2s 113ms/step - loss: 0.0924 - accuracy: 0.9671 - val_loss: 0.4291 - val_accuracy: 0.8803
Epoch 9/10
14/14 [==============================] - 2s 113ms/step - loss: 0.0481 - accuracy: 0.9906 - val_loss: 0.4273 - val_accuracy: 0.8803
Epoch 10/10
14/14 [==============================] - 2s 111ms/step - loss: 0.0357 - accuracy: 0.9929 - val_loss: 0.6206 - val_accuracy: 0.9014
Test loss: 0.755538284778595
Test accuracy: 0.8802816867828369

とりあえず動いているのでプロセスとしてはいいのかな?精度向上施策の効果が分かりにくいからグラフで見たい、散布図とかを見た事ある気がする。

精度を出すためにトレーニング用のデータがどれくらい必要かchatGPTに聞いてみた。

20,000件集めろってか、いいぜやったらぁ!!

1700件ほどくしゃみを集めた所で方向性が合っているのか疑問に思い始めた。もっと簡単な方法あるんでないの??

元はライブ配信なんだし、くしゃみしたら「くしゃみ」ってキーワードのコメント増えるんじゃね?音声データじゃなくてコメント分析から攻めた方が効率いいんじゃね??


2月9日

chatGPTと相談しながら良いアプローチを思い付いた。

  1. アーカイブのチャットログから「くしゃみ」キーワードが増加したタイミングをインデックス

  2. インデックスされた時点から直前に発生した大きな音を探す

  3. この大きな音をくしゃみ検出モデルでチェックさせる

この三段活用でいけそうな気がしてきた。1で不必要な部分をバッサリ削除して、2.3でくしゃみ音声の特徴量 or くしゃみ検出モデルでピンポイントで特定するみたいなイメージ。

まずは1を満たすスクリプトを書いて使えるか試してみようか。

from yt_dlp import YoutubeDL
import re
import json
from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from moviepy.editor import VideoFileClip
import time

def load_chat_log(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        chat_log = json.load(file)
    return chat_log

def normalize_timestamp(timestamp_text):
    if len(timestamp_text.split(':')) == 2:
        timestamp_text = f"00:{timestamp_text}"
    return timestamp_text

def find_keywords_comments(chat_log, keyword):
    keyword_comments = defaultdict(list)
    processed_ids = set()  # 処理済みのコメントIDを追跡

    for action in chat_log:
        details = action.get("replayChatItemAction", {}).get("actions", [])[0].get("addChatItemAction", {}).get("item", {})
        message_details = details.get("liveChatTextMessageRenderer", {}).get("message", {}).get("runs", [])
        paid_message_details = details.get("liveChatPaidMessageRenderer", {}).get("message", {}).get("runs", [])
        
        timestamp_text = details.get("liveChatTextMessageRenderer", {}).get("timestampText", {}).get("simpleText", "")
        if not timestamp_text:  # スパチャの場合、timestampTextの位置が異なる
            timestamp_text = details.get("liveChatPaidMessageRenderer", {}).get("timestampText", {}).get("simpleText", "")
            
        timestamp_text = normalize_timestamp(timestamp_text)
        author_name = details.get("liveChatTextMessageRenderer", {}).get("authorName", {}).get("simpleText", "")
        if not author_name:  # スパチャの場合、authorNameの位置が異なる
            author_name = details.get("liveChatPaidMessageRenderer", {}).get("authorName", {}).get("simpleText", "")

        message_id = details.get("liveChatTextMessageRenderer", {}).get("id", "")
        if not message_id:  # スパチャの場合、idの位置が異なる
            message_id = details.get("liveChatPaidMessageRenderer", {}).get("id", "")

        if message_id in processed_ids:  # 重複を避ける
            continue

        for message_detail in message_details + paid_message_details:
            if keyword in message_detail.get("text", "") and timestamp_text:
                time_obj = datetime.strptime(timestamp_text, '%H:%M:%S')
                minute_key = time_obj.strftime('%H:%M')
                comment_text = message_detail.get("text", "")
                keyword_comments[minute_key].append((author_name, comment_text))
                processed_ids.add(message_id)  # 処理済みとしてIDを記録

    return keyword_comments

def display_comments(comments):
    for minute, comment_details in sorted(comments.items(), key=lambda x: datetime.strptime(x[0], "%H:%M")):
        print(f"{minute} - {len(comment_details)}件のコメント:")
        #for author, msg in comment_details:
            #print(f"    {author}: {msg}")

def fix_json_format(input_file_path, output_file_path):
    with open(input_file_path, 'r', encoding='utf-8') as input_file:
        lines = input_file.readlines()
    
    json_objects = [line.strip() for line in lines if line.strip()]
    fixed_json = "[" + ",".join(json_objects) + "]"
    
    with open(output_file_path, 'w', encoding='utf-8') as output_file:
        output_file.write(fixed_json)
        
def identify_scenes_from_comments(comments):
    minutes = sorted(datetime.strptime(minute, "%H:%M") for minute in comments.keys())
    scenes = []
    current_scene = [minutes[0]]

    for i in range(1, len(minutes)):
        if (minutes[i] - current_scene[-1]) <= timedelta(minutes=1):
            current_scene.append(minutes[i])
        else:
            scenes.append(current_scene)
            current_scene = [minutes[i]]
    scenes.append(current_scene)  # 最後のシーンを追加

    # シーンの範囲を開始1分前から終了1分後まで計算
    scene_ranges = [(scene[0] - timedelta(minutes=1), scene[0] + timedelta(minutes=1)) for scene in scenes]
    return scene_ranges

def display_scene_ranges(scene_ranges):
    for start, end in scene_ranges:
        print(f"{start.strftime('%H:%M:%S')}-{end.strftime('%H:%M:%S')}")

def download_video(video_url):
    ydl_opts = {
        'outtmpl' : '%(id)s'+'.mp4',
        'format' : 'best',
        'writesubtitles' : True,
    }
    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
    return f'{video_id}.mp4'  # ダウンロードされた動画ファイルの名前を返す

def cut_scenes_from_video(video_file, scene_ranges):
    for i, (start, end) in enumerate(scene_ranges):
        start_seconds = start.hour * 3600 + start.minute * 60 + start.second
        end_seconds = end.hour * 3600 + end.minute * 60 + end.second
        clip = VideoFileClip(video_file).subclip(start_seconds, end_seconds)
        clip.write_videofile(f"{video_id}_scene_{i+1}.mp4")
        clip.close()

if __name__ == "__main__":
    
    # 動画URL
    video_url = 'https://www.youtube.com/watch?v=WbQbJK7u1ys'

    # 動画IDをURLから抽出
    video_id = re.search(r'watch\?v=([a-zA-Z0-9_-]+)', video_url).group(1)
    print(video_id)
    
    start_time = time.time()  # 処理開始時刻

    video_file = download_video(video_url)  # 動画をダウンロード
    
    input_file_path = f'{video_id}.live_chat.json'  # 動画IDを使用して入力ファイルパスを設定
    output_file_path = f'{video_id}.live_chat.json'  # 出力ファイル名にも動画IDを使用
    fix_json_format(input_file_path, output_file_path)
    
    chat_log = load_chat_log(output_file_path)
    keyword = "くしゃみ"
    keyword_comments = find_keywords_comments(chat_log, keyword)
    display_comments(keyword_comments)  # コメントの集計を表示

    # シーンを特定して表示
    scene_ranges = identify_scenes_from_comments(keyword_comments)
    print("くしゃみシーンの範囲:")
    display_scene_ranges(scene_ranges)

    # 特定されたシーンを動画から切り抜く
    cut_scenes_from_video(video_file, scene_ranges)
    
    end_time = time.time()  # 処理終了時刻
    print(f"処理時間: {end_time - start_time:.2f}秒")

サンプル動画は下記。

処理結果はこんな感じに。

サンプルとして分かりやすい結果になった、シーン①00:58:00-01:00:00は実際にくしゃみしているけどシーン②01:33:00-01:35:00はくしゃみをしていない。

次の工程でこの2つのシーンを判定させてシーン①だけ残せば大成功。

以前はアーカイブ動画全体を1秒ごとに分割して強引に検出モデルにくしゃみ判定させていたけど、これなら合計で4分の動画だけをチェックすればいい。すばらすぃ。


2月10日

今やってるくしゃみ検出システムが完成したら外部のサーバーに設置して自動化したいので、以前契約したさくらVPSで動くか確認してみる。

おぉ~エラー出ず完走しおった、メモリ足りないかと思ったけど2Gでいけたな。64G積んでるローカル環境よりは時間かかってるけど、全然問題ない範囲。

1時間40分の動画が7分ちょいでくしゃみサーチできた、あとはyoutubeAPIで指定チャンネルからアーカイブ動画の一覧取得して無限にくしゃみサーチさせとけば自動検出システム完成!よし、目途が立った。

という事でくしゃみシーンにくしゃみが含まれているかチェックする仕組み作りに戻る。

いやまて、数時間のアーカイブからくしゃみ確率が高い2分の動画まで絞り込めてるなら、後はもう手作業でガシガシやった方がいいのか?

2分程度の動画からくしゃみシーンだけを残して編集するくらいなら、Vrewで簡単に行けそうだけどどうしよう。とりあえず試してみるか。

Vrewの操作に慣れてないけど10分くらいで形になった、1日で1ヶ月分作っちゃいたいので30本仕上げるとして300分=5時間、毎月1日だけ5時間作業すれば1ヶ月分のコンテンツを準備出来ると思えばそれなりか??慣れればもっと早くなると思うし。

これでイイ気がしてきたな、うん。

くしゃみシーン特定システムをさくらVPSに設置してcornで定期実行させちゃおう、やってみておかしかったらまた直せばいいや。早く動画投稿したいし。


2月11日

youtubeAPIからホロライブの動画自動で収集する仕組みを作る、取得した動画がくしゃみサーチ済みか管理できればいいのでSQLiteでいいや。

とりあえずDBにレコードを入出力するスクリプトを作ろうか。

◆ 0_makeDB.py

※長いので「シーン特定・抽出スクリプト」の章にて公開

よしよし、DBにチャンネルIDテーブル作って登録されたチャンネルIDを参照し、youtubeAPIから直近50件の動画リストを取得するまではできた。

次はこのレコードに対して1件ずつくしゃみサーチを行うようにする。

たまたまサンプルとなった動画が9時間を超える動画で、コメント件数が170,000件くらいあった。jsonファイルから「くしゃみ」キーワードをカウントする工程でプロセスがkilledされてしまう。

リソースが足りないんだろうけど、これ以上ハイスペックなプランに課金する気はないのでリソースを節約するようにスクリプトを見直そう。

jsonファイルを直接開いて全件処理させようとしているのがマズそう、一旦DBに保存させてから処理したら節約になるんかな?試してみよう。


2月12日

さくらVPSのスペックが足りず処理が完走出来なかったので、ローカルPCで動くか試してみる。

◆ 1_sneeze_clipping.py

※長いので「シーン特定・抽出スクリプト」の章にて公開

サンプルは下記。

duration1時間20分:チャット35,000件くらい。処理完走、くしゃみなし。

duration9時間20分:チャット170,000件くらい。処理完走、くしゃみ4ヶ所検出。

いいんじゃない??9時間越えのアーカイブをマシンパワーの力技で処理完走できた。

ローカルマシンだと定期実行できないデメリットはあるけど、1ヶ月分のコンテンツだけ作れればいいし、パソコンつけっぱで必要件数が溜まるまでブンブン放置しとけばいいか。

よっしゃ、ようやく動画投稿の準備が整ったわ。


2月13日

昨日作ったpythonスクリプトに大空スバルのチャンネルを設定し一晩放置してみた所、視聴者がくしゃみに言及したシーン22件を自動で切り抜けていた。

22件の中身を全部目視でチェックすると、実際に動画ネタになりそうなシーンは6件あった。6/22件なら全然悪くない精度だし1晩で処理した動画の延べ時間は160時間に達した。

160時間の動画から6件のくしゃみシーンを一晩で探すのは人力では不可能なので、他のチャンネルと差別化できる要素が出来た気がする、OK。

次は作業時間をさらに圧縮するため22件→6件に絞るための工程を考える。精度が高いとはいえ22件の動画はそれぞれ2分あるので、全部目視チェックしようとすると44分かかるし倍速でも30分近くかかる。面倒臭がりの自分にはかなり重たい。

以前作ったくしゃみ検出モデルとかくしゃみエネルギーの検出アルゴリズムは精度がイマイチなので、切り抜いた動画のオーディオ波形をプログレスバーの下にプロットして目視チェックするのが精度・時間の両面からマシな気がする。

と言う事で、チェック用のエディタ?を作ってみよう。

いや違うなVrewでいいや、切り抜いた22件の動画はVrewに読ませれば文字起こしもされるから、くしゃみがあるかないかすぐ分かる。くしゃみがなければ消せばいいし、あればそのままショート動画用に編集しよう。

課金したおかげで1200分という使い切れない音声認識リソース貰ってるし、これで使い道ができたと考えよう。

Vrewでシコシコ作った結果がこちら。

これくらいが5分程度で作れるようになった、十分じゃない??

よーし、量産しよう!楽しくなってきた!


時短&効率化ツール

切り抜きチャンネルに限らず、YouTubeチャンネルを運営する上で役に立つツールを自作し公開しています。興味のある方はぜひご一読下さいm(_ _)m


シーン特定・抽出スクリプト

今回作成した特定シーン切り抜きスクリプトは、PCのスペックが足りないとプロセスがkilledされる可能性がありますんでご注意下さい。

ここから先は

11,085字
この記事のみ ¥ 1,990

よろしければサポートお願いします、頂いたサポートは活動費として使用させて頂きより有意義な記事を書けるように頑張ります!