目标:让你能在真实业务中设计、部署、运维一个生产可用的 RAG 系统。


一、生产级 RAG vs Demo 级 RAG 的核心差异

维度

Demo 级

生产级

数据量

几十~几百条

几十万~上亿条

延迟要求

慢点无所谓

P99 < 1s

知识更新

重新跑脚本

热更新,增量索引

多租户

不考虑

隔离、安全

可用性

跑通就行

SLA 99.9%,容灾

成本控制

不考虑

Token 成本是生命线

监控

全链路可观测


二、性能优化 — 延迟从 5s 压到 500ms

2.1 瓶颈定位:先找真正的慢在哪里

# 生产级打点:每个环节单独计时
import time
from functools import wraps

def timed(name):
 def decorator(func):
 @wraps(func)
 def wrapper(*args, **kwargs):
 t0 = time.perf_counter()
 result = func(*args, **kwargs)
 elapsed = (time.perf_counter() - t0) * 1000
 logger.info(f"[TIMING] {name}: {elapsed:.1f}ms")
 return result
 return wrapper
 return decorator

class ProductionRAG:
 @timed("embed_question")
 def embed_question(self, question: str) -> np.ndarray:
 return self.embedder.embed(question)

 @timed("vector_search")
 def vector_search(self, vec: np.ndarray, top_k: int) -> list:
 return self.vector_store.search(vec, top_k)

 @timed("rerank")
 def rerank(self, query_vec: np.ndarray, candidates: list, top_k: int) -> list:
 return self.reranker.rerank(query_vec, candidates, top_k)

 @timed("llm_generate")
 def llm_generate(self, prompt: str) -> str:
 resp = self.llm.chat.completions.create(
 model="deepseek-chat",
 messages=[{"role": "user", "content": prompt}],
 temperature=0.3,
 )
 return resp.choices[0].message.content or ""

 def ask(self, question: str) -> dict:
 t0 = time.perf_counter()
 q_vec = self.embed_question(question)
 candidates = self.vector_search(q_vec, top_k=20)
 reranked = self.rerank(q_vec, candidates, top_k=3)
 answer = self.llm_generate(self.build_prompt(question, reranked))
 total = (time.perf_counter() - t0) * 1000
 logger.info(f"[TOTAL] {total:.1f}ms")
 return {"answer": answer, "total_ms": total}

Java 类比:Spring Boot 的 @Timed + Micrometer,监控每个 Bean 方法的 P50/P90/P99。

2.2 各环节优化策略

① Embedding 层优化

问题:每次请求都要把问题向量化,是固定成本。

方案1:Embedding 缓存

from functools import lru_cache

class CachedEmbedder:
 def __init__(self, base_embedder, cache_size=10000):
 self.base = base_embedder
 self.cache = {} # LRU: question → vec
 self.access_order = [] # 手动 LRU

 def embed(self, text: str) -> np.ndarray:
 if text in self.cache:
 # 移到最前
 self.access_order.remove(text)
 self.access_order.insert(0, text)
 return self.cache[text]

 vec = self.base.embed(text)
 if len(self.cache) >= 10000:
 # 淘汰最久未用的
 oldest = self.access_order.pop()
 del self.cache[oldest]
 self.cache[text] = vec
 self.access_order.insert(0, text)
 return vec

缓存命中率实测

  • 客服场景(重复问题多):命中率 40~60%,延迟从 50ms → 5ms
  • 文档问答(问题分散):命中率 10~20%,但对热点问题有效

方案2:批量Embedding(Ingest优化)

# Demo 写法:逐条向量化,100条 = 100次网络往返
for chunk in chunks:
 vec = embedder.embed(chunk.text) # N次HTTP请求 ❌

# 生产写法:批量一次发
batch_vecs = embedder.encode(chunks_texts) # 1次,吞吐高10倍 ✅

Java 类比:数据库 N+1 问题 vs 批量 IN (id1, id2, ...) 查询。

