【Claude3】VTube Studio との連携を実現する Python コードの解説【丸投げ】

イントロダクション

Claude3 Opusに丸投げして書かせたプログラムがなかなかのものだったので共有しておきます。以下のライブラリ2つのソースコードを全てコンテキストに投げたうえで書かせたプログラムです。

TL;DR

本記事では、VTube Studio と連携してバーチャルキャラクターの動作を制御するための一連の Python プログラムについて解説します。これらのプログラムは、vts_client.py、vts_control.py、vts_actions.py の3つのファイルで構成されており、それぞれが重要な役割を果たしています。

  1. vts_client.py:

    • VTube Studio とのWebSocket通信を管理するための VTSClient クラスを定義しています。

    • このクラスは、VTube Studio へのリクエストの送信、レスポンスの受信、認証トークンの管理などの機能を提供します。

    • vts_control.py と vts_actions.py は、このクラスを使用してVTube Studioとの通信を行います。

  2. vts_control.py:

    • FastAPI を使用して REST API を提供し、外部からのリクエストを受け付けます。

    • VTSControl クラスを定義し、VTube Studio への接続、切断、認証などの処理を行います。

    • /on_comment エンドポイントを提供し、外部からのコメントデータを受信します。

    • コメントデータに基づいて、vts_actions.py で定義された関数を呼び出し、キャラクターの動作を制御します。

  3. vts_actions.py:

    • バーチャルキャラクターの動作を制御するための様々な関数を定義しています。

    • 視線の移動、体の揺れ、笑顔の表現、目の開閉などの動作を、VTube Studio のパラメータを操作することで実現します。

    • これらの関数は、vts_control.py から呼び出され、キャラクターのインタラクティブな動作を生成します。

これらの3つのプログラムは、連携して動作することでVTube Studio とのインタラクションを実現しています。vts_client.py がVTube Studio との低レベルな通信を担当し、vts_control.py が外部からのリクエストを受け付けてキャラクターの動作を制御するための中心的な役割を果たします。vts_actions.py は、具体的なキャラクターの動作を定義し、vts_control.py から呼び出されます。

以下では、それぞれのファイルの内容を詳しく見ていきますが、これらのプログラムが協調して動作することで、VTube Studio を使ったインタラクティブなバーチャルキャラクターのシステムを構築できることを理解しておくことが重要です。


vts_client.py

import asyncio
import pyvts
import json
import os
import aiofiles


class VTSClient(pyvts.vts):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_queue = asyncio.Queue()
        self.response_queue = asyncio.Queue()

    async def request(self, request_msg: dict) -> dict:
        if asyncio.iscoroutine(request_msg):
            request_msg = await request_msg
        await self.request_queue.put(request_msg)
        return await self.response_queue.get()

    def BaseRequest(self, request_type, data=None):
        return super().BaseRequest(request_type, data)

    async def process_requests(self):
        """リクエストを処理するループ"""
        while True:
            request_msg = await self.request_queue.get()
            await self.websocket.send(json.dumps(request_msg))
            response_msg = await self.websocket.recv()
            response_dict = json.loads(response_msg)
            await self.response_queue.put(response_dict)
            self.request_queue.task_done()

    async def read_token(self) -> str:
        """トークンをファイルから読み込む"""
        if not os.path.exists(self.token_path):
            return ""

        async with aiofiles.open(self.token_path, mode="r") as f_token:
            self.authentic_token = await f_token.read()
        return self.authentic_token

    async def write_token(self) -> None:
        """トークンをファイルに書き込む"""
        try:
            async with aiofiles.open(self.token_path, mode="w") as f_token:
                await f_token.write(self.authentic_token)
        except FileNotFoundError:
            print("write authentic token files failed")

    async def request_authenticate_token(self, force=False) -> None:
        """認証トークンをリクエストする"""
        response = await self.read_token()

        if response == "" or force:
            request_msg = await self.request(self.vts_request.authentication_token())
            if "authenticationToken" in request_msg["data"]:
                self.authentic_token = request_msg["data"]["authenticationToken"]
                await self.write_token()
            else:
                print("authentication failed")

    async def request_authenticate(self) -> bool:
        """認証リクエストを送信"""
        require_msg = await self.request(self.vts_request.authentication(self.authentic_token))
        if require_msg["data"]["authenticated"]:
            return True
        else:
            print("Authentication Failed")
            return False

    async def requestSetMultiParameterValue(
        self,
        parameters: list[str],
        values: list[float],
        weight: float = 1,
        face_found=False,
        mode="set",
    ) -> dict:
        """複数のパラメータ値を設定するリクエストを送信"""
        data = {
            "faceFound": face_found,
            "mode": mode,
            "parameterValues": [
                {"id": parameters[i], "weight": weight, "value": values[i]}
                for i, _ in enumerate(parameters)
            ],
        }
        request_msg = self.BaseRequest("InjectParameterDataRequest", data=data)
        return await self.request(request_msg)

