【コード解説】美少女OPInterpreter簡易版
ソースコードはこちらから。
使い方
できること
WebSocketでメッセージを受け取って、Open Interpreterからレスポンスを取得することができます(streamモード対応)。
ファイルを送信してサーバー側に保存することができます。このファイルに対してOpen Interpreterに指示を出すことも可能です。
事前準備
本リポジトリはWebSocketでの起動を前提としているため、ご自身の環境に合わせて接続先を準備してください。
接続URLはデフォルトで`ws://127.0.0.1:8000/ws`です。
実行方法
`.env`にOPENAI_API_KEYを設定
`docker-compose up -d --build` 実行
デバッグ
実行環境がVSCodeのときに、DEBUG_MODE=1でデバッグモードを起動します。
ブレークポイントを設置して快適にデバッグしましょう。
VSCodeのデバッグは下記のからあげさんの記事が参考になると思います。
その他
ライセンスはKillianLucas/open-interpreterに準拠します。
Open Interpreterの使用方法は下記にまとめています。
コード解説
pythonのお作法を正しく理解していないので、稚拙なコードはご容赦ください。
app/app.py
from fastapi import FastAPI
from app.routers import base
import ptvsd
import os
if os.getenv('DEBUG_MODE') == "1":
# デバッグ用コード
ptvsd.enable_attach(address=('0.0.0.0', 5678), redirect_output=True)
ptvsd.wait_for_attach()
app = FastAPI()
app.include_router(base.router)
FastAPIの起動点です。特別なことはしていません。
app/routers/base.py
from fastapi import APIRouter, WebSocket
from ..services.open_interpreter_service import stream_open_interpreter
import asyncio
router = APIRouter()
async def main(websocket):
task1 = asyncio.create_task(stream_open_interpreter(websocket))
await asyncio.gather(task1)
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
await main(websocket)
except Exception as e:
print(f"WebSocket Endpoint Error: {e}")
await websocket.close()
@router.get("/test")
async def test():
print("/test called.")
WebSocketとの接続を確立しているコードです。
app/services/open_interpreter_service.py
import os
import json
import base64
import traceback
import interpreter
from datetime import datetime
from .websocket_service import send_websocket_message
async def stream_open_interpreter(websocket):
language = os.getenv('LANGUAGE') or "japanese"
try:
# 1日の記憶を保持する
interpreter.conversation_filename = f"{datetime.now().strftime('%Y%m%d')}.json"
interpreter.conversation_history_path = "./conversation_histories/"
if os.path.exists(interpreter.conversation_history_path + interpreter.conversation_filename):
# あったら読み込んで記憶として設定する
with open(interpreter.conversation_history_path + interpreter.conversation_filename, "r") as f:
interpreter.messages = json.load(f)
print("Loaded conversation history.")
else:
# なかったら作成する
with open(interpreter.conversation_history_path + interpreter.conversation_filename, "w") as f:
json.dump([], f)
print("Created conversation history.")
interpreter.auto_run = True
# interpreter.debug_mode = True
interpreter.system_message = f"""
You can use the following libraries without installing:
- pandas
- numpy
- matplotlib
- seaborn
- scikit-learn
- pandas-datareader
- mplfinance
- yfinance
- requests
- scrapy
- beautifulsoup4
- opencv-python
- ffmpeg-python
- PyMuPDF
- pytube
- pyocr
- easyocr
- pydub
- pdfkit
- weasyprint
Your workspace is `./workspace` folder. If you make an output file, please put it in `./workspace/output`.
{'[[Please answer in Japanese. 日本語でお答えください。]]' if language == 'japanese' else ''}
"""
message = ""
saved_file = ""
prev_type = ""
while True:
if message != "" and message != "\n":
await send_websocket_message(websocket, message, prev_type)
message = ""
prev_type = ""
# WebSocketでメッセージ受け取り待機
print("Waiting for user message...")
user_message = await websocket.receive_text()
print(f"Received user message: {user_message}")
parsed_data = json.loads(user_message)
message_content = parsed_data.get("content")
message_type = parsed_data.get("type")
# WebSocketでテキストメッセージを受け取った場合
# ex. {"type": "message", "text": "こんにちは"}
if message_type == "chat" and message_content != "":
if saved_file != "":
user_message = saved_file + user_message
saved_file = ""
# OpenInterpreterの結果をstreamモードで取得、chunk毎に処理
is_source_code = False
print(message_content)
for chunk in interpreter.chat(message_content, display=True, stream=True):
current_type = list(chunk.keys())[0]
if current_type != "language" and current_type != "active_line" and current_type != "end_of_execution":
# message typeの場合は、文節に区切ってメッセージを送信
if current_type != prev_type or (len(message) > 15 and message[-1] in ['。', '!', '?', ';', '…', ':'] or message[-1] == "\n"):
if message != "":
if "```" in message:
# Toggle is_source_code
is_source_code = not is_source_code
else:
type_ = "code" if is_source_code else prev_type
await send_websocket_message(websocket, message, type_)
message = ""
if current_type == "executing":
message += f"{chunk['executing']['code']}\n\n========================\nrunning...\n========================"
else:
message += chunk[current_type]
prev_type = current_type
# WebSocketでファイルを受け取った場合
# ex. {"type": "file", "fileName": "sample.txt", "fileData": "data:;base64,SGVsbG8sIHdvcmxkIQ=="}
elif message_type == "file":
# JSONデータをパースして、ファイル名とファイルデータを取得
file_name = parsed_data.get("fileName")
base64_data = parsed_data.get("fileData").split(",")[1]
file_data = base64.b64decode(base64_data)
# ファイルを保存するディレクトリを指定
directory = "./workspace"
# ディレクトリが存在しない場合、作成
if not os.path.exists(directory):
os.makedirs(directory)
# ファイルのフルパスを作成
file_path = os.path.join(directory, file_name)
# ファイルを保存
with open(file_path, "wb") as f:
f.write(file_data)
# メッセージを追加
saved_file = f"{directory}/{file_name}にファイルを保存しました。" if language == 'japanese' else f"Saved file to {directory}/{file_name}."
save_message = "ファイルを保存しました。" if language == 'japanese' else f"Saved file."
await send_websocket_message(websocket, save_message, "assistant")
# WebSocketで未設定のメッセージを受け取った場合
else:
error_message = "不正な送信が送られたようです。" if language == 'japanese' else "An invalid message was sent."
await send_websocket_message(websocket, error_message, "assistant")
except Exception as e:
print("Errors:", e)
traceback.print_exc()
await websocket.close()
language = os.getenv('LANGUAGE') or "japanese"
Open Interpreterに渡す定型文などの言語を設定しています。と言っても今は `japanese` かそれ以外しかありません。
# WebSocketでメッセージ受け取り待機
print("Waiting for user message...")
user_message = await websocket.receive_text()
print(f"Received user message: {user_message}")
parsed_data = json.loads(user_message)
message_content = parsed_data.get("content")
message_type = parsed_data.get("type")
WebSocketの受け取り処理です。形式は下記のように設定しています。
{ type: String, content?: String, fileName?: String, fileData?: String }
type: "chat" or "file"
content: type = "chat" 時のテキストメッセージ、これがOpenInterpreterに渡される
fileName: type = "file" 時のファイル名
fileData: type = "file" 時のファイルデータ(Base64形式でエンコードされている必要あり)
ファイルを受け渡す場合はBase64形式にしておく必要があります。
# OpenInterpreterの結果をstreamモードで取得、chunk毎に処理
is_source_code = False
for chunk in interpreter.chat(message_content, display=True, stream=True):
current_type = list(chunk.keys())[0]
if current_type != "language" and current_type != "active_line" and current_type != "end_of_execution":
# message typeの場合は、文節に区切ってメッセージを送信
if current_type != prev_type or (len(message) > 15 and message[-1] in ['。', '!', '?', ';', '…', ':'] or message[-1] == "\n"):
if message != "":
if "```" in message:
# Toggle is_source_code
is_source_code = not is_source_code
else:
type_ = "code" if is_source_code else prev_type
await send_websocket_message(websocket, message, type_)
message = ""
if current_type == "executing":
message += f"{chunk['executing']['code']}\n\n========================\nrunning...\n========================"
else:
message += chunk[current_type]
prev_type = current_type
Open Interpreterをstreamモードで実行し、chunk毎に処理しています。
current_type(chunkのkey)が変わるか、句読点などの区切りのいいところで文節を切って、send_websocket_messageで接続先にレスポンスを送信しています。
微調整しているので、内容については適宜修正してください。
is_source_codeは稀にmessageにcode部分が含まれることがあるため、それを判定するために使用しています。
elif message_type == "file":
# JSONデータをパースして、ファイル名とファイルデータを取得
file_name = parsed_data.get("fileName")
base64_data = parsed_data.get("fileData").split(",")[1]
file_data = base64.b64decode(base64_data)
# ファイルを保存するディレクトリを指定
directory = "./workspace"
# ディレクトリが存在しない場合、作成
if not os.path.exists(directory):
os.makedirs(directory)
# ファイルのフルパスを作成
file_path = os.path.join(directory, file_name)
# ファイルを保存
with open(file_path, "wb") as f:
f.write(file_data)
# メッセージを追加
saved_file = f"{directory}/{file_name}にファイルを保存しました。" if language == 'japanese' else f"Saved file to {directory}/{file_name}."
save_message = "ファイルを保存しました。" if language == 'japanese' else f"Saved file."
await send_websocket_message(websocket, save_message, "assistant")
WebSocketで受け取ったファイルをworkspaceフォルダに保存しています。
この処理では特にOpen Interpreterを使っているわけではないですが、アップロード後にOpen Interpreterに「workspace内の〇〇.jsonを〜」のような質問をすれば対応してくれます。
残りはそこまで重要でないので割愛します。
app/services/websocket_service.py
import json
import base64
async def send_websocket_message(websocket, message, type):
# type別にrole変数に値を設定
role = "assistant" if type == "message" else type
if websocket and message != "":
json_data = json.dumps({"role": role, "text": message}, ensure_ascii=False)
print(f"Sending message: {json_data}")
await websocket.send({"type": "websocket.send", "text": json_data})
print(f"Send cmplete.")
else:
print("Can't send message, WebSocket connection is closed.")
接続先に送信する処理です。形式は下記のようにしていますが、適宜調整してください。
{ role: String, text: String }
おまけ
下記からこのシステムを使用した実用例が見れます。チャンネル登録してもらえると嬉しいです🙇♀
フロントはReactとTypeScriptを使用しています。
ベースにはpixiv社のChatVRMを使用させてもらっています。
この記事が気に入ったらサポートをしてみませんか?