② 向量检索层优化

问题:10万条以上数据,暴力检索太慢。

三层索引策略

数据量层级 推荐方案 延迟 内存
────────────────────────────────────────────────────────
< 1万 暴力检索(numpy) 50ms 小
1万~100万 HNSW (efConstruction=200) 10ms 中
100万~1亿 HNSW + IVF 聚类 20ms 大
> 1亿 分布式向量引擎(Milvus/Qdrant) 可控 集群

HNSW 参数调优

import chromadb

client = chromadb.Client()

# HNSW 参数详解
collection = client.create_collection(
 name="production_rag",
 metadata={
 "hnsw:space": "cosine", # 余弦相似度
 "hnsw:M": 16, # 构建时每层连接数,大=精度高=慢
 "hnsw:efConstruction": 200, # 构建精度,200=高质量,400=极高精度(慢)
 "hnsw:ef": 100, # 搜索时的探索范围,大=精度高=慢
 }
)

参数

小值(快/低精度)

大值(慢/高精度)

生产建议

M

8

64

16~32

efConstruction

100

400

200

ef (搜索时)

50

500

100

ef 动态调整:搜索用 ef=100,精确场景用 ef=300

③ LLM 生成层优化

延迟的绝对大户:Embedding 10ms + 检索 20ms + Rerank 10ms = 40ms,但 LLM 生成可能占 2~5s。

方案1:Streaming 输出

python复制

# Demo 写法:等完整回答再返回
answer = llm.generate(prompt) # 等 3s 才有输出 ❌

# 生产写法:流式,边生成边返回
stream = llm.chat.completions.create(
 model="deepseek-chat",
 messages=[{"role": "user", "content": prompt}],
 stream=True,
)
for chunk in stream:
 yield chunk.choices[0].delta.content # 200ms 开始输出 ✅

方案2:Prompt 瘦身

# 问题:Prompt 太长 → Token 多 → 费用高 + 生成慢
bad_prompt = f"""
你是一个专业的技术问答助手。请仔细阅读以下所有文档内容,
然后回答用户的问题。以下文档可能包含或不含答案,请仔细分析...

[文档1: 500字] ...
[文档2: 500字] ...
[文档3: 500字] ...
...
"""

# 解决:只送最相关的,控制 Token 数量
good_prompt = f"""
基于以下内容回答,简明扼要,不超过200字:

{docs[0].snippet(300)}
""".strip()

经验:Prompt 从 2000 tokens 压到 500 tokens,LLM 延迟从 3s → 1s,费用降 75%。

方案3:模型降级

def ask_with_fallback(question):
 try:
 # 优先用好模型
 return fast_llm.answer(question) # GPT-4o, ~500ms, $0.01/次
 except TimeoutError:
 # 降级到便宜模型
 return cheap_llm.answer(question) # GPT-3.5, ~200ms, $0.001/次

三、成本控制 — Token 是钱,LLM 调用要抠

3.1 成本分解

一次 RAG 请求的成本:

Embedding: 1次 × 问题Token数 × $0.0001/1K token
LLM 输入: 1次 × (上下文Token + 问题Token) × $0.001/1K token
LLM 输出: 1次 × 回答Token数 × $0.002/1K token
────────────────────────────────────────────────────────
总计 ≈ $0.003~0.01 / 次(假设 2000 token 上下文)

10万次请求/月 ≈ $300~1000/月

3.2 成本优化三招

第一招: Embedding 模型从 API 换成本地

python复制

# API 调用(花钱)
from openai import OpenAI
client = OpenAI()
vec = client.embeddings.create(
 model="text-embedding-3-small",
 input="问题文本"
) # $0.0001/1K token

# 本地模型(一次性投入)
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("BAAI/bge-small-zh-v1.5")
vec = model.encode("问题文本") # 免费,只是 CPU/GPU 计算

每月 10 万次请求 → 从 $10 降到 $0。