説明

VTube Studio との通信を管理するための vts_client.py の内容を見ていきましょう。

import asyncio
import pyvts
import json
import os
import aiofiles

class VTSClient(pyvts.vts):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_queue = asyncio.Queue()
        self.response_queue = asyncio.Queue()

VTSClient クラスは pyvts.vts を継承しており、VTube Studio とのWebSocket通信を管理します。__init__ メソッドでは、リクエストとレスポンスを管理するための非同期キューを初期化しています。

async def request(self, request_msg: dict) -> dict:
    if asyncio.iscoroutine(request_msg):
        request_msg = await request_msg
    await self.request_queue.put(request_msg)
    return await self.response_queue.get()

request メソッドは、リクエストメッセージを送信し、レスポンスを待機します。リクエストメッセージが非同期関数の場合は、その結果を待ってからキューに追加します。

async def process_requests(self):
    while True:
        request_msg = await self.request_queue.get()
        await self.websocket.send(json.dumps(request_msg))
        response_msg = await self.websocket.recv()
        response_dict = json.loads(response_msg)
        await self.response_queue.put(response_dict)
        self.request_queue.task_done()

process_requests メソッドは、リクエストを処理するループを実行します。リクエストキューからメッセージを取り出し、WebSocketを介して送信します。その後、レスポンスを待ち、受信したレスポンスをレスポンスキューに追加します。

async def read_token(self) -> str:
    if not os.path.exists(self.token_path):
        return ""

    async with aiofiles.open(self.token_path, mode="r") as f_token:
        self.authentic_token = await f_token.read()
    return self.authentic_token

async def write_token(self) -> None:
    try:
        async with aiofiles.open(self.token_path, mode="w") as f_token:
            await f_token.write(self.authentic_token)
    except FileNotFoundError:
        print("write authentic token files failed")

read_token メソッドと write_token メソッドは、認証トークンをファイルから読み込んだり、ファイルに書き込んだりするために使用されます。aiofiles を使用して非同期でファイルを操作しています。

async def request_authenticate_token(self, force=False) -> None:
    response = await self.read_token()

    if response == "" or force:
        request_msg = await self.request(self.vts_request.authentication_token())
        if "authenticationToken" in request_msg["data"]:
            self.authentic_token = request_msg["data"]["authenticationToken"]
            await self.write_token()
        else:
            print("authentication failed")

request_authenticate_token メソッドは、認証トークンをリクエストします。トークンが存在しない場合や強制的にリクエストする場合は、新しいトークンを取得し、ファイルに保存します。

async def request_authenticate(self) -> bool:
    require_msg = await self.request(self.vts_request.authentication(self.authentic_token))
    if require_msg["data"]["authenticated"]:
        return True
    else:
        print("Authentication Failed")
        return False

request_authenticate メソッドは、認証リクエストを送信します。取得した認証トークンを使用してリクエストを送信し、認証の成否を返します。

