从 0 到 1 搭建可商用的 AI 聊天机器人:300 行代码搞定 RAG + 流式输出
从 0 到 1 搭建可商用的 AI 聊天机器人:300 行代码搞定 RAG + 流式输出
摘要:拒绝“Demo 一时爽,上线火葬场”。本文摒弃冗长的理论,直接提供一套基于 LangChain + Chroma + FastAPI 的生产级 RAG 解决方案。包含完整的上下文管理、流式输出以及商用级别的限流与日志处理,核心代码仅需 300 行。
一、痛点引入:为什么 90% 的 RAG 项目都跑不起来?
在当前的 AI 落地浪潮中,许多开发者在搭建 RAG(检索增强生成)系统时会陷入以下困境:
- 教程脱离实际:网上的教程大多停留在 Jupyter Notebook 里的玩具代码,一旦放入生产环境,立刻面临并发崩溃、内存泄漏等问题。
- 上下文管理混乱:多轮对话时,历史消息要么丢失,要么无限膨胀导致 Token 超限和成本失控。
- 流式输出卡顿:前端体验极差,要么一次性全量返回,要么流式输出时断时续,缺乏标准的 SSE (Server-Sent Events) 实现。
- 缺乏商用兜底:没有接口限流、没有请求日志、没有优雅的错误降级,直接被恶意请求打穿。
今天,我们将用不到 300 行核心 Python 代码,一次性解决上述所有痛点,交付一个可直接部署商用的 AI 聊天机器人后端。
二、技术选型
为了保证轻量、高效且易于维护,我们采用以下技术栈:
- 编排框架:LangChain(使用最新的 LCEL 语法,性能更好,逻辑更清晰)
- 向量数据库:ChromaDB(轻量级,支持本地持久化,无需额外部署重型数据库)
- Web 框架:FastAPI(原生支持异步和 SSE 流式输出,性能极佳)
- 大模型 API:OpenAI API(如 gpt-4o-mini,兼顾速度与成本)
国内模型平替方案:只需修改
base_url和model参数,即可无缝切换至 DeepSeek (deepseek-chat) 或 阿里云通义千问 (qwen-plus),成本可降低 80% 以上。
三、分步实现(核心代码)
0. 环境准备
pip install "langchain>=0.3.0" langchain-community langchain-openai chromadb fastapi uvicorn python-dotenv slowapi
1. 文档加载与分块 + 向量数据库初始化
我们将文档处理与向量库初始化封装为独立函数,支持增量更新。
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
load_dotenv()
# 支持国内模型平替:只需修改 OPENAI_API_BASE 和 OPENAI_MODEL_NAME
os.environ.setdefault("OPENAI_API_BASE", "https://api.openai.com/v1")
os.environ.setdefault("OPENAI_MODEL_NAME", "gpt-4o-mini")
def init_vectorstore(docs_path: str = "data.txt"):
"""初始化或加载向量数据库"""
embeddings = OpenAIEmbeddings()
if os.path.exists("./chroma_db"):
print("加载已有向量数据库...")
return Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
print("正在加载并切分文档...")
loader = TextLoader(docs_path, encoding="utf-8")
documents = loader.load()
# 按 500 token 切分,重叠 50 token,保证上下文连贯性
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
texts = text_splitter.split_documents(documents)
print("正在构建向量索引...")
vectorstore = Chroma.from_documents(texts, embeddings, persist_directory="./chroma_db")
return vectorstore
2. 基础检索链构建 + 上下文记忆管理
使用 LangChain 的 RunnableWithMessageHistory,优雅解决多轮对话的上下文记忆问题。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
def build_rag_chain(vectorstore):
"""构建带历史记忆的 RAG 链"""
llm = ChatOpenAI(temperature=0.1, streaming=True) # 开启流式支持
retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) # 每次检索 Top 3
# 1. 上下文感知检索器:将用户当前问题结合历史对话进行重写,提升检索准确率
contextualize_q_prompt = ChatPromptTemplate.from_messages([
("system", "根据以下聊天历史,将用户的最新问题重写为一个独立的、可检索的问题。如果没有历史,直接返回原问题。"),
MessagesPlaceholder("chat_history"),
("human", "{input}")
])
history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)
# 2. QA 提示词:结合检索到的文档和历史进行回答
qa_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 AI 助手。请仅根据以下提供的上下文回答问题。\n\n上下文:\n{context}"),
MessagesPlaceholder("chat_history"),
("human", "{input}")
])
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
# 3. 组合成完整的 RAG 链
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
# 4. 包装为带记忆的版本 (通过 session_id 隔离不同用户的对话)
store = {}
def get_session_history(session_id: str):
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
return RunnableWithMessageHistory(
rag_chain,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
3. 流式输出实现 (FastAPI)
利用 FastAPI 的 StreamingResponse 和 LangChain 的 astream,实现丝滑的打字机效果。
import uuid
import json
import logging
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# 初始化 FastAPI 与 限流器
app = FastAPI(title="商用 RAG Chatbot API")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# 初始化全局组件
vectorstore = init_vectorstore()
rag_chain = build_rag_chain(vectorstore)
class ChatRequest(BaseModel):
message: str
session_id: str | None = None # 前端传入,用于维持多轮对话
@app.post("/api/chat/stream")
@limiter.limit("10/minute") # 商用限流:单 IP 每分钟最多 10 次请求
async def chat_stream(request: ChatRequest):
session_id = request.session_id or str(uuid.uuid4())
# 配置请求日志
logging.info(f"[{session_id}] 收到请求: {request.message}")
try:
async def generate():
# 使用 astream_events 获取更细粒度的流式控制
async for event in rag_chain.astream_events(
{"input": request.message},
config={"configurable": {"session_id": session_id}},
version="v2"
):
# 仅拦截 LLM 生成的 token 流
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
yield f"data: {json.dumps({'token': token}, ensure_ascii=False)}\n\n"
# 流结束标志
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
except Exception as e:
logging.error(f"[{session_id}] 处理失败: {str(e)}")
raise HTTPException(status_code=500, detail="服务器内部错误,请稍后重试")
四、商用优化:限流、日志与错误处理
上述代码中已经内置了商用必备的三大防护机制:
- 接口限流:引入 slowapi,通过
@limiter.limit("10/minute")防止单 IP 恶意刷接口耗尽 Token 额度。生产环境中建议将key_func替换为基于 Redis 的分布式限流。 - 结构化日志:使用
logging记录session_id和用户输入,方便后续通过 ELK 或 Loki 进行链路追踪和审计。 - 优雅降级:全局
try-except捕获异常,避免将底层的 LangChain 堆栈信息暴露给前端,统一返回标准的 HTTP 500 JSON 响应。
五、部署指南:Docker 一键部署到服务器
为了方便运维,我们提供标准的 Docker 部署方案。
1. 创建 requirements.txt
fastapi
uvicorn
langchain>=0.3.0
langchain-community
langchain-openai
chromadb
python-dotenv
slowapi
2. 创建 Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
3. 创建 docker-compose.yml
version: '3.8'
services:
rag-bot:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=your_api_key_here
- OPENAI_API_BASE=https://api.openai.com/v1
volumes:
- ./chroma_db:/app/chroma_db
- ./data.txt:/app/data.txt
restart: always
4. 一键启动
docker-compose up -d --build
六、效果演示与完整代码仓库
前端可以通过标准的 EventSource 或 fetch API 消费该流式接口:
const eventSource = new EventSource('/api/chat/stream?message=你好&session_id=123');
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
console.log(data.token); // 追加到前端 UI 即可实现打字机效果
};
完整代码仓库已开源,包含前端测试页面与完整的错误处理逻辑。
GitHub: github.com/your-repo/commercial-rag-bot(示例链接)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)