第二招:上下文压缩
# 原始 Rerank 结果:3个文档,各500字 = 1500字塞进 Prompt

# 优化:先让 LLM 摘要每个文档,只送核心内容
summarized = []
for doc in reranked_docs:
 summary = llm.generate(f"用一句话概括:{doc.text[:500]}")
 summarized.append(summary) # 50字 vs 500字

prompt = f"相关要点:{';'.join(summarized)}\n问题:{question}"

Token 成本从 1500 → 300,省 80%。

第三招:缓存 + 旁路
from cachetools import TTLCache

class CostAwareRAG:
 def __init__(self, llm):
 self.llm = llm
 self.prompt_cache = TTLCache(maxsize=10000, ttl=3600) # 1小时过期

 def ask(self, question: str) -> str:
 # LLM 结果缓存(问题相同 → 直接返回)
 cache_key = hash(question)
 if cache_key in self.prompt_cache:
 return self.prompt_cache[cache_key]

 answer = self._do_ask(question)
 self.prompt_cache[cache_key] = answer
 return answer

适用于:FAQ 类场景(问题重复率高),命中率 20~40% 时直接省 40% LLM 调用。


四、知识库热更新 — 不用重启更新文档

4.1 增量更新 vs 全量重建

Demo 级:删库 → 重新跑 ingest.py → 重启服务 ❌

生产级:
 - 增量入库:新文档 append,不影响查询
 - 热更新:修改/删除已有文档,不影响服务
 - 灰度:先更新 5% 节点,验证后再全量

4.2 ChromaDB 的 CRUD 操作

import chromadb

client = chromadb.Client()
collection = client.get_collection("knowledge_base")

# ── 增量新增 ──
collection.add(
 ids=["doc_001", "doc_002"], # 唯一ID,重复ID=覆盖
 embeddings=[vec1, vec2],
 documents=["内容1", "内容2"],
 metadatas=[{"source": "产品文档"}, {"source": "FAQ"}],
)

# ── 查询已有 ──
existing = collection.get(ids=["doc_001"])
print(existing["documents"]) # ['内容1']

# ── 更新(如文档修改)─
collection.update(
 ids=["doc_001"],
 documents=["内容1(更新版本)"],
 embeddings=[new_vec1], # 内容变了,向量也要更新
)

# ── 删除 ──
collection.delete(ids=["doc_001"])

# ── 按条件查 ──
filtered = collection.get(
 where={"source": {"$eq": "FAQ"}} # 元数据过滤
)

4.3 版本管理与回滚

import json
from datetime import datetime

class VersionedKnowledgeBase:
 """知识库版本管理,支持回滚"""

 def __init__(self, collection, backup_dir="backups"):
 self.collection = collection
 self.backup_dir = Path(backup_dir)
 self.backup_dir.mkdir(exist_ok=True)
 self.current_version = None
 self.versions = [] # [(version_id, timestamp, count), ...]

 def ingest_with_version(self, chunks: list[dict], version_tag: str):
 """入库并打版本快照"""
 # 1. 备份当前状态
 snapshot = self._snapshot()
 backup_path = self.backup_dir / f"{version_tag}_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
 with open(backup_path, "w", encoding="utf-8") as f:
 json.dump(snapshot, f, ensure_ascii=False)
 self.versions.append((version_tag, len(chunks)))

 # 2. 增量入库
 for chunk in chunks:
 self.collection.add(**chunk)

 self.current_version = version_tag
 print(f"[VERSION] {version_tag} 入库完成,共 {len(chunks)} 条,快照已保存")

 def rollback(self, version_tag: str):
 """回滚到指定版本"""
 # 找到目标版本快照
 backup_files = list(self.backup_dir.glob(f"{version_tag}_*.json"))
 if not backup_files:
 print(f"[ERROR] 未找到版本 {version_tag} 的备份")
 return

 latest = max(backup_files, key=lambda p: p.stat().st_mtime)
 with open(latest, "r", encoding="utf-8") as f:
 snapshot = json.load(f)

 # 删除当前所有文档,重新注入快照
 self.collection.delete(where={}) # 清空
 for doc in snapshot["documents"]:
 self.collection.add(**doc)

 print(f"[ROLLBACK] 已回滚到 {version_tag},共 {len(snapshot['documents'])} 条")