async def requestSetMultiParameterValue(
    self,
    parameters: list[str],
    values: list[float],
    weight: float = 1,
    face_found=False,
    mode="set",
) -> dict:
    data = {
        "faceFound": face_found,
        "mode": mode,
        "parameterValues": [
            {"id": parameters[i], "weight": weight, "value": values[i]}
            for i, _ in enumerate(parameters)
        ],
    }
    request_msg = self.BaseRequest("InjectParameterDataRequest", data=data)
    return await self.request(request_msg)

requestSetMultiParameterValue メソッドは、複数のパラメータ値を設定するリクエストを送信します。パラメータIDとその値のリストを受け取り、リクエストデータを構築してVTube Studioに送信します。

以上が vts_client.py の主要な内容です。このクラスを使用することで、VTube Studio とのWebSocket通信を管理し、認証トークンの取得や各種リクエストの送信を行うことができます

vts_control.py

import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from vts_client import VTSClient
from vts_actions import on_no_comment_received, blink_eyes, idle_motion, on_comment_received, look_at_angle, look_down_and_up

app = FastAPI()

class VTSControl:
    def __init__(self):
        self.vts = None
        self.comment_event = asyncio.Event()

    async def connect(self):
        vts_api_info = {
            "name": "AItuber_plugin",
            "version": "0.0.1",
            "host": "0.0.0.0",
            "port": 4567
        }

        self.vts = VTSClient(vts_api_info=vts_api_info)

        try:
            await self.vts.connect()
            print("VTube Studioに接続しました")
            asyncio.create_task(self.vts.process_requests())

            await self.vts.request_authenticate_token()
            authenticated = await self.vts.request_authenticate()

            if authenticated:
                print("認証に成功しました")
            else:
                print("認証に失敗しました")
                return

            close_duration = 20
            open_duration = 0.1
            asyncio.create_task(on_no_comment_received(self.vts, self.comment_event, close_duration, open_duration))
            # asyncio.create_task(blink_eyes(self.vts, close_duration, open_duration))
            asyncio.create_task(self.continuous_idle_motion(86400, self.comment_event))

        except Exception as e:
            print(f"接続エラー: {str(e)}")

    async def disconnect(self):
        if self.vts:
            await self.vts.close()
            print("VTube Studioから切断しました")

    async def on_comment(self):
        await on_comment_received(self.vts, self.comment_event)

    def set_comment_event(self):
        self.comment_event.set()

    async def continuous_idle_motion(self, duration, comment_event=None):
        while True:
            await idle_motion(self.vts, duration, comment_event)
            await asyncio.sleep(duration)  # idle_motionの実行間隔を制御

vts_control = VTSControl()

@app.on_event("startup")
async def startup_event():
    await vts_control.connect()

@app.on_event("shutdown")
async def shutdown_event():
    await vts_control.disconnect()

class CommentData(BaseModel):
    user: str
    comment: str

@app.post("/on_comment")
async def on_comment(comment_data: CommentData):
    print(f"Received comment: {comment_data}")  # デバッグ用のログを追加
    vts_control.set_comment_event()
    # asyncio.create_task(vts_control.on_comment())  # コメント読みのモーションを別タスクで実行
    return {"status": "success"}

async def main():
    import uvicorn

    config = uvicorn.Config("vts_control:app", host="0.0.0.0", port=8000, loop="asyncio")
    server = uvicorn.Server(config)

    try:
        await server.serve()
    except KeyboardInterrupt:
        print("Shutting down...")
        await vts_control.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

説明

FastAPI を使用して REST API を提供し、VTube Studio とのインタラクションを制御する vts_control.py の内容を見ていきましょう。

import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from vts_client import VTSClient
from vts_actions import on_no_comment_received, blink_eyes, idle_motion, on_comment_received, look_at_angle, look_down_and_up

app = FastAPI()

