LangChainとFastAPIのストリーミング機能を使って、ChatGPTで生成したテキストをAPIでリアルタイム送信する方法
こんな動作をするAPIの話。
LangChainの以下のIssueでも取り上げられていたのでメモです。
ソースコード
import threading
import queue
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.schema import (
HumanMessage,
SystemMessage
)
app = FastAPI(
title="LangChain API",
)
class ThreadedGenerator:
def __init__(self):
self.queue = queue.Queue()
def __iter__(self):
return self
def __next__(self):
item = self.queue.get()
if item is StopIteration:
raise item
return item
def send(self, data):
self.queue.put(data)
def close(self):
self.queue.put(StopIteration)
class ChainStreamHandler(StreamingStdOutCallbackHandler):
def __init__(self, gen):
super().__init__()
self.gen = gen
def on_llm_new_token(self, token: str, **kwargs):
self.gen.send(token)
def llm_thread(g, prompt):
try:
chat = ChatOpenAI(
verbose=True,
streaming=True,
callback_manager=CallbackManager([ChainStreamHandler(g)]),
temperature=0.7,
)
chat([SystemMessage(content="You are a poetic assistant"), HumanMessage(content=prompt)])
finally:
g.close()
def chat(prompt):
g = ThreadedGenerator()
threading.Thread(target=llm_thread, args=(g, prompt)).start()
return g
@app.get("/question-stream")
async def stream():
return StreamingResponse(
chat("Write me a song about sparkling water."),
media_type='text/event-stream'
)
def start():
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
if __name__ == "__main__":
start()
動かし方
事前にライブラリをインストール。
pip install uvicorn fastapi openai langchain
で、動かす。
# サーバー起動
python main.py
# curlで叩く
curl -N -H "Accept: text/event-stream" http://0.0.0.0:8000/question-stream
FastAPIのサクッと感は素晴らしい。
現場からは以上です。
この記事が気に入ったらサポートをしてみませんか?