因为大模型相当于一个大型的预测机器。所以吐字是一个一个吐的。这种像水流一样的方式,专业来说就是服务器单向向前端持续推送消息的机制,就叫做SSE。

我会用py + fastapi+ react 去简单构建一个小demo。

pip install fastapi uvicorn langchain langchain-openai

from fastapi import FastApi
from fastapi.responses import StreamingResponse
from pydanic import BaseModel // fastapi处理前端的参数之后,会解析成这个类。
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

调用大模型的部分

app = FastApi()

tasks={}

class ChatRequest(BaseModel):
	question:str

prompt = ChatPromptTemplate.from_messages([
(
	"system",
	"你是一个通用的智能助手。"
  "请认真理解用户问题,并用准确、清晰、简洁的方式回答。"
 	"如果你不确定答案,请明确说明,不要编造。"
),
("user","{question}")
])

llm = ChatOpenAI(
	model="gpt-4o-mini",
	temperature=0.2,
	streaming=True
)

chain = prompt | llm
``

接口部分

```python
@app.post("/chat")
def create_chat(req:ChatRequest):
	question = req.question
	task_id = str(uuid.uuid4())

	tasks[task_id] = {
		"question":question,
		"status":"pending"
	}
	return {
		"task_id":task_id
	}

@app.get("/chat/stream/{task_id}")
def stream_chat(task_id:str):
	def event_generator():
		task = tasks.get(task_id)
		if not task:
			yield: f"data:[DONE]\n\n"
		question = task.get("question")
		task["status"] = "running"

		for chunk in chain.stream({"question":question}):
			if hasattr(chunk,"content") and chunk.content:
			yield f"data:{chunk.content}\n\n"
		
		yield f"data: [DONE]\n\n"
	return streamingResponse(event_generator,media_type:"text/event-stream")

前端代码我这边写一些关键的

// 第一次请求先拿到task_id

const sendMessage = async() =>{
	const res = await fetch(url,
		{
			method:"POST",
			body:JOSN.stringify({
				question:qus
			})
		}
	)
	const taskId = res.task_id;
	// 然后用eventSource去监听
	const eventSource = new 		EventSource(`${url}/${taskId}`)

	eventSource.onmessage=(event)=>{
		if(event.data === "[DONE]"){
			eventSource.close();
			return
		}
		
		//event.data 去拼出来就行了
	}
}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