class VTSControl:
    def __init__(self):
        self.vts = None
        self.comment_event = asyncio.Event()

VTSControl クラスは、VTube Studio との連携を管理するメインのクラスです。__init__ メソッドでは、VTSClient のインスタンスと、コメントイベントを管理するための asyncio.Event を初期化しています。

async def connect(self):
    vts_api_info = {
        "name": "AItuber_plugin",
        "version": "0.0.1",
        "host": "0.0.0.0",
        "port": 4567
    }

    self.vts = VTSClient(vts_api_info=vts_api_info)

    try:
        await self.vts.connect()
        print("VTube Studioに接続しました")
        asyncio.create_task(self.vts.process_requests())

        await self.vts.request_authenticate_token()
        authenticated = await self.vts.request_authenticate()

        if authenticated:
            print("認証に成功しました")
        else:
            print("認証に失敗しました")
            return

        close_duration = 20
        open_duration = 0.1
        asyncio.create_task(on_no_comment_received(self.vts, self.comment_event, close_duration, open_duration))
        asyncio.create_task(self.continuous_idle_motion(86400, self.comment_event))

    except Exception as e:
        print(f"接続エラー: {str(e)}")

connect メソッドは、VTube Studio に接続し、認証を行います。VTSClient のインスタンスを作成し、接続とリクエストの処理を開始します。その後、認証トークンをリクエストし、認証を行います。認証に成功した場合、on_no_comment_received と continuous_idle_motion のタスクを作成し、非同期で実行します。

async def disconnect(self):
    if self.vts:
        await self.vts.close()
        print("VTube Studioから切断しました")

disconnect メソッドは、VTube Studio から切断します。

async def on_comment(self):
    await on_comment_received(self.vts, self.comment_event)

def set_comment_event(self):
    self.comment_event.set()

on_comment メソッドは、コメントが来た場合の処理を行います。on_comment_received 関数を呼び出し、コメントイベントを設定します。set_comment_event メソッドは、コメントイベントを設定するためのヘルパーメソッドです。

async def continuous_idle_motion(self, duration, comment_event=None):
    while True:
        await idle_motion(self.vts, duration, comment_event)
        await asyncio.sleep(duration)

continuous_idle_motion メソッドは、継続的なアイドルモーションを実行します。idle_motion 関数を呼び出し、指定された時間間隔でアイドルモーションを実行します。

vts_control = VTSControl()

@app.on_event("startup")
async def startup_event():
    await vts_control.connect()

@app.on_event("shutdown")
async def shutdown_event():
    await vts_control.disconnect()

VTSControl のインスタンスを作成し、FastAPI の startup と shutdown イベントで、それぞれ connect メソッドと disconnect メソッドを呼び出しています。

class CommentData(BaseModel):
    user: str
    comment: str

@app.post("/on_comment")
async def on_comment(comment_data: CommentData):
    print(f"Received comment: {comment_data}")
    vts_control.set_comment_event()
    return {"status": "success"}

/on_comment エンドポイントは、POST リクエストを受け取り、コメントデータを処理します。CommentData モデルを使用してリクエストボディを解析し、set_comment_event メソッドを呼び出してコメントイベントを設定します。

async def main():
    import uvicorn

    config = uvicorn.Config("vts_control:app", host="0.0.0.0", port=8000, loop="asyncio")
    server = uvicorn.Server(config)

    try:
        await server.serve()
    except KeyboardInterrupt:
        print("Shutting down...")
        await vts_control.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

main 関数は、Uvicorn を使用して FastAPI アプリケーションを起動します。KeyboardInterrupt が発生した場合は、適切にシャットダウンし、VTube Studio から切断します。

以上が vts_control.py の主要な内容です。このファイルは、FastAPI を使用して REST API を提供し、vts_client.py と vts_actions.py を組み合わせて、VTube Studio とのインタラクションを制御します。

vts_actions.py

import asyncio
import math
import random
import time
import numpy as np