Java 类比:数据库的 INSERT / UPDATE / DELETE 加 Flyway Migration 版本管理,生产环境必须支持回滚。


五、多租户隔离 — 怎么让不同客户的文档互相看不到

5.1 需求场景

A公司(金融客户):自己的文档只能 A 公司的人看到
B公司(医疗客户):同理,文档完全隔离
系统管理员:能看到所有租户数据

5.2 方案一:Collection 隔离(简单,数据量小)

# 每个租户一个 Collection
collection_a = client.create_collection(f"tenant_{tenant_id}_docs")
collection_b = client.create_collection(f"tenant_{tenant_id}_docs")

# 检索时只用租户自己的 Collection
def ask_for_tenant(tenant_id, question):
 collection = client.get_collection(f"tenant_{tenant_id}_docs")
 results = collection.query(
 query_embeddings=[embed(question)],
 n_results=5,
 where={"tenant_id": tenant_id} # 隔离
 )
 return results

优点:简单,隔离彻底
缺点:租户多了(1000+)时 Collection 管理复杂

5.3 方案二:Metadata 过滤(推荐,主流方案)

# 所有文档存在同一个 Collection,用 metadata 隔离
collection.add(
 ids=["doc_001", "doc_002"],
 documents=["A公司内部文档", "B公司内部文档"],
 metadatas=[
 {"tenant_id": "tenant_a", "access_level": "internal"},
 {"tenant_id": "tenant_b", "access_level": "internal"},
 ]
)

# 检索时强制加租户过滤
def ask_for_tenant(tenant_id, question):
 results = collection.query(
 query_embeddings=[embed(question)],
 n_results=5,
 where={"tenant_id": tenant_id} # 关键:强制过滤 ✅
 )
 return results

安全注意:应用层必须校验 tenant_id,防越权:

def ask_for_tenant(tenant_id: str, question: str, user_tenant_id: str):
 # 强制租户校验,防止横向越权
 if tenant_id != user_tenant_id:
 raise PermissionError("禁止跨租户访问")

 results = collection.query(
 query_embeddings=[embed(question)],
 n_results=5,
 where={"tenant_id": tenant_id}
 )
 return results

Java 类比:多租户 SQL 隔离,行级安全策略(RLS),每个查询自动加 WHERE tenant_id = ?

5.3 方案三:Embedding 时加入租户信号

# 高级方案:在向量中加入租户空间信息
# 同一租户内的文档在向量空间里更接近,不同租户文档间距更大

class TenantAwareEmbedder:
 def __init__(self, base_embedder):
 self.base = base_embedder

 def embed(self, text: str, tenant_id: str):
 base_vec = self.base.embed(text)
 tenant_signal = self._tenant_vector(tenant_id) # tenant 专属向量偏移
 return (base_vec + tenant_signal) / 2

六、全链路监控与可观测性

6.1 三个核心指标

RAG 系统三大黄金指标:

