見出し画像

【コード解説】美少女OPInterpreter簡易版

ソースコードはこちらから。

使い方

できること

  1. WebSocketでメッセージを受け取って、Open Interpreterからレスポンスを取得することができます(streamモード対応)。

  2. ファイルを送信してサーバー側に保存することができます。このファイルに対してOpen Interpreterに指示を出すことも可能です。

事前準備

  1. 本リポジトリはWebSocketでの起動を前提としているため、ご自身の環境に合わせて接続先を準備してください。

  2. 接続URLはデフォルトで`ws://127.0.0.1:8000/ws`です。

実行方法

  1. `.env`にOPENAI_API_KEYを設定

  2. `docker-compose up -d --build` 実行

デバッグ

  1. 実行環境がVSCodeのときに、DEBUG_MODE=1でデバッグモードを起動します。

  2. ブレークポイントを設置して快適にデバッグしましょう。

VSCodeのデバッグは下記のからあげさんの記事が参考になると思います。

その他

  1. ライセンスはKillianLucas/open-interpreterに準拠します。

  2. Open Interpreterの使用方法は下記にまとめています。

コード解説

pythonのお作法を正しく理解していないので、稚拙なコードはご容赦ください。

app/app.py

from fastapi import FastAPI
from app.routers import base
import ptvsd
import os

if os.getenv('DEBUG_MODE') == "1":
    # デバッグ用コード
    ptvsd.enable_attach(address=('0.0.0.0', 5678), redirect_output=True)
    ptvsd.wait_for_attach()

app = FastAPI()
app.include_router(base.router)

FastAPIの起動点です。特別なことはしていません。

app/routers/base.py

from fastapi import APIRouter, WebSocket
from ..services.open_interpreter_service import stream_open_interpreter
import asyncio

router = APIRouter()

async def main(websocket):
    task1 = asyncio.create_task(stream_open_interpreter(websocket))
    await asyncio.gather(task1)

@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        await main(websocket)
    except Exception as e:
        print(f"WebSocket Endpoint Error: {e}")
        await websocket.close()

@router.get("/test")
async def test():
    print("/test called.")

WebSocketとの接続を確立しているコードです。

app/services/open_interpreter_service.py

import os
import json
import base64
import traceback
import interpreter
from datetime import datetime
from .websocket_service import send_websocket_message

async def stream_open_interpreter(websocket):
    language = os.getenv('LANGUAGE') or "japanese"

    try:
        # 1日の記憶を保持する
        interpreter.conversation_filename = f"{datetime.now().strftime('%Y%m%d')}.json"
        interpreter.conversation_history_path = "./conversation_histories/"
        if os.path.exists(interpreter.conversation_history_path + interpreter.conversation_filename):
            # あったら読み込んで記憶として設定する
            with open(interpreter.conversation_history_path + interpreter.conversation_filename, "r") as f:
                interpreter.messages = json.load(f)
            print("Loaded conversation history.")
        else:
            # なかったら作成する
            with open(interpreter.conversation_history_path + interpreter.conversation_filename, "w") as f:
                json.dump([], f)
            print("Created conversation history.")

        interpreter.auto_run = True
        # interpreter.debug_mode = True
        interpreter.system_message = f"""
You can use the following libraries without installing:
- pandas
- numpy
- matplotlib
- seaborn
- scikit-learn
- pandas-datareader
- mplfinance
- yfinance
- requests
- scrapy
- beautifulsoup4
- opencv-python
- ffmpeg-python
- PyMuPDF
- pytube
- pyocr
- easyocr
- pydub
- pdfkit
- weasyprint
Your workspace is `./workspace` folder. If you make an output file, please put it in `./workspace/output`.
{'[[Please answer in Japanese. 日本語でお答えください。]]' if language == 'japanese' else ''}
        """

        message = ""
        saved_file = ""
        prev_type = ""

        while True:
            if message != "" and message != "\n":
                await send_websocket_message(websocket, message, prev_type)

            message = ""
            prev_type = ""

            # WebSocketでメッセージ受け取り待機
            print("Waiting for user message...")
            user_message = await websocket.receive_text()
            print(f"Received user message: {user_message}")

            parsed_data = json.loads(user_message)
            message_content = parsed_data.get("content")
            message_type = parsed_data.get("type")

            # WebSocketでテキストメッセージを受け取った場合
            # ex. {"type": "message", "text": "こんにちは"}
            if message_type == "chat" and message_content != "":
                if saved_file != "":
                    user_message = saved_file + user_message
                    saved_file = ""

                # OpenInterpreterの結果をstreamモードで取得、chunk毎に処理
                is_source_code = False
                print(message_content)
                for chunk in interpreter.chat(message_content, display=True, stream=True):
                    current_type = list(chunk.keys())[0]
                    if current_type != "language" and current_type != "active_line" and current_type != "end_of_execution":
                        # message typeの場合は、文節に区切ってメッセージを送信
                        if current_type != prev_type or (len(message) > 15 and message[-1] in ['。', '!', '?', ';', '…', ':'] or message[-1] == "\n"):
                            if message != "":
                                if "```" in message:
                                    # Toggle is_source_code
                                    is_source_code = not is_source_code
                                else:
                                    type_ = "code" if is_source_code else prev_type
                                    await send_websocket_message(websocket, message, type_)
                            message = ""

                        if current_type == "executing":
                            message += f"{chunk['executing']['code']}\n\n========================\nrunning...\n========================"
                        else:
                            message += chunk[current_type]
                        prev_type = current_type

            # WebSocketでファイルを受け取った場合
            # ex. {"type": "file", "fileName": "sample.txt", "fileData": "data:;base64,SGVsbG8sIHdvcmxkIQ=="}
            elif message_type == "file":
                # JSONデータをパースして、ファイル名とファイルデータを取得
                file_name = parsed_data.get("fileName")
                base64_data = parsed_data.get("fileData").split(",")[1]
                file_data = base64.b64decode(base64_data)

                # ファイルを保存するディレクトリを指定
                directory = "./workspace"

                # ディレクトリが存在しない場合、作成
                if not os.path.exists(directory):
                    os.makedirs(directory)

                # ファイルのフルパスを作成
                file_path = os.path.join(directory, file_name)

                # ファイルを保存
                with open(file_path, "wb") as f:
                    f.write(file_data)

                # メッセージを追加
                saved_file = f"{directory}/{file_name}にファイルを保存しました。" if language == 'japanese' else f"Saved file to {directory}/{file_name}."
                save_message = "ファイルを保存しました。" if language == 'japanese' else f"Saved file."
                await send_websocket_message(websocket, save_message, "assistant")

            # WebSocketで未設定のメッセージを受け取った場合
            else:
                error_message = "不正な送信が送られたようです。" if language == 'japanese' else "An invalid message was sent."
                await send_websocket_message(websocket, error_message, "assistant")

    except Exception as e:
        print("Errors:", e)
        traceback.print_exc()

        await websocket.close()