async def look_at_angle(vts, angle_deg, radius, duration=0.5):
    param_x = "FaceAngleX"
    param_y = "FaceAngleY"

    angle_rad = np.radians(angle_deg)

    # X軸方向の回転(左右)
    value_x = 30 * radius * np.sin(angle_rad)

    # Y軸方向の回転(上下)
    value_y = -30 * radius * np.cos(angle_rad)  # 上下の方向を反転

    # パラメータ設定のリクエストを並列に送信
    await asyncio.gather(
        vts.request(vts.vts_request.requestSetParameterValue(param_x, value_x)),
        vts.request(vts.vts_request.requestSetParameterValue(param_y, value_y))
    )
    # await asyncio.sleep(duration)
    
async def look_down_and_up(vts, comment_event, down_duration, up_duration):
    """うつむいて、正面を向く動作を行う"""
    down_angle = 0  # うつむく角度(度数法)
    down_radius = 1.0  # うつむく時の目線の移動半径
    up_angle = 0  # 正面を向く角度(度数法)
    up_radius = 0  # 正面を向く時の目線の移動半径

    steps = 100  # 分割数

    # 下を向く
    for i in range(steps):
        if comment_event.is_set():
            break  # コメントが来たら処理を中断
        current_radius = down_radius * (i / steps)
        await look_at_angle(vts, down_angle, current_radius, down_duration / steps)

    # しばらくうつむいたままでいる
    for _ in range(int(down_duration)):
        if comment_event.is_set():
            break  # コメントが来たら処理を中断
        await asyncio.sleep(1)

    # 正面を向く(飛び起きる)
    for i in range(steps):
        if comment_event.is_set():
            break  # コメントが来たら処理を中断
        current_radius = down_radius - (down_radius * (i / steps))
        await look_at_angle(vts, up_angle, current_radius, up_duration / steps)

async def sway_body(vts, sway_amount, sway_duration):
    """体を揺らす動作"""
    param_sway = "FaceAngleZ"
    steps = 100  # 揺れの分割数
    half_duration = sway_duration / 2

    for i in range(steps):
        t = i / steps
        sway_angle = sway_amount * math.sin(2 * math.pi * t)
        await vts.request(vts.vts_request.requestSetParameterValue(param_sway, sway_angle))
        await asyncio.sleep(half_duration / steps)

    await vts.request(vts.vts_request.requestSetParameterValue(param_sway, 0))

async def idle_motion(vts, idle_duration, comment_reading_event):
    """待機中のモーション"""
    start_time = asyncio.get_event_loop().time()

    while asyncio.get_event_loop().time() - start_time < idle_duration:
        # コメント読みのモーション中は視線を動かさない
        if not comment_reading_event.is_set():
            # ランダムな角度と半径を生成
            angle_deg = random.uniform(0, 360)
            radius = random.uniform(0.2, 0.4)
            look_duration = idle_duration / 2

            # 視線を動かすモーションを自然な動きにする
            steps = 40  # 分割数
            step_duration = look_duration / (2 * steps)  # 各ステップの時間

            # 指定された角度と半径に向かって視線を移動
            for i in range(steps):
                if comment_reading_event.is_set():
                    break  # コメント読みのモーションが開始された場合、視線のモーションを中断
                current_radius = radius * (i / steps)
                await look_at_angle(vts, angle_deg, current_radius, step_duration)

            # 視線をデフォルト状態に戻す
            if not comment_reading_event.is_set():
                for i in range(steps, 0, -1):
                    if comment_reading_event.is_set():
                        break  # コメント読みのモーションが開始された場合、視線のモーションを中断
                    current_radius = radius * (i / steps)
                    await look_at_angle(vts, angle_deg, current_radius, step_duration)
                
                # 最後にデフォルト状態に設定
                await look_at_angle(vts, 0, 0, step_duration)

        # 体を揺らすパラメータを生成
        sway_amount = random.uniform(5, 30)
        sway_duration = random.uniform(1, 3)

        # 体を揺らすモーションを実行
        await sway_body(vts, sway_amount, sway_duration)

