花3个月搭的RAG系统,上线一周被吐槽“人工智障“——问题出在哪?
花3个月搭的RAG系统,上线一周被吐槽"人工智障"——问题出在哪?
大家好,我是摘星。今天我们来聊聊一个让无数工程师夜不能寐的话题:为什么你的RAG系统看起来很美,用起来却很蠢。
过去三个月,我至少收到了二十多封读者的求助邮件,主题惊人地一致——他们的RAG系统上线后,用户反馈大多是"答非所问"“找不到东西”“总是胡编乱造”。有意思的是,这些系统的技术选型五花八门:有用LangChain的,有用LlamaIndex的,有自研向量数据库的,还有直接上GPT-4绕道走的。技术栈不同,但问题症状却高度相似。
这让我意识到,RAG本身并不是问题的根源。问题在于我们对RAG的工作原理存在太多误解——以为加了检索、接了大模型,就万事大吉。实际上,一个真正能打的RAG系统,需要在检索质量、上下文组织、生成控制三个层面都做到位,任何一个短板的缺失都会导致整体体验崩塌。
今天这篇文章,我会把这三个层面掰开了揉碎了讲清楚。不讲正确的废话,直接给可落地的方案。
一、为什么你的RAG"答非所问"?先搞清楚检索和生成的边界
要理解RAG的问题,我们得先厘清一个核心概念:RAG不是搜索引擎,它的本质是大模型的语境扩展。
传统搜索引擎的目标是"找到最相关的Top-K文档",评价指标是召回率和精确率。但RAG不一样——它的目标是"让大模型在回答问题时,有足够的背景知识作为参考"。这两件事看起来相似,实际上有本质区别。
搜索引擎追求的是"找到",而RAG追求的是"让大模型用好"。你检索出来的内容,不管多匹配,如果大模型读不懂、不会用、不知道该往哪里用,结果就是答非所问。
这解释了为什么很多RAG系统的核心问题不在检索,而在上下文组织。
举个例子。用户问:“张三在2024年3月的项目贡献是什么?”
一个"聪明"的检索系统会找到所有包含"张三"“2024年3月”"项目贡献"的文档,然后一股脑塞给大模型。但大模型面对的可能是一堆零散的材料:邮件往来、代码提交记录、会议纪要、PPT演讲稿。这些材料格式不同、表述各异、有的包含日期有的不包含。大模型需要花费大量"token预算"来理解这些材料的关联,还要判断哪些是噪声、哪些是关键信息。
结果就是:有效信息被淹没在噪声里,大模型要么遗漏关键点,要么被噪声带跑,输出一个似是而非的答案。
所以RAG系统的第一个设计原则是:检索只是手段,让大模型用好才是目的。你的所有优化——分块策略、检索算法、重排序——都应该围绕这个目标展开,而不是单纯追求"找到更多相关内容"。
二、检索层:从"找到更多"到"找到更准"
明确了目标之后,我们来看检索层的优化。很多RAG教程会告诉你:"用向量检索,加个语义相似度阈值,再不行就混合检索。"这些建议没错,但太笼统。真正能落地的问题是:你的分块策略是什么?检索时如何处理查询和文档的语义差异?
2.1 分块策略:不是切得越小越好
分块(Chunking)是RAG系统里最容易被忽视但影响最大的环节。很多人觉得:“分块越小,检索越精准”,于是把文档切成128字符的碎片。结果呢?每个碎片丢失了完整的上下文,检索倒是精准了,但大模型拿到的是支离破碎的信息片段。
我见过最极端的案例是把一篇3000字的技术文档切成200多个小块,每块平均50字。结果用户问一个需要综合分析的问题时,RAG返回了20多个相关片段,每个片段都是孤立的知识点,大模型根本无法串联起来回答。
那应该怎么分块?这里有几个经实战验证的原则:
第一,按语义单元分块,不要按固定长度分块。 一段完整的代码逻辑、一个完整的段落、一组紧密相关的表格行,这些才是合适的分块边界。固定长度分块(比如每1000字符一切)看起来省事,但会无情地切断语义连贯性。
第二,考虑大模型的上下文窗口大小。 你的分块大小应该让系统在召回Top-K文档后,总token数在大模型上下文窗口的30%-50%之间。太多会让大模型注意力分散,太少又浪费了上下文容量。
第三,为不同类型的内容设计不同的分块策略。 结构化数据(表格、代码)、半结构化数据(Markdown、HTML)、纯文本(段落叙述)的语义单元大小完全不同。混合使用不同策略往往比统一处理效果更好。
下面是一个实战中的分块策略代码示例,演示如何针对不同内容类型采用差异化的分块逻辑:
from typing import List, Dict, Any
from dataclasses import dataclass
import re
@dataclass
class Chunk:
content: str
chunk_type: str # 'code', 'table', 'text', 'heading'
metadata: Dict[str, Any]
def smart_chunking(document: Dict[str, Any], max_tokens: int = 500) -> List[Chunk]:
"""
智能分块:根据内容类型采用不同的分块策略
"""
chunks = []
content_type = document.get('type', 'text')
if content_type == 'code':
# 代码块:按函数/类级别分块,保留完整结构
chunks.extend(_chunk_code(document, max_tokens))
elif content_type == 'table':
# 表格:保持行级或语义级分组,不要跨行切割单元格
chunks.extend(_chunk_table(document, max_tokens))
elif content_type == 'heading':
# 标题+正文:按照文档结构层次分块
chunks.extend(_chunk_heading(document, max_tokens))
else:
# 普通文本:按段落分块,但合并过短的段落
chunks.extend(_chunk_text(document, max_tokens))
return chunks
def _chunk_code(doc: Dict[str, Any], max_tokens: int) -> List[Chunk]:
"""
代码分块策略:
1. 识别函数定义(def, class, function)
2. 识别代码块结构(import, 缩进层级)
3. 保留代码注释作为语义上下文
"""
code = doc['content']
# 匹配函数定义
func_pattern = r'^(def\s+\w+|class\s+\w+|async\s+def\s+\w+|interface\s+\w+)\s*\('
lines = code.split('\n')
current_chunk = []
current_chunk_tokens = 0
for line in lines:
line_tokens = len(line) // 4 # 粗略估算
is_new_func = re.match(func_pattern, line.strip())
if is_new_func and current_chunk_tokens > 0:
# 保存当前chunk,开启新的
chunks.append(Chunk(
content='\n'.join(current_chunk),
chunk_type='code',
metadata={'scope': 'function'}
))
current_chunk = []
current_chunk_tokens = 0
current_chunk.append(line)
current_chunk_tokens += line_tokens
# 处理最后一个chunk
if current_chunk:
chunks.append(Chunk(
content='\n'.join(current_chunk),
chunk_type='code',
metadata={'scope': 'function'}
))
return chunks
def _chunk_table(doc: Dict[str, Any], max_tokens: int) -> List[Chunk]:
"""
表格分块策略:
1. 识别表头,理解列语义
2. 按语义行分组(不要切割单元格内容)
3. 保留表头作为上下文
"""
table = doc['content']
rows = table.split('\n')
header = rows[0] # 保留表头
data_rows = rows[1:]
chunks = []
current_rows = [header]
current_tokens = len(header) // 4
for row in data_rows:
row_tokens = len(row) // 4
# 如果加上这行会超限,先保存当前的
if current_tokens + row_tokens > max_tokens and len(current_rows) > 1:
chunks.append(Chunk(
content='\n'.join(current_rows),
chunk_type='table',
metadata={'row_count': len(current_rows) - 1}
))
current_rows = [header] # 新chunk保留表头
current_tokens = len(header) // 4
current_rows.append(row)
current_tokens += row_tokens
# 处理最后一个chunk
if len(current_rows) > 1:
chunks.append(Chunk(
content='\n'.join(current_rows),
chunk_type='table',
metadata={'row_count': len(current_rows) - 1}
))
return chunks
def _chunk_heading(doc: Dict[str, Any], max_tokens: int) -> List[Chunk]:
"""
标题+正文分块:以标题为锚点,包含完整的一级段落
"""
content = doc['content']
# 按二级标题分割
sections = re.split(r'\n##?\s+', content)
chunks = []
for section in sections:
if section.strip():
# 简单按token数检查是否需要进一步拆分
tokens = len(section) // 4
if tokens <= max_tokens:
chunks.append(Chunk(
content=section,
chunk_type='heading',
metadata={}
))
else:
# 递归拆分长段落
chunks.extend(_split_long_section(section, max_tokens))
return chunks
def _split_long_section(section: str, max_tokens: int) -> List[Chunk]:
"""递归拆分过长的段落"""
paragraphs = section.split('\n\n')
chunks = []
current = []
for para in paragraphs:
tokens = len(para) // 4
if tokens > max_tokens and current:
# 保存当前的,开始新段落
chunks.append(Chunk(
content='\n\n'.join(current),
chunk_type='heading',
metadata={'partial': True}
))
current = [para]
else:
current.append(para)
if current:
chunks.append(Chunk(
content='\n\n'.join(current),
chunk_type='heading',
metadata={'partial': True}
))
return chunks
这段代码展示了一个实战级的智能分块系统。注意几个关键点:
代码分块使用函数级边界而不是固定长度,这样每个代码块都是语义完整的单元,包含了完整的函数逻辑和必要的上下文注释。表格分块保留了表头作为每个chunk的固定前缀,避免了大模型在分析数据行时丢失列语义的困境。标题+正文分块则尊重了文档的天然结构,按段落而不是任意字符数切割。
2.2 混合检索:不是简单叠加,而是针对性融合
很多RAG教程会推荐"混合检索"——向量检索+关键词检索+BM25,然后直接拼接结果。但实战中我发现,这种简单叠加往往效果不如单独的向量检索。原因是不同检索方法返回的结果分布差异很大,直接拼接会导致Top-K里充斥着同一类型的结果,而真正关键的信息反而被稀释了。
更好的做法是先分层检索,再智能融合。
from typing import List, Tuple
import numpy as np
class HybridRetriever:
"""
分层混合检索器:
1. 第一层:关键词检索(BM25),确保术语匹配
2. 第二层:向量检索,确保语义相似性
3. 第三层:RRF融合,兼顾两种信号的优点
"""
def __init__(self, vector_db, bm25_index, alpha: float = 0.7):
"""
alpha: 融合因子,0.7表示向量检索权重70%,BM25权重30%
"""
self.vector_db = vector_db
self.bm25_index = bm25_index
self.alpha = alpha
def retrieve(self, query: str, top_k: int = 20) -> List[Dict]:
# 第一层:BM25关键词检索
bm25_results = self.bm25_index.search(query, top_k)
# 第二层:向量语义检索
vector_results = self.vector_db.search(query, top_k)
# 第三层:RRF融合(Reciprocal Rank Fusion)
fused_scores = self._rrf_fusion(bm25_results, vector_results, top_k)
# 第四层:重排序——将最相关的结果提到最前
reranked = self._rerank(query, fused_scores[:top_k])
return reranked
def _rrf_fusion(
self,
bm25_results: List[dict],
vector_results: List[dict],
k: int = 60
) -> List[dict]:
"""
RRF融合算法:对不同检索方法的结果进行排名融合
RRF score = Σ 1/(k + rank_i) ,其中rank_i是结果在第i个检索方法中的排名
"""
scores = {}
# BM25结果计分
for rank, result in enumerate(bm25_results):
doc_id = result['id']
# 使用倒数排名计分,排名越高分数越高
rrf_score = 1 / (k + rank + 1)
# 乘以alpha权重
scores[doc_id] = scores.get(doc_id, 0) + self.alpha * rrf_score
scores[doc_id + '_bm25_rank'] = rank
# 向量结果计分
for rank, result in enumerate(vector_results):
doc_id = result['id']
rrf_score = 1 / (k + rank + 1)
# 乘以(1-alpha)权重
scores[doc_id] = scores.get(doc_id, 0) + (1 - self.alpha) * rrf_score
scores[doc_id + '_vector_rank'] = rank
# 排序
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [self._build_result_item(doc_id, scores) for doc_id, _ in ranked]
def _rerank(self, query: str, candidates: List[dict]) -> List[dict]:
"""
使用cross-encoder进行重排序
cross-encoder比向量检索精度更高,但计算成本大,所以只对候选集重排
"""
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
query_doc_pairs = [
(query, candidate['content']) for candidate in candidates
]
# 获取cross-encoder的相似度分数
scores = cross_encoder.predict(query_doc_pairs)
# 按新分数重新排序
for i, candidate in enumerate(candidates):
candidate['rerank_score'] = float(scores[i])
reranked = sorted(candidates, key=lambda x: x['rerank_score'], reverse=True)
return reranked
RRF融合的关键在于排名而不是分数的融合。不同的检索方法返回的绝对分数没有可比性(BM25的分数和余弦相似度的分数含义完全不同),但排名是可比的和稳定的。RRF通过将排名转换为倒数分数,实现了对不同检索方法结果的有效融合。
2.3 查询改写:让检索真正理解用户意图
RAG系统里最容易被忽略的一个问题是查询和文档的语义gap。用户的问题往往是口语化的、碎片化的,而文档是结构化的、完整的。比如用户问"张三的KPI多少",但文档里可能写的是"张三在Q1的目标完成情况:收入120万,交付项目5个"。这两种表述的语义相同,但字面差异很大。
解决这个问题的方法是查询改写(Query Rewriting)。
class QueryRewriter:
"""
多策略查询改写,生成多个查询版本以提高召回率
"""
def __init__(self, llm_client):
self.llm = llm_client
async def rewrite(self, query: str) -> List[str]:
"""
生成多个改写版本,涵盖不同角度的查询表达
"""
prompts = [
self._direct_rewrite(query), # 直接改写,更清晰
self._expand_concepts(query), # 概念扩展,同义词替换
self._decompose_subquery(query), # 分解为多个子问题
]
results = await asyncio.gather(*[p.execute() for p in prompts])
# 去重合并
all_queries = [query] # 保留原始查询
for result in results:
all_queries.extend(result)
return list(set(all_queries))
def _direct_rewrite(self, query: str) -> Prompt:
"""将口语化查询改写为更结构化、更检索友好的形式"""
return Prompt(f"""
请将以下用户查询改写为更清晰、更适合检索的形式。
要求:
1. 保留核心信息
2. 使用更正式的表达
3. 可以补充隐含的上下文
用户查询:{query}
改写后的查询(只输出一个版本):
""")
def _expand_concepts(self, query: str) -> Prompt:
"""使用同义词/上下位词扩展查询概念"""
return Prompt(f"""
请为以下查询生成3个不同的版本,每个版本使用不同的同义词或表达方式。
要求:
1. 每个版本保持原意
2. 使用不同的关键词组合
3. 涵盖不同角度(如口语vs正式、狭义vs广义)
用户查询:{query}
改写版本:
1.
2.
3.
""")
def _decompose_subquery(self, query: str) -> Prompt:
"""将复杂查询分解为多个简单子查询"""
return Prompt(f"""
如果以下查询包含多个问题或需要多个步骤,请分解为独立子查询。
如果没有多个问题,直接改写为检索友好的形式。
用户查询:{query}
分解/改写结果:
""")
查询改写的核心思路是生成而不是匹配。传统做法是试图让检索去适配各种查询表达方式,但更好的做法是用大模型生成多个合适的查询版本,让每个版本都能命中不同的检索角度,最后通过融合来保证高质量召回。
三、上下文组织:如何让大模型真正"用好"检索结果
检索质量解决了"找到对的东西"的问题,但RAG系统还有一个更关键的环节:如何把检索结果组织成大模型能高效利用的上下文。
这个问题之所以关键,是因为大模型的输出质量高度依赖输入上下文的组织方式。同样一批检索结果,不同的组织方式会导致截然不同的输出质量。
3.1 上下文压缩:去掉噪声,保留精华
检索回来的内容往往包含大量冗余信息:HTML标签、模板文本、重复的表头、无关的装饰性内容。这些"噪声"会占用宝贵的上下文空间,稀释关键信息的比例。一个好的RAG系统需要在把检索结果送入大模型之前,先进行上下文压缩(Context Compression)。
class SemanticCompressor:
"""
语义压缩器:理解文档语义,去除冗余,保留关键信息
"""
def __init__(self, llm_client):
self.llm = llm_client
async def compress(self, query: str, retrieved_docs: List[dict]) -> str:
"""
对检索结果进行语义压缩,生成紧凑的上下文
"""
# 第一步:按语义相关性对文档排序分组
grouped = self._group_by_relevance(query, retrieved_docs)
# 第二步:提取每个组的核心信息
compressed_groups = []
for group in grouped:
summary = await self._summarize_group(query, group)
compressed_groups.append(summary)
# 第三步:生成结构化上下文
context = self._build_structured_context(compressed_groups)
return context
def _group_by_relevance(self, query: str, docs: List[dict]) -> List[List[dict]]:
"""
将检索结果按语义分组:
1. 高度相关的(匹配度高)单独处理
2. 中度相关的按主题聚合
3. 低度相关但可能有用的提取关键句
"""
high_relevance = []
medium_relevance = {}
low_relevance = []
for doc in docs:
score = doc.get('relevance_score', 0)
if score > 0.85:
high_relevance.append(doc)
elif score > 0.6:
# 按主题聚类
topic = doc.get('topic', 'default')
if topic not in medium_relevance:
medium_relevance[topic] = []
medium_relevance[topic].append(doc)
else:
# 低相关度文档只提取关键句子
low_relevance.append(doc)
return [high_relevance] + list(medium_relevance.values()) + [low_relevance]
async def _summarize_group(self, query: str, group: List[dict]) -> str:
"""
对每个文档组生成语义摘要
使用"聚焦查询"的方式,只提取与query相关的信息
"""
if len(group) == 0:
return ""
prompt = Prompt(f"""
你是一个信息提取专家。请从以下文档中提取与查询相关的信息。
查询:{query}
文档内容:
{chr(10).join([doc['content'] for doc in group])}
要求:
1. 只保留与查询直接相关的内容
2. 使用原文中的关键数据和事实
3. 对于表格数据,保持结构化格式
4. 如果某部分内容与查询无关,直接忽略
提取结果:
""")
summary = await self.llm.generate(prompt)
return summary
def _build_structured_context(self, groups: List[str]) -> str:
"""
构建结构化上下文:使用明确的标签和分区
帮助大模型理解不同部分的来源和关系
"""
context_parts = []
for i, group_summary in enumerate(groups):
if group_summary.strip():
context_parts.append(f"【信息来源组 {i+1}】\n{group_summary}")
# 添加总体摘要作为开篇
header = "【问题背景】参考以下信息回答问题:\n"
return header + "\n\n".join(context_parts)
这段代码的核心思路是分层处理:对于高相关性的文档,保留完整性;对于中相关性的文档,按主题聚合后提取摘要;对于低相关性的文档,只提取关键句子。通过这种分层策略,我们可以在有限的上下文空间内最大化有效信息密度。
3.2 对话历史管理:让RAG支持多轮对话
真实场景中,用户往往会追问、澄清、或者转换话题。一个只考虑单次查询的RAG系统,在多轮对话中会很快崩溃——上下文窗口被历史消息填满,新问题找不到足够的检索空间,新旧信息互相干扰。
多轮对话RAG的核心挑战是如何高效管理对话历史,让系统始终保持对当前问题的聚焦。
class ConversationRAG:
"""
支持多轮对话的RAG系统
核心思路:将对话历史"结构化",提取对当前查询真正有价值的信息
"""
def __init__(self, retriever, compressor, llm_client):
self.retriever = retriever
self.compressor = compressor
self.llm = llm_client
self.conversation_history = []
self.max_history_turns = 10 # 保留最近10轮对话
async def chat(self, query: str) -> str:
# 第一步:从对话历史中提取相关上下文
history_context = self._extract_history_context(query)
# 第二步:基于当前查询+历史上下文进行检索
combined_query = self._build_augmented_query(query, history_context)
retrieved = await self.retriever.retrieve(combined_query)
# 第三步:压缩检索结果
context = await self.compressor.compress(query, retrieved)
# 第四步:生成回答
response = await self._generate_response(query, context, history_context)
# 第五步:更新对话历史
self._update_history(query, response)
return response
def _extract_history_context(self, current_query: str) -> str:
"""
从对话历史中提取与当前查询相关的信息
使用"查询导向的摘要"策略:只提取和当前问题相关的那部分历史
"""
if not self.conversation_history:
return ""
prompt = Prompt(f"""
你是对话历史分析专家。请从以下对话历史中提取与当前问题相关的信息。
当前问题:{current_query}
对话历史:
{self._format_history()}
要求:
1. 只提取与当前问题直接或间接相关的内容
2. 区分"已解决的问题"和"仍待解决的问题"
3. 注意用户之前提到的关键实体(人名、项目名、术语等)
4. 如果历史中有相关的背景信息,也需要提取
相关历史信息:
""")
return asyncio.run(self.llm.generate(prompt))
def _build_augmented_query(self, current_query: str, history_context: str) -> str:
"""
将当前查询与历史上下文融合,生成增强查询
"""
if not history_context:
return current_query
prompt = Prompt(f"""
请将以下"当前问题"与"历史上下文"融合,生成一个增强的检索查询。
要求:
1. 补充历史上下文中出现的关键实体(人名、术语等)
2. 如果用户是在追问或澄清,保留追问的具体内容
3. 如果用户转换了话题,以当前问题为主
当前问题:{current_query}
历史上下文:
{history_context}
增强查询:
""")
return asyncio.run(self.llm.generate(prompt))
def _format_history(self) -> str:
"""将对话历史格式化为可读文本"""
formatted = []
for i, (q, a) in enumerate(self.conversation_history[-self.max_history_turns:]):
formatted.append(f"第{i+1}轮 - 用户:{q}")
formatted.append(f"第{i+1}轮 - 助手:{a[:200]}...") # 截断避免过长
return "\n".join(formatted)
def _update_history(self, query: str, response: str):
"""更新对话历史"""
self.conversation_history.append((query, response))
# 保持历史在合理长度内
if len(self.conversation_history) > self.max_history_turns:
self.conversation_history = self.conversation_history[-self.max_history_turns:]
多轮对话RAG的关键设计决策是不要把完整的历史对话都塞给大模型,而是只提取与当前问题相关的历史片段。这种"查询导向的摘要"策略,可以有效控制上下文长度,同时保留历史中真正有价值的信息。
3.3 结构化输出:让检索结果"可推理"
大模型在处理非结构化上下文时,需要额外的推理步骤来理解信息之间的关系。比如检索到三段文档,分别描述了项目的三个不同方面,大模型需要自己推断这些方面之间的逻辑关系。
一个更高效的做法是在上下文中直接提供结构化的信息关系图,让大模型可以直接使用,而不需要自己推理。
class StructuredContextBuilder:
"""
构建结构化上下文:显式表达实体关系和时间线
帮助大模型"按图索骥",减少推理负担
"""
def __init__(self, llm_client):
self.llm = llm_client
self.entity_graph = {} # 存储实体及其关系
async def build(self, query: str, retrieved_docs: List[dict]) -> str:
# 第一步:实体识别和关系抽取
entities = await self._extract_entities(retrieved_docs)
# 第二步:构建实体关系图
relations = await self._extract_relations(entities, retrieved_docs)
# 第三步:识别时间线事件
timeline = await self._extract_timeline(retrieved_docs)
# 第四步:生成结构化上下文
context = self._generate_structured_context(entities, relations, timeline, query)
return context
async def _extract_entities(self, docs: List[dict]) -> List[dict]:
"""从文档中抽取关键实体"""
prompt = Prompt(f"""
请从以下文档中抽取关键实体及其属性。
实体类型包括:
- 人物(人名、职位、团队)
- 组织(公司、部门、项目)
- 事件(动作、决策、里程碑)
- 数值(金额、数量、百分比)
文档内容:
{chr(10).join([doc['content'] for doc in docs])}
以结构化格式输出(JSON):
""")
# 解析LLM输出为实体列表
result = await self.llm.generate(prompt)
return json.loads(result)
async def _extract_relations(self, entities: List[dict], docs: List[dict]) -> List[dict]:
"""抽取实体之间的关系"""
prompt = Prompt(f"""
请分析以下实体之间的关系。
实体列表:
{json.dumps(entities, ensure_ascii=False, indent=2)}
文档内容:
{chr(10).join([doc['content'] for doc in docs])}
关系类型包括:
- 隶属关系(属于、参与、领导)
- 时序关系(先于、导致、并发)
- 对比关系(优于、不同于、替代)
- 依赖关系(依赖、引用、基于)
输出格式(JSON数组):
[{{"from": "实体A", "to": "实体B", "relation": "关系类型", "description": "关系描述"}}]
""")
result = await self.llm.generate(prompt)
return json.loads(result)
async def _extract_timeline(self, docs: List[dict]) -> List[dict]:
"""识别时间线事件"""
prompt = Prompt(f"""
请从以下文档中识别按时间顺序排列的关键事件。
要求:
1. 每个事件包含:时间点、事件描述、涉及实体
2. 时间点使用统一格式(YYYY-MM-DD或相对时间)
3. 只保留与主要叙事相关的事件,过滤细枝末节
文档内容:
{chr(10).join([doc['content'] for doc in docs])}
输出格式(JSON数组):
[{{"time": "时间点", "event": "事件描述", "entities": ["涉及实体"]}}]
""")
result = await self.llm.generate(prompt)
return json.loads(result)
def _generate_structured_context(
self,
entities: List[dict],
relations: List[dict],
timeline: List[dict],
query: str
) -> str:
"""
生成结构化上下文:将实体、关系、时间线整合为可推理的格式
"""
sections = []
# 1. 实体清单
if entities:
entity_lines = []
for e in entities:
attrs = ", ".join([f"{k}={v}" for k, v in e.get('attributes', {}).items()])
entity_lines.append(f"- **{e['name']}**({e['type']}){':' + attrs if attrs else ''}")
sections.append("【关键实体】\n" + "\n".join(entity_lines))
# 2. 实体关系图(Mermaid格式)
if relations:
mermaid_lines = ["```mermaid", "graph TD"]
for i, rel in enumerate(relations):
# 节点ID标准化
from_id = f"A{i}" if rel['from'] in [e['name'] for e in entities] else rel['from']
to_id = f"B{i}" if rel['to'] in [e['name'] for e in entities] else rel['to']
mermaid_lines.append(f" {from_id}[{rel['from']}] -->|{rel['relation']}| {to_id}[{rel['to']}]")
mermaid_lines.append("```")
sections.append("【实体关系】\n" + "\n".join(mermaid_lines))
# 3. 时间线
if timeline:
timeline_lines = []
for event in timeline:
timeline_lines.append(f"- **{event['time']}**:{event['event']}")
if event.get('entities'):
timeline_lines.append(f" - 涉及:{', '.join(event['entities'])}")
sections.append("【时间线】\n" + "\n".join(timeline_lines))
# 4. 针对问题的答案片段
sections.append(f"\n【相关文档片段】\n(文档详细内容,用于回答:{query})")
return "\n\n".join(sections)
结构化上下文的好处是显式表达了隐含的关系。当大模型需要回答"张三和李四是什么关系"这个问题时,直接在上下文中找到 A[张三] -->|同事| B[李四] 这样的关系描述,比让它自己去推理三段不相关的文档要高效得多。
四、生成控制:如何避免"幻觉"和"答非所问"
RAG系统生成阶段的挑战主要有两类:幻觉(Hallucination) 和 答非所问(Irrelevant Response)。
幻觉是指大模型在上下文中找不到答案时,仍编造一个看似合理但实际错误的内容。答非所问是指大模型虽然正确理解了问题,但输出内容与问题不匹配,或者遗漏了关键信息。
4.1 答案质量检测:让RAG"知道自己不知道"
解决幻觉的第一步是让系统知道自己不知道什么。
class AnswerQualityChecker:
"""
答案质量检测器:评估RAG生成的答案是否可信
"""
def __init__(self, llm_client):
self.llm = llm_client
async def check(self, query: str, context: str, answer: str) -> dict:
"""
多维度检测答案质量
"""
checks = await asyncio.gather(
self._check_grounding(query, context, answer), # 是否基于上下文
self._check_relevance(query, answer), # 是否回答了问题
self._check_completeness(query, answer), # 是否完整
self._check_uncertainty(answer) # 是否表达了不确定性
)
grounding, relevance, completeness, uncertainty = checks
# 综合评分
overall_score = (
grounding['score'] * 0.35 +
relevance['score'] * 0.35 +
completeness['score'] * 0.20 +
uncertainty['score'] * 0.10
)
return {
'overall_score': overall_score,
'is_trustworthy': overall_score > 0.7 and uncertainty['score'] > 0.5,
'details': {
'grounding': grounding,
'relevance': relevance,
'completeness': completeness,
'uncertainty': uncertainty
},
'suggestions': self._generate_suggestions(checks)
}
async def _check_grounding(self, query: str, context: str, answer: str) -> dict:
"""
检查答案是否基于给定的上下文,而非自由发挥
"""
prompt = Prompt(f"""
你是答案可信度评估专家。请判断以下答案在多大程度上基于给定的上下文信息。
上下文:
{context}
问题:{query}
答案:{answer}
请评估:
1. 答案中的每一个具体事实(数字、日期、人名、事件)是否都能在上下文中找到依据?
2. 答案中是否有上下文没有提供的信息?
3. 用0-1的分数表示"答案基于上下文"的可信度
输出格式:
分数:0.XX
依据:...
""")
result = await self.llm.generate(prompt)
# 解析分数
score = float(re.search(r'分数[::]\s*(\d+\.\d+)', result).group(1))
return {'score': score, 'detail': result}
async def _check_relevance(self, query: str, answer: str) -> dict:
"""检查答案是否真正回答了问题"""
prompt = Prompt(f"""
请判断以下答案是否直接回答了用户的问题。
问题:{query}
答案:{answer}
评估标准:
1. 答案的核心内容是否针对问题?
2. 答案是否偏离了问题的核心主题?
3. 用0-1的分数表示"答案相关度"
分数:0.XX
""")
result = await self.llm.generate(prompt)
score = float(re.search(r'分数[::]\s*(\d+\.\d+)', result).group(1))
return {'score': score, 'detail': result}
async def _check_completeness(self, query: str, answer: str) -> dict:
"""检查答案是否完整回答了问题的各个方面"""
prompt = Prompt(f"""
请判断以下答案的完整程度。
问题:{query}
答案:{answer}
评估标准:
1. 问题有几个方面?答案覆盖了几个方面?
2. 是否有明显的遗漏?
3. 用0-1的分数表示"答案完整度"
分数:0.XX
""")
result = await self.llm.generate(prompt)
score = float(re.search(r'分数[::]\s*(\d+\.\d+)', result).group(1))
return {'score': score, 'detail': result}
async def _check_uncertainty(self, answer: str) -> dict:
"""检查答案是否恰当地表达了不确定性"""
prompt = Prompt(f"""
请分析以下答案对不确定性的处理。
答案:{answer}
评估标准:
1. 答案是否在不确定的地方使用了模糊表达("可能""大概""不确定")?
2. 答案是否在缺乏依据时明确表示"不知道"?
3. 用0-1的分数表示"不确定性表达适当程度"(0=完全没表达不确定性,1=适当表达了不确定性)
分数:0.XX
""")
result = await self.llm.generate(prompt)
score = float(re.search(r'分数[::]\s*(\d+\.\d+)', result).group(1))
return {'score': score, 'detail': result}
def _generate_suggestions(self, checks: List[dict]) -> List[str]:
"""根据检测结果生成改进建议"""
suggestions = []
if checks[0]['score'] < 0.7:
suggestions.append("答案包含上下文外的信息,需要回到检索阶段增强召回")
if checks[1]['score'] < 0.7:
suggestions.append("答案偏离问题,建议重新审视问题理解和回答策略")
if checks[2]['score'] < 0.7:
suggestions.append("答案不够完整,可能需要补充更多检索结果")
if checks[3]['score'] < 0.3:
suggestions.append("答案过于确定,建议在不确定处添加适当的模糊表达")
return suggestions
质量检测的输出可以用于多个场景:信任度高的答案直接返回给用户,信任度低的答案触发二次检索或者回复"这个问题我暂时无法准确回答"。
4.2 置信度低时的处理策略
当检测到答案置信度较低时,RAG系统可以选择以下策略:
class LowConfidenceHandler:
"""
低置信度答案处理策略
"""
def __init__(self, retriever, compressor, llm):
self.retriever = retriever
self.compressor = compressor
self.llm = llm
async def handle(
self,
query: str,
context: str,
answer: str,
check_result: dict
) -> str:
"""
根据检测结果选择处理策略
"""
if check_result['is_trustworthy']:
return answer
# 策略1:问题太宽泛,要求用户澄清
if check_result['details']['relevance']['score'] < 0.5:
return await self._clarify_broader_query(query, check_result)
# 策略2:信息不足,尝试扩展检索
if check_result['details']['grounding']['score'] < 0.5:
return await self._expand_retrieval(query, context, answer, check_result)
# 策略3:不确定性高,明确承认局限
if check_result['details']['uncertainty']['score'] < 0.3:
return await self._express_uncertainty(query, answer, check_result)
# 策略4:答案不完整,给出部分答案+说明
if check_result['details']['completeness']['score'] < 0.6:
return await self._partial_answer(query, answer, check_result)
# 默认:综合多种问题,返回承认局限的回答
return await self._acknowledge_limitation(query, answer, check_result)
async def _clarify_broader_query(self, query: str, check_result: dict) -> str:
"""问题太宽泛时,要求澄清"""
prompt = Prompt(f"""
用户的问题过于宽泛,无法给出精确答案。
请生成一个问题澄清回复,引导用户提供更多背景信息。
原始问题:{query}
答案检测结果:{json.dumps(check_result, ensure_ascii=False, indent=2)}
回复要求:
1. 指出问题宽泛的具体方面
2. 提供2-3个具体的澄清问题
3. 语气友好,帮助用户聚焦问题
""")
return await self.llm.generate(prompt)
async def _expand_retrieval(
self,
query: str,
context: str,
original_answer: str,
check_result: dict
) -> str:
"""信息不足时,尝试扩展检索范围后再回答"""
# 扩展查询,尝试更宽泛的检索
expanded_query = f"{query} 相关 背景 介绍"
# 重新检索
new_docs = await self.retriever.retrieve(expanded_query)
new_context = await self.compressor.compress(query, new_docs)
# 用新上下文生成答案
new_answer = await self._generate_with_context(query, new_context)
# 如果新答案更好,返回新答案
return new_answer
async def _express_uncertainty(self, query: str, original_answer: str, check_result: dict) -> str:
"""坦诚表达不确定性"""
prompt = Prompt(f"""
请将以下答案改写,在适当的地方添加不确定性表达。
要求:
1. 在没有直接依据的地方使用"可能""也许""根据现有信息"
2. 对于无法确认的事实,明确说明"我没有找到相关信息"
3. 保持回答的有用性,不要过度模糊
原始答案:{original_answer}
改写后的答案:
""")
return await self.llm.generate(prompt)
async def _partial_answer(
self,
query: str,
original_answer: str,
check_result: dict
) -> str:
"""给出部分答案,并说明限制"""
prompt = Prompt(f"""
请将以下答案改写为"部分答案+限制说明"的格式。
原始答案:{original_answer}
问题:{query}
改写要求:
1. 明确回答答案中包含的部分
2. 坦承哪些方面无法回答
3. 提出可以获取完整答案的途径(如"如果您能提供更多关于...的信息,我可以进一步回答")
改写后的答案:
""")
return await self.llm.generate(prompt)
async def _acknowledge_limitation(self, query: str, original_answer: str, check_result: dict) -> str:
"""综合承认局限的回答"""
suggestions = "\n".join([f"- {s}" for s in check_result['suggestions']])
prompt = Prompt(f"""
用户的问题比较复杂,我需要坦诚地说明回答的局限性。
原始答案:{original_answer}
问题:{query}
检测到的问题:
{suggestions}
请生成一个诚实的回复:
1. 承认当前答案的局限性
2. 说明为什么无法给出更准确的答案
3. 提出后续改进的建议(用户提供更多信息/系统优化后重试)
""")
return await self.llm.generate(prompt)
五、实战案例:一个客服RAG系统的优化全过程
光说不练假把式。接下来我用一个真实的客服RAG系统优化案例,把前面的理论串起来。
某电商平台的客服RAG系统最初的设计是:用户问题 → 向量检索Top-10 → 直接塞给GPT-4 → 返回答案。上线后用户反馈"答非所问"的比例高达40%。
优化过程分三个阶段:
第一阶段:诊断问题类型
通过抽样分析,我们发现问题主要集中在三类:
- 30%是查询和文档表述差异导致的检索失败(“怎么退货"搜不到"退换货政策”)
- 25%是检索到了相关内容但被噪声淹没,大模型无法正确使用
- 20%是大模型在上下文充足的情况下仍然编造信息(幻觉)
第二阶段:针对性优化
针对第一类问题,增加了查询改写模块,生成多个查询版本并融合结果,相关问题召回率从45%提升到78%。
针对第二类问题,实现了智能分块和语义压缩。代码块和表格保持结构完整性,只提取与查询相关的行和函数。上下文信息密度提升了3倍。
针对第三类问题,引入答案质量检测,发现低置信度答案时触发二次检索或承认局限。幻觉率从20%降到5%。
第三阶段:持续迭代
建立用户反馈闭环,当用户点击"没有帮助"时,自动分析问题原因并记录。每周review一次bad case,持续优化检索和生成策略。
六、总结:RAG优化的核心心法
回顾整个RAG优化过程,有几个核心原则值得反复强调:
第一,检索是为生成服务的,不是为了"找到更多"。 一切优化都应以"大模型能否有效利用"为最终标准,而不是单纯追求召回率。
第二,上下文组织比检索更重要。 在实际测试中,优化上下文组织(分块、压缩、结构化)带来的效果提升,往往超过优化检索算法本身。
第三,让系统知道自己不知道。 承认局限比编造答案更体面,也更能让用户建立长期信任。
第四,优化是持续的过程,不是一次性的项目。 建立反馈闭环,持续收集bad case,迭代改进。
一个真正好用的RAG系统,不是在评测集上分数最高的系统,而是让用户在真实场景中愿意持续使用的系统。这需要我们在检索、上下文、生成三个层面都做到位,任何一个短板的缺失都会导致整体体验崩塌。
希望这篇文章能给你一些可以落地的思路。如果你在RAG优化过程中遇到具体问题,欢迎找我交流。
参考链接
(说明:因网络原因无法获取实时链接,以下为基于知识库整理的参考资源,实际链接需读者自行验证)
- LangChain RAG教程: https://python.langchain.org/docs/tutorials/rag/
- LlamaIndex文档: https://docs.llamaindex.ai/
- BM25算法原论文: “Okapi BM25” - Robertson et al.
- RRF融合算法: “Reciprocal Rank Fusion for Heterogeneous Information Retrieval” - G. V. Cormack et al.
- Cross-Encoder重排序: huggingface.co/cross-encoder/ms-marco-MiniLM-L-12-v2
- Anthropic关于RAG最佳实践: https://docs.anthropic.com/en/docs/build-with-claude/rag
本文属于「工程实战」系列,关注从真实场景中提炼的技术方法论。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)