language = os.getenv('LANGUAGE') or "japanese"

Open Interpreterに渡す定型文などの言語を設定しています。と言っても今は `japanese` かそれ以外しかありません。

# WebSocketでメッセージ受け取り待機
print("Waiting for user message...")
user_message = await websocket.receive_text()
print(f"Received user message: {user_message}")

parsed_data = json.loads(user_message)
message_content = parsed_data.get("content")
message_type = parsed_data.get("type")

WebSocketの受け取り処理です。形式は下記のように設定しています。

{ type: String, content?: String, fileName?: String, fileData?: String }

type: "chat" or "file"
content: type = "chat" 時のテキストメッセージ、これがOpenInterpreterに渡される
fileName: type = "file" 時のファイル名
fileData:  type = "file" 時のファイルデータ(Base64形式でエンコードされている必要あり)

ファイルを受け渡す場合はBase64形式にしておく必要があります。

# OpenInterpreterの結果をstreamモードで取得、chunk毎に処理
is_source_code = False
for chunk in interpreter.chat(message_content, display=True, stream=True):
    current_type = list(chunk.keys())[0]
    if current_type != "language" and current_type != "active_line" and current_type != "end_of_execution":
        # message typeの場合は、文節に区切ってメッセージを送信
        if current_type != prev_type or (len(message) > 15 and message[-1] in ['。', '!', '?', ';', '…', ':'] or message[-1] == "\n"):
            if message != "":
                if "```" in message:
                    # Toggle is_source_code
                    is_source_code = not is_source_code
                else:
                    type_ = "code" if is_source_code else prev_type
                    await send_websocket_message(websocket, message, type_)
            message = ""

        if current_type == "executing":
            message += f"{chunk['executing']['code']}\n\n========================\nrunning...\n========================"
        else:
            message += chunk[current_type]
        prev_type = current_type

Open Interpreterをstreamモードで実行し、chunk毎に処理しています。
current_type(chunkのkey)が変わるか、句読点などの区切りのいいところで文節を切って、send_websocket_messageで接続先にレスポンスを送信しています。

微調整しているので、内容については適宜修正してください。
is_source_codeは稀にmessageにcode部分が含まれることがあるため、それを判定するために使用しています。

elif message_type == "file":
    # JSONデータをパースして、ファイル名とファイルデータを取得
    file_name = parsed_data.get("fileName")
    base64_data = parsed_data.get("fileData").split(",")[1]
    file_data = base64.b64decode(base64_data)

    # ファイルを保存するディレクトリを指定
    directory = "./workspace"

    # ディレクトリが存在しない場合、作成
    if not os.path.exists(directory):
        os.makedirs(directory)

    # ファイルのフルパスを作成
    file_path = os.path.join(directory, file_name)

    # ファイルを保存
    with open(file_path, "wb") as f:
        f.write(file_data)

    # メッセージを追加
    saved_file = f"{directory}/{file_name}にファイルを保存しました。" if language == 'japanese' else f"Saved file to {directory}/{file_name}."
    save_message = "ファイルを保存しました。" if language == 'japanese' else f"Saved file."
    await send_websocket_message(websocket, save_message, "assistant")

WebSocketで受け取ったファイルをworkspaceフォルダに保存しています。
この処理では特にOpen Interpreterを使っているわけではないですが、アップロード後にOpen Interpreterに「workspace内の〇〇.jsonを〜」のような質問をすれば対応してくれます。

残りはそこまで重要でないので割愛します。

app/services/websocket_service.py

import json
import base64

async def send_websocket_message(websocket, message, type):
    # type別にrole変数に値を設定
    role = "assistant" if type == "message" else type

    if websocket and message != "":
        json_data = json.dumps({"role": role, "text": message}, ensure_ascii=False)
        print(f"Sending message: {json_data}")
        await websocket.send({"type": "websocket.send", "text": json_data})
        print(f"Send cmplete.")
    else:
        print("Can't send message, WebSocket connection is closed.")

接続先に送信する処理です。形式は下記のようにしていますが、適宜調整してください。

{ role: String, text: String }

おまけ

下記からこのシステムを使用した実用例が見れます。チャンネル登録してもらえると嬉しいです🙇‍♀

フロントはReactとTypeScriptを使用しています。

ベースにはpixiv社のChatVRMを使用させてもらっています。


この記事が気に入ったらサポートをしてみませんか?