async def on_no_comment_received(vts, comment_event, down_duration=20.0, up_duration=0.5):
    """コメントが来ない場合の処理"""
    while not comment_event.is_set():
        print("コメントが来ない状態")
        # await look_down_and_up(vts, comment_event, down_duration, up_duration)
        await asyncio.sleep(1)  # 1秒ごとにコメントの有無をチェック

    # コメントが来たら飛び起きる
    await on_comment_received(vts, comment_event)

async def smile(vts, smile_amount):
   """笑顔を作る"""
   param_smile = "MouthSmile"
   await vts.request(vts.vts_request.requestSetParameterValue(param_smile, smile_amount))

async def look_at_and_smile(vts, angle_deg, radius, smile_amount, duration):
    """指定された角度と半径で目線を移動しつつ、笑顔を作る"""
    steps = 10  # 分割数
    step_duration = duration / (2 * steps)  # 各ステップの時間
    
    print("処理開始")

    # 指定された角度と半径に向かって目線を移動しつつ、笑顔を作る
    for i in range(steps):
        current_time = time.time()
        current_radius = radius * (i / steps)
        current_smile = smile_amount * (i / steps)
        await look_at_angle(vts, angle_deg, current_radius, step_duration)
        await smile(vts, current_smile)
        print(f"time step_4 delay: {time.time() - current_time:.4f}秒")

    print("look_at_and_smile")

    # デフォルト状態に戻る
    for i in range(steps, 0, -1):
        current_time = time.time()
        current_radius = radius * (i / steps)
        current_smile = smile_amount * (i / steps)
        await look_at_angle(vts, angle_deg, current_radius, step_duration)
        await smile(vts, current_smile)
        print(f"time step_4 delay: {time.time() - current_time:.4f}秒")
    print("処理終了")

    print("最終位置に移動開始")
    await look_at_angle(vts, 0, 0)
    print("最終位置に移動完了")
    print("最終笑顔に設定開始")
    await smile(vts, 0)
    print("最終笑顔に設定完了")

async def on_comment_received(vts, comment_event):
    """コメントが来た場合の処理"""
    while True:
        await comment_event.wait()
        print("コメントが来ました")
        angle_deg = 45  # 右下を向く角度(度数法)
        radius = 1.0   # 目線の移動半径
        smile_amount = 1.0  # 笑顔の量(-1から1の範囲)
        duration = 2.0  # 動作の所要時間(秒)
        start_time = time.time()
        await look_at_and_smile(vts, angle_deg, radius, smile_amount, duration)
        print(f"処理時間: {time.time() - start_time:.2f}秒")
        print("コメント処理が完了しました")
        comment_event.clear()

async def close_eyes(vts, duration):
   """目を閉じる"""
   left_eye_param = "EyeOpenLeft"
   right_eye_param = "EyeOpenRight"
   steps = 100  # 目を閉じるまでのステップ数

   for i in range(steps):
       t = i / steps
       eye_openness = 1 - t  # 目の開き具合を計算
       await vts.request(vts.vts_request.requestSetParameterValue(left_eye_param, eye_openness))
       await vts.request(vts.vts_request.requestSetParameterValue(right_eye_param, eye_openness))
       await asyncio.sleep(duration / steps)  # 目を閉じる速度の調整

async def open_eyes(vts, duration):
   """目を開ける"""
   left_eye_param = "EyeOpenLeft"
   right_eye_param = "EyeOpenRight"
   steps = 100  # 目を開けるまでのステップ数

   for i in range(steps):
       t = i / steps
       eye_openness = t  # 目の開き具合を計算
       await vts.request(vts.vts_request.requestSetParameterValue(left_eye_param, eye_openness))
       await vts.request(vts.vts_request.requestSetParameterValue(right_eye_param, eye_openness))
       await asyncio.sleep(duration / steps)  # 目を開ける速度の調整

