SSE流式使用
·
因为大模型相当于一个大型的预测机器。所以吐字是一个一个吐的。这种像水流一样的方式,专业来说就是服务器单向向前端持续推送消息的机制,就叫做SSE。
我会用py + fastapi+ react 去简单构建一个小demo。
pip install fastapi uvicorn langchain langchain-openai
from fastapi import FastApi
from fastapi.responses import StreamingResponse
from pydanic import BaseModel // fastapi处理前端的参数之后,会解析成这个类。
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
调用大模型的部分
app = FastApi()
tasks={}
class ChatRequest(BaseModel):
question:str
prompt = ChatPromptTemplate.from_messages([
(
"system",
"你是一个通用的智能助手。"
"请认真理解用户问题,并用准确、清晰、简洁的方式回答。"
"如果你不确定答案,请明确说明,不要编造。"
),
("user","{question}")
])
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.2,
streaming=True
)
chain = prompt | llm
``
接口部分
```python
@app.post("/chat")
def create_chat(req:ChatRequest):
question = req.question
task_id = str(uuid.uuid4())
tasks[task_id] = {
"question":question,
"status":"pending"
}
return {
"task_id":task_id
}
@app.get("/chat/stream/{task_id}")
def stream_chat(task_id:str):
def event_generator():
task = tasks.get(task_id)
if not task:
yield: f"data:[DONE]\n\n"
question = task.get("question")
task["status"] = "running"
for chunk in chain.stream({"question":question}):
if hasattr(chunk,"content") and chunk.content:
yield f"data:{chunk.content}\n\n"
yield f"data: [DONE]\n\n"
return streamingResponse(event_generator,media_type:"text/event-stream")
前端代码我这边写一些关键的
// 第一次请求先拿到task_id
const sendMessage = async() =>{
const res = await fetch(url,
{
method:"POST",
body:JOSN.stringify({
question:qus
})
}
)
const taskId = res.task_id;
// 然后用eventSource去监听
const eventSource = new EventSource(`${url}/${taskId}`)
eventSource.onmessage=(event)=>{
if(event.data === "[DONE]"){
eventSource.close();
return
}
//event.data 去拼出来就行了
}
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)