1. 检索质量
 - Recall@K:Top-K 结果里有多少真正相关(>0
...(truncated)...


思考题

第1题:LLM 延迟从 500ms 飙升到 8s,为什么是你的问题?

核心论点:你自己挖的坑,别甩给服务商

LLM 服务商 SLA 99.9% 指的是他们的服务可用性(服务器没宕机),不是你的体验延迟

你的 RAG 系统在高峰期延迟爆炸,根因在自己:

高峰期 1000 QPS × 每请求 2000 input tokens
= 1000 × 2000 = 2,000,000 tokens/秒

LLM 服务商的 token 处理能力有上限,
你的请求在队列里排队等 → 延迟从 500ms → 8000ms

这不是服务商挂了,而是你的流量超出了他们的处理容量
→ 你的系统设计有问题,不关服务商 SLA 什么事

根因分析

问题链条:

你的 RAG 系统 LLM 服务商
┌─────────────┐ ┌─────────────┐
│ 高峰 1000 QPS │ ──请求──▶ │ 处理上限 │
│ 无排队控制 │ ←─────── │ 5000 tok/s │
│ 无限流 │ 溢出 │ 实际 8000 │
└─────────────┘ └─────────────┘
 队列积压 → 8s 延迟

成本对比

措施

效果

改造成本

限流

保护 LLM,不过载

30 分钟

熔断

快速失败,不雪崩

1 小时

缓存

相同问题 0 LLM 调用

30 分钟

模型降级

高峰用便宜模型

1 小时

异步队列

削峰,平滑流量

4 小时

Prompt 瘦身

LLM 生成快 3 倍

30 分钟


第2题:竞争对手共用 RAG,多租户完整隔离方案

设计原则:防御纵深,即使单个环节被绕穿,还有其他层保护

隔离层级:
┌────────────────────────────────────────────┐
│ L1. 网络层:租户网络隔离(VPC/命名空间) │
│ L2. 应用层:租户 ID 强校验 + 行级安全 │
│ L3. Embedding 层:租户向量空间隔离 │
│ L4. 检索层:强制 Metadata 过滤 │
│ L5. 结果层:租户数据脱敏 + 审计日志 │
└────────────────────────────────────────────┘

完整方案(从请求到返回全链路)

L1:Embedding 层 — 租户向量空间隔离
class TenantAwareEmbedder:
 """
 租户专属向量空间:不同租户的文档在向量空间里天然隔离
 """

 def __init__(self, base_embedder: SentenceTransformer):
 self.base = base_embedder
 # 每个租户有自己的正交偏移向量,防止向量空间重叠
 self.tenant_offsets: dict[str, np.ndarray] = {}

 def _get_tenant_offset(self, tenant_id: str) -> np.ndarray:
 """为租户生成一个随机正交偏移,叠加到向量上"""
 if tenant_id not in self.tenant_offsets:
 # 生成一个与基向量正交的租户偏移向量
 np.random.seed(hash(tenant_id) % (2**31))
 self.tenant_offsets[tenant_id] = np.random.randn(self.base.get_sentence_embedding_dimension()) * 0.1
 return self.tenant_offsets[tenant_id]

 def embed(self, text: str, tenant_id: str) -> np.ndarray:
 # 基础语义向量
 base_vec = self.base.encode(text, normalize_embeddings=True)
 # 叠加租户专属偏移,使不同租户向量空间不重叠
 offset = self._get_tenant_offset(tenant_id)
 combined = base_vec + offset
 # 重新归一化
 return combined / np.linalg.norm(combined)

效果:即使攻击者拿到了其他租户的文档内容,他生成的向量和真正的租户向量不在同一个向量空间,检索不到。

L2:入库层 — 元数据强绑定
def ingest_document(tenant_id: str, doc_content: str, doc_id: str):
 """入库时强制绑定租户 ID"""

 # 租户 ID 是不可更改的元数据
 tenant_id = validate_tenant_id(tenant_id) # 从登录态取,不是请求参数

 vec = embedder.embed(doc_content, tenant_id) # 用租户专属向量

 # 元数据中写入租户 ID
 collection.add(
 ids=[doc_id],
 embeddings=[vec],
 documents=[doc_content],
 metadatas=[{
 "tenant_id": tenant_id, # 强制写入,不可伪造
 "tenant_id_hash": hash256(tenant_id), # 双重校验
 "created_at": timestamp,
 "access_level": "internal",
 }]
 )
L3:检索层 — 三重强制过滤
def search(tenant_id: str, question: str, user_tenant_id: str, user_id: str):
 """
 检索时三重校验,确保隔离
 """
 # 校验1:URL/请求中的 tenant_id 必须和登录态一致
 if tenant_id != user_tenant_id:
 audit_log.warning(f"越权检索尝试: user={user_id} tried to access tenant={tenant_id}")
 raise PermissionDenied("禁止跨租户访问")

 # 校验2:Embedding 必须用当前租户的空间
 query_vec = embedder.embed(question, tenant_id) # ❗不能用其他租户的向量

 # 校验3:Chromadb 查询时强制加 tenant_id 过滤(服务端验证)
 results = collection.query(
 query_embeddings=[query_vec],
 n_results=5,
 where={"tenant_id": tenant_id}, # 强制服务端过滤,客户端无法绕过
 where_document={"tenant_id": tenant_id} # 文档级别也要过滤
 )

 # 校验4:结果二次验证
 for doc_tenant in results["metadatas"][0]:
 if doc_tenant["tenant_id"] != tenant_id:
 audit_log.error(f"隔离失效检测: document leaked to wrong tenant")
 raise InternalSecurityError("隔离异常,立即告警")

 return results
L4:结果层 — 审计 + 脱敏
def audit_and_sanitize(results, tenant_id: str, user_id: str):
 """每次返回结果前,审计 + 脱敏"""

 # 审计日志(防事后抵赖)
 audit_log.info({
 "event": "rag_query",
 "tenant_id": tenant_id,
 "user_id": user_id,
 "result_count": len(results["documents"]),
 "timestamp": now(),
 "ip": request.ip,
 })

 # 结果脱敏:去掉敏感元数据
 safe_results = {
 "documents": results["documents"],
 "distances": results["distances"],
 # 不返回租户 ID、文档 ID 等元数据,防止枚举攻击
 }
 return safe_results

完整隔离架构图

用户 A(租户 A)请求:
 GET /ask?tenant_id=tenant_a
 Authorization: Bearer <tenant_a_token>

 → 网关校验 Token 中的 tenant_id = tenant_a ✅
 → Embedding 层:用 tenant_a 的向量空间 + 租户偏移 ✅
 → 入库/检索:服务端强制 where={"tenant_id": "tenant_a"} ✅
 → 结果:验证返回的文档 tenant_id = tenant_a ✅
 → 审计日志写入 ✅

用户 A 尝试攻击(拿到 tenant_b 的 ID):
 GET /ask?tenant_id=tenant_b ← 伪造请求参数

 → 网关:Token 中 tenant_id = tenant_a vs 请求 tenant_id = tenant_b
 → 校验失败 ❌ → 403 Forbidden → 不进入检索流程 ✅
 → 审计日志记录:"跨租户越权尝试" ✅

防御纵深总结

攻击场景

防御层级

阻断位置

伪造 tenant_id 参数

L3 校验1:Token vs 请求比对

网关层

拿到其他租户向量来检索

L1:租户向量空间正交隔离

Embedding 层

绕过 metadata 过滤

L3 校验3:服务端强制 where

检索层

暴力枚举 doc_id

L5:结果不返回 doc_id,元数据脱敏

结果层

事后抵赖查询记录

L5:每次查询写审计日志

数据层

Java 类比

  • 租户向量偏移 ≈ 数据库列级加密(即使 DB 管理员也看不到其他租户数据)
  • 三重校验 ≈ Spring Security 多层认证(过滤器 → AOP → Service)
  • 审计日志 ≈ 数据库操作审计表(防抵赖)

Q1 核心:LLM 服务商 SLA 99.9% 是可用性,不是你的延迟体验。
 你的延迟爆炸是流量超限,没有限流/熔断/缓存,和服务商无关。
 修复顺序:限流 → 熔断 → 缓存 → 降级 → 异步队列

Q2 核心:多租户隔离需要纵深防御,单一过滤不够。
 Embedding 层(租户向量空间)+ 入库层(强制元数据)+ 
 检索层(三重校验)+ 结果层(审计脱敏)
 核心原则:客户端不可信,所有校验在服务端做。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