async def blink_eyes(vts, close_duration, open_duration):
   """まばたきをする"""
   await close_eyes(vts, close_duration)
   await open_eyes(vts, open_duration)

説明

バーチャルキャラクターの動作を制御するための様々な関数が定義されている vts_actions.py の内容を詳しく見ていきましょう。

async def look_at_angle(vts, angle_deg, radius, duration=0.5):
    param_x = "FaceAngleX"
    param_y = "FaceAngleY"

    angle_rad = np.radians(angle_deg)

    value_x = 30 * radius * np.sin(angle_rad)
    value_y = -30 * radius * np.cos(angle_rad)

    await asyncio.gather(
        vts.request(vts.vts_request.requestSetParameterValue(param_x, value_x)),
        vts.request(vts.vts_request.requestSetParameterValue(param_y, value_y))
    )

look_at_angle 関数は、指定された角度と半径に基づいて、キャラクターの視線を移動させます。FaceAngleX と FaceAngleY のパラメータを使用して、X軸方向とY軸方向の回転を計算し、requestSetParameterValue メソッドを使用してパラメータ値を設定します。

async def look_down_and_up(vts, comment_event, down_duration, up_duration):
    down_angle = 0
    down_radius = 1.0
    up_angle = 0
    up_radius = 0

    steps = 100

    for i in range(steps):
        if comment_event.is_set():
            break
        current_radius = down_radius * (i / steps)
        await look_at_angle(vts, down_angle, current_radius, down_duration / steps)

    for _ in range(int(down_duration)):
        if comment_event.is_set():
            break
        await asyncio.sleep(1)

    for i in range(steps):
        if comment_event.is_set():
            break
        current_radius = down_radius - (down_radius * (i / steps))
        await look_at_angle(vts, up_angle, current_radius, up_duration / steps)

look_down_and_up 関数は、キャラクターがうつむいてから正面を向く動作を行います。comment_event が設定されるとこの動作は中断されます。look_at_angle 関数を使用して、指定された角度と半径で視線を移動させます。

async def sway_body(vts, sway_amount, sway_duration):
    param_sway = "FaceAngleZ"
    steps = 100
    half_duration = sway_duration / 2

    for i in range(steps):
        t = i / steps
        sway_angle = sway_amount * math.sin(2 * math.pi * t)
        await vts.request(vts.vts_request.requestSetParameterValue(param_sway, sway_angle))
        await asyncio.sleep(half_duration / steps)

    await vts.request(vts.vts_request.requestSetParameterValue(param_sway, 0))

sway_body 関数は、キャラクターの体を揺らす動作を行います。FaceAngleZ パラメータを使用して、体の揺れを表現します。

async def idle_motion(vts, idle_duration, comment_reading_event):
    start_time = asyncio.get_event_loop().time()

    while asyncio.get_event_loop().time() - start_time < idle_duration:
        if not comment_reading_event.is_set():
            angle_deg = random.uniform(0, 360)
            radius = random.uniform(0.2, 0.4)
            look_duration = idle_duration / 2

            steps = 40
            step_duration = look_duration / (2 * steps)

            for i in range(steps):
                if comment_reading_event.is_set():
                    break
                current_radius = radius * (i / steps)
                await look_at_angle(vts, angle_deg, current_radius, step_duration)

            if not comment_reading_event.is_set():
                for i in range(steps, 0, -1):
                    if comment_reading_event.is_set():
                        break
                    current_radius = radius * (i / steps)
                    await look_at_angle(vts, angle_deg, current_radius, step_duration)
                
                await look_at_angle(vts, 0, 0, step_duration)

        sway_amount = random.uniform(5, 30)
        sway_duration = random.uniform(1, 3)

        await sway_body(vts, sway_amount, sway_duration)

idle_motion 関数は、アイドル状態でのキャラクターの動作を制御します。ランダムな角度と半径で視線を移動させ、sway_body 関数を使用して体を揺らします。comment_reading_event が設定されている場合、視線のモーションは中断されます。

async def on_no_comment_received(vts, comment_event, down_duration=20.0, up_duration=0.5):
    while not comment_event.is_set():
        print("コメントが来ない状態")
        await asyncio.sleep(1)

    await on_comment_received(vts, comment_event)

on_no_comment_received 関数は、コメントが来ない場合の処理を行います。comment_event が設定されるまで待機し、設定されたら on_comment_received 関数を呼び出します。

async def smile(vts, smile_amount):
   param_smile = "MouthSmile"
   await vts.request(vts.vts_request.requestSetParameterValue(param_smile, smile_amount))

smile 関数は、キャラクターの笑顔を制御します。MouthSmile パラメータを使用して、笑顔の量を設定します。

async def look_at_and_smile(vts, angle_deg, radius, smile_amount, duration):
    steps = 10
    step_duration = duration / (2 * steps)

    for i in range(steps):
        current_radius = radius * (i / steps)
        current_smile = smile_amount * (i / steps)
        await look_at_angle(vts, angle_deg, current_radius, step_duration)
        await smile(vts, current_smile)

    for i in range(steps, 0, -1):
        current_radius = radius * (i / steps)
        current_smile = smile_amount * (i / steps)
        await look_at_angle(vts, angle_deg, current_radius, step_duration)
        await smile(vts, current_smile)

    await look_at_angle(vts, 0, 0)
    await smile(vts, 0)

look_at_and_smile 関数は、指定された角度と半径で視線を移動しつつ、笑顔を作ります。look_at_angle 関数と smile 関数を組み合わせて、自然な動きを表現します。

async def on_comment_received(vts, comment_event):
    while True:
        await comment_event.wait()
        print("コメントが来ました")
        angle_deg = 45
        radius = 1.0
        smile_amount = 1.0
        duration = 2.0
        await look_at_and_smile(vts, angle_deg, radius, smile_amount, duration)
        print("コメント処理が完了しました")
        comment_event.clear()

on_comment_received 関数は、コメントが来た場合の処理を行います。comment_event が設定されるまで待機し、設定されたら look_at_and_smile 関数を呼び出してコメントに反応します。

async def close_eyes(vts, duration):
   left_eye_param = "EyeOpenLeft"
   right_eye_param = "EyeOpenRight"
   steps = 100

   for i in range(steps):
       t = i / steps
       eye_openness = 1 - t
       await vts.request(vts.vts_request.requestSetParameterValue(left_eye_param, eye_openness))
       await vts.request(vts.vts_request.requestSetParameterValue(right_eye_param, eye_openness))
       await asyncio.sleep(duration / steps)

async def open_eyes(vts, duration):
   left_eye_param = "EyeOpenLeft"
   right_eye_param = "EyeOpenRight"
   steps = 100

   for i in range(steps):
       t = i / steps
       eye_openness = t
       await vts.request(vts.vts_request.requestSetParameterValue(left_eye_param, eye_openness))
       await vts.request(vts.vts_request.requestSetParameterValue(right_eye_param, eye_openness))
       await asyncio.sleep(duration / steps)

async def blink_eyes(vts, close_duration, open_duration):
   await close_eyes(vts, close_duration)
   await open_eyes(vts, open_duration)

close_eyes、open_eyes、blink_eyes 関数は、キャラクターの目の開閉を制御します。EyeOpenLeft と EyeOpenRight パラメータを使用して、左右の目の開き具合を設定します。

以上が vts_actions.py の主要な内容です。このファイルには、キャラクターの動作を制御するための様々な関数が定義されており、vts_control.py から呼び出されることで、VTube Studio と連携したインタラクティブな動作を実現しています。


この記事が気に入ったらサポートをしてみませんか?