RAG技术介绍,工作流程,具体实现
RAG技术详解
目录
一、RAG技术概述
RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合了信息检索和语言模型生成的混合AI技术。它通过在生成回答之前,从外部知识库中检索相关信息来增强语言模型的回答质量。
核心概念
RAG的核心思想:
- 检索(Retrieval):从大规模文档库中找到与问题相关的信息片段
- 增强(Augmented):将检索到的相关信息作为上下文提供给语言模型
- 生成(Generation):语言模型基于检索到的信息生成更准确、更丰富的回答
为什么需要RAG:
- 解决大语言模型的"幻觉"问题
- 提供最新、准确的信息
- 减少模型训练成本
- 实现知识库的动态更新
二、RAG工作流程
1. 整体架构
RAG系统通常包含以下核心组件:
用户查询 → 查询编码 → 文档检索 → 结果重排序 → 上下文构建 → 回答生成 → 输出生成
2. 详细工作流程
阶段一:文档处理(离线)
-
文档分割(Chunking)
- 将大文档切分成小块(通常512-1024 tokens)
- 考虑语义连贯性,避免在关键信息处断开
- 详见第五章:文档分割的实现
-
向量化(Embedding)
- 使用嵌入模型将文本转换为向量
- 常用模型:text-embedding-ada-002、bge-large等
-
向量存储
- 将向量存储在向量数据库中
- 支持高效的相似度搜索
阶段二:查询处理(在线)
-
查询编码
- 将用户问题转换为向量表示
- 与文档块使用相同的嵌入模型
-
相似度检索
- 在向量数据库中查找最相关的文档块
- 通常返回Top-K个结果(K=3-10)
-
重排序(可选)
- 使用交叉编码器对检索结果重新排序
- 提高检索准确性
-
上下文构建
- 将检索到的文档块与原始查询组合
- 控制总长度,避免超出模型限制
-
回答生成
- 使用语言模型基于增强的上下文生成回答
- 可以采用不同的提示模板
3. 工作流程图
三、具体实现方案
1. 技术栈选择
核心组件
- 嵌入模型:OpenAI text-embedding-ada-002、BGE系列、Sentence-BERT
- 向量数据库:Pinecone、Weaviate、Chroma、FAISS、Milvus
- 语言模型:GPT-3.5/4、Claude、Llama 2/3、ChatGLM
- 框架:LangChain、LlamaIndex
2. 代码实现示例
基础RAG实现(Python)
import openai
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
# 1. 文档处理和向量存储
def setup_rag_system(documents):
# 文档分块
text_splitter = CharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = text_splitter.split_documents(documents)
# 向量化
embeddings = OpenAIEmbeddings()
# 向量存储
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings
)
return vectorstore
# 2. RAG查询处理
def rag_query(vectorstore, query):
# 创建检索器
retriever = vectorstore.as_retriever(
search_kwargs={"k": 3}
)
# 创建RAG链
qa_chain = RetrievalQA.from_chain_type(
llm=OpenAI(temperature=0),
chain_type="stuff",
retriever=retriever
)
# 执行查询
result = qa_chain({"query": query})
return result["result"]
高级实现(带重排序)
from sentence_transformers import CrossEncoder
class AdvancedRAG:
def __init__(self, vectorstore, cross_encoder_model):
self.vectorstore = vectorstore
self.cross_encoder = CrossEncoder(cross_encoder_model)
def rerank_results(self, query, retrieved_docs):
# 构建查询-文档对
query_doc_pairs = [(query, doc.page_content) for doc in retrieved_docs]
# 重排序
scores = self.cross_encoder.predict(query_doc_pairs)
# 按分数排序
reranked_docs = [doc for _, doc in sorted(
zip(scores, retrieved_docs),
key=lambda x: x[0],
reverse=True
)]
return reranked_docs
def query(self, question):
# 初步检索
docs = self.vectorstore.similarity_search(question, k=10)
# 重排序
reranked_docs = self.rerank_results(question, docs)
# 构建上下文
context = "\n\n".join([doc.page_content for doc in reranked_docs[:3]])
# 生成回答
prompt = f"""基于以下信息回答问题:
背景信息:{context}
问题:{question}
请给出准确、详细的回答:"""
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
3. 部署架构
简单部署
前端 → API服务 → RAG引擎 → 向量数据库
↓
语言模型API
生产级部署
负载均衡器 → 多个API实例 → RAG服务集群
↓
Redis缓存层
↓
向量数据库集群
↓
语言模型服务
4. 性能优化
缓存策略
import redis
import hashlib
class CachedRAG:
def __init__(self, rag_system, redis_client):
self.rag_system = rag_system
self.redis = redis_client
def get_cache_key(self, query):
return hashlib.md5(query.encode()).hexdigest()
def query(self, question):
cache_key = self.get_cache_key(question)
# 检查缓存
cached_result = self.redis.get(cache_key)
if cached_result:
return cached_result.decode()
# 执行RAG查询
result = self.rag_system.query(question)
# 缓存结果
self.redis.setex(cache_key, 3600, result) # 缓存1小时
return result
批处理和异步
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def batch_rag_queries(queries, rag_system):
async def process_query(query):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(
executor,
rag_system.query,
query
)
return result
tasks = [process_query(query) for query in queries]
results = await asyncio.gather(*tasks)
return results
四、应用场景和优势
1. 主要应用场景
企业知识库问答
- 场景:员工查询公司政策、流程文档
- 优势:准确、一致,避免信息过时
- 案例:HR政策问答、技术文档查询
客服系统
- 场景:产品支持、故障排查
- 优势:基于最新产品文档提供准确答案
- 案例:技术产品客服、金融服务咨询
医疗健康
- 场景:医学文献查询、诊断辅助
- 优势:基于权威医学资料提供信息
- 案例:临床决策支持、医学研究
法律合规
- 场景:法律条文查询、合规检查
- 优势:确保信息准确性和时效性
- 案例:法律研究、合规审查
教育培训
- 场景:课程答疑、学习材料查询
- 优势:基于教材内容提供准确解释
- 案例:在线学习平台、企业培训
2. RAG的优势
准确性提升
- 减少幻觉:基于真实文档生成回答
- 事实核查:可追溯到原始信息来源
- 时效性强:知识库可随时更新
成本效益
- 降低训练成本:无需重新训练大模型
- 灵活更新:知识库独立于模型更新
- 资源优化:可按需检索,减少计算开销
可控性和透明度
- 内容控制:可限制检索源和生成内容
- 审计追踪:可追踪回答的信息来源
- 质量保证:可实施质量过滤和验证
可扩展性
- 多源整合:可整合多个知识源
- 领域适配:易于适配不同专业领域
- 规模弹性:支持从小规模到大规模部署
3. 挑战和解决方案
检索质量
- 挑战:检索到不相关或低质量文档
- 解决方案:
- 优化文档分块策略
- 使用重排序模型
- 多阶段检索(粗筛+精筛)
上下文长度限制
- 挑战:大模型上下文窗口限制
- 解决方案:
- 智能文档摘要
- 分层检索策略
- 动态上下文选择
性能和延迟
- 挑战:检索和生成的延迟
- 解决方案:
- 结果缓存
- 异步处理
- 向量数据库优化
知识更新
- 挑战:保持知识库的时效性
- 解决方案:
- 自动化文档监控
- 增量更新机制
- 版本控制和回滚
4. 未来发展趋势
- 多模态RAG:支持图像、音频等多媒体内容
- 自适应RAG:根据查询类型自动调整检索策略
- 联邦RAG:跨多个数据源的隐私保护检索
- 实时RAG:支持流式数据实时检索和更新
五、文档分割的实现
文档分割(Document Chunking)是RAG系统中的关键环节,直接影响检索质量和最终回答的准确性。
5.1 文档分割的重要性
为什么需要文档分割?
- 模型限制: 大语言模型的上下文窗口有限(通常4K-32K tokens)
- 检索精度: 小块文档更容易实现精准匹配
- 计算效率: 减少不必要的计算开销
- 语义完整: 保持文本片段的语义连贯性
分割不当的后果
- 关键信息被截断,影响语义理解
- 检索到不相关的内容,降低回答质量
- 重要上下文丢失,导致回答不完整
5.2 常见分割策略
5.2.1 固定长度分割
原理: 按照固定的token或字符数进行分割
优点: 简单高效,易于实现
缺点: 可能切断语义连贯的句子
def fixed_size_chunking(text, chunk_size=512, overlap=50):
"""
固定长度文档分割
:param text: 输入文本
:param chunk_size: 每个块的大小
:param overlap: 重叠部分大小
:return: 分割后的文本块列表
"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
chunks.append(chunk)
# 移动起始位置,考虑重叠
start = end - overlap
# 避免无限循环
if start >= len(text):
break
return chunks
5.2.2 基于句子的分割
原理: 以句子为基本单位进行分割
优点: 保持句子完整性,语义更连贯
缺点: 单句可能过长,需要二次处理
import nltk
from nltk.tokenize import sent_tokenize
def sentence_based_chunking(text, max_tokens=512, overlap_sentences=1):
"""
基于句子的文档分割
:param text: 输入文本
:param max_tokens: 最大token数
:param overlap_sentences: 重叠句子数
:return: 分割后的文本块列表
"""
sentences = sent_tokenize(text)
chunks = []
current_chunk = []
current_tokens = 0
for i, sentence in enumerate(sentences):
sentence_tokens = len(sentence.split())
# 如果当前句子单独就超过限制,需要特殊处理
if sentence_tokens > max_tokens:
# 如果当前块不为空,先保存
if current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_tokens = 0
# 对长句子进行进一步分割
sub_chunks = fixed_size_chunking(sentence, max_tokens, max_tokens // 4)
chunks.extend(sub_chunks)
continue
# 如果添加当前句子会超出限制
if current_tokens + sentence_tokens > max_tokens and current_chunk:
chunks.append(" ".join(current_chunk))
# 保留重叠的句子
overlap_start = max(0, len(current_chunk) - overlap_sentences)
current_chunk = current_chunk[overlap_start:]
current_tokens = sum(len(s.split()) for s in current_chunk)
current_chunk.append(sentence)
current_tokens += sentence_tokens
# 添加最后一个块
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
5.2.3 基于段落的分割
原理: 以自然段落为单位进行分割
优点: 保持段落语义完整性
缺点: 段落长度差异大
def paragraph_based_chunking(text, max_tokens=512, overlap_paragraphs=1):
"""
基于段落的文档分割
:param text: 输入文本
:param max_tokens: 最大token数
:param overlap_paragraphs: 重叠段落数
:return: 分割后的文本块列表
"""
# 按段落分割(空行分隔)
paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
chunks = []
current_chunk = []
current_tokens = 0
for i, paragraph in enumerate(paragraphs):
paragraph_tokens = len(paragraph.split())
# 如果当前段落单独就超过限制
if paragraph_tokens > max_tokens:
# 如果当前块不为空,先保存
if current_chunk:
chunks.append("\n\n".join(current_chunk))
current_chunk = []
current_tokens = 0
# 对长段落进行句子级分割
sub_chunks = sentence_based_chunking(paragraph, max_tokens, 1)
chunks.extend(sub_chunks)
continue
# 如果添加当前段落会超出限制
if current_tokens + paragraph_tokens > max_tokens and current_chunk:
chunks.append("\n\n".join(current_chunk))
# 保留重叠的段落
overlap_start = max(0, len(current_chunk) - overlap_paragraphs)
current_chunk = current_chunk[overlap_start:]
current_tokens = sum(len(p.split()) for p in current_chunk)
current_chunk.append(paragraph)
current_tokens += paragraph_tokens
# 添加最后一个块
if current_chunk:
chunks.append("\n\n".join(current_chunk))
return chunks
5.2.4 语义感知分割
原理: 基于文本语义结构进行智能分割
优点: 保持语义完整性,提高检索质量
缺点: 实现复杂,计算开销大
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class SemanticChunker:
def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name)
def get_embeddings(self, texts):
"""获取文本的嵌入向量"""
inputs = self.tokenizer(texts, padding=True, truncation=True,
return_tensors='pt', max_length=512)
with torch.no_grad():
outputs = self.model(**inputs)
embeddings = outputs.last_hidden_state[:, 0, :].numpy()
return embeddings
def semantic_chunking(self, text, max_tokens=512, similarity_threshold=0.7):
"""
语义感知的文档分割
:param text: 输入文本
:param max_tokens: 最大token数
:param similarity_threshold: 语义相似度阈值
:return: 分割后的文本块列表
"""
sentences = sent_tokenize(text)
chunks = []
current_chunk = []
current_tokens = 0
for i, sentence in enumerate(sentences):
sentence_tokens = len(sentence.split())
# 如果当前句子单独就超过限制
if sentence_tokens > max_tokens:
if current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_tokens = 0
# 对长句子进行固定长度分割
sub_chunks = fixed_size_chunking(sentence, max_tokens, max_tokens // 4)
chunks.extend(sub_chunks)
continue
# 检查语义相似度
should_break = False
if len(current_chunk) >= 2: # 至少有2个句子才检查
current_text = " ".join(current_chunk[-2:]) # 取最后两个句子
combined_text = current_text + " " + sentence
embeddings = self.get_embeddings([current_text, sentence])
similarity = cosine_similarity(
embeddings[0].reshape(1, -1),
embeddings[1].reshape(1, -1)
)[0][0]
# 如果相似度太低,建议在此处分隔
if similarity < similarity_threshold:
should_break = True
# 如果添加当前句子会超出限制或语义不连贯
if (current_tokens + sentence_tokens > max_tokens or should_break) and current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_tokens = 0
current_chunk.append(sentence)
current_tokens += sentence_tokens
# 添加最后一个块
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
5.2.5 基于标题层次的分割
原理: 利用文档的标题结构进行分割
优点: 符合文档自然结构,便于导航
缺点: 需要文档有清晰的标题结构
import re
def heading_based_chunking(text, max_tokens=512):
"""
基于标题层次的文档分割
:param text: 输入文本
:param max_tokens: 最大token数
:return: 分割后的文本块列表
"""
# 匹配常见的标题格式
heading_patterns = [
r'^#+\s+.+$', # Markdown标题
r'^\d+\.\s+.+$', # 数字编号
r'^[A-Z][^.!?]*[:.]', # 大写字母开头
]
lines = text.split('\n')
chunks = []
current_chunk = []
current_tokens = 0
for line in lines:
line = line.strip()
if not line:
continue
# 检查是否是标题
is_heading = any(re.match(pattern, line, re.MULTILINE) for pattern in heading_patterns)
line_tokens = len(line.split())
# 如果是标题且当前块不为空,保存当前块
if is_heading and current_chunk and current_tokens > 0:
chunks.append("\n".join(current_chunk))
current_chunk = []
current_tokens = 0
# 如果添加当前行会超出限制
if current_tokens + line_tokens > max_tokens and current_chunk:
chunks.append("\n".join(current_chunk))
current_chunk = []
current_tokens = 0
current_chunk.append(line)
current_tokens += line_tokens
# 添加最后一个块
if current_chunk:
chunks.append("\n".join(current_chunk))
return chunks
5.3 高级分割策略
5.3.1 递归分割
class RecursiveChunker:
def __init__(self, chunk_sizes=[1000, 500, 250]):
self.chunk_sizes = chunk_sizes
def recursive_chunk(self, text, level=0):
"""
递归文档分割
:param text: 输入文本
:param level: 当前递归层级
:return: 分割后的文本块列表
"""
if level >= len(self.chunk_sizes):
return [text]
chunk_size = self.chunk_sizes[level]
# 尝试当前层级的分割策略
if level == 0:
chunks = paragraph_based_chunking(text, chunk_size)
elif level == 1:
chunks = sentence_based_chunking(text, chunk_size)
else:
chunks = fixed_size_chunking(text, chunk_size)
# 如果块的数量没有减少,进入下一层级
if len(chunks) <= 1 and level < len(self.chunk_sizes) - 1:
return self.recursive_chunk(text, level + 1)
# 递归处理每个块
final_chunks = []
for chunk in chunks:
if len(chunk.split()) > self.chunk_sizes[-1]:
sub_chunks = self.recursive_chunk(chunk, level + 1)
final_chunks.extend(sub_chunks)
else:
final_chunks.append(chunk)
return final_chunks
5.3.2 智能分割
class IntelligentChunker:
def __init__(self, max_tokens=512):
self.max_tokens = max_tokens
def analyze_document_structure(self, text):
"""分析文档结构,选择最佳分割策略"""
lines = text.split('\n')
# 检查是否有标题结构
has_headings = any(line.strip().startswith('#') for line in lines)
# 检查段落长度分布
paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
avg_paragraph_length = sum(len(p.split()) for p in paragraphs) / len(paragraphs) if paragraphs else 0
# 检查句子长度分布
sentences = sent_tokenize(text)
avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
return {
'has_headings': has_headings,
'avg_paragraph_length': avg_paragraph_length,
'avg_sentence_length': avg_sentence_length,
'paragraph_count': len(paragraphs),
'sentence_count': len(sentences)
}
def intelligent_chunk(self, text):
"""智能选择分割策略"""
structure = self.analyze_document_structure(text)
# 如果有清晰的标题结构,使用标题分割
if structure['has_headings']:
return heading_based_chunking(text, self.max_tokens)
# 如果段落平均长度适中,使用段落分割
elif structure['avg_paragraph_length'] <= self.max_tokens * 0.8:
return paragraph_based_chunking(text, self.max_tokens)
# 如果句子较短,使用句子分割
elif structure['avg_sentence_length'] <= self.max_tokens * 0.6:
return sentence_based_chunking(text, self.max_tokens)
# 否则使用递归分割
else:
recursive_chunker = RecursiveChunker([self.max_tokens, self.max_tokens//2, self.max_tokens//4])
return recursive_chunker.recursive_chunk(text)
5.4 实际应用建议
5.4.1 选择合适的策略
技术文档: 使用基于标题的分割
学术论文: 使用段落分割
对话数据: 使用句子分割
长篇文章: 使用递归分割
多语言内容: 使用语义感知分割
5.4.2 参数调优
def optimize_chunking_parameters(documents, target_retrieval_score=0.8):
"""
优化分割参数
:param documents: 文档集合
:param target_retrieval_score: 目标检索分数
:return: 最优参数组合
"""
best_params = None
best_score = 0
# 测试不同的参数组合
chunk_sizes = [256, 512, 768, 1024]
overlap_ratios = [0.1, 0.2, 0.3]
for chunk_size in chunk_sizes:
for overlap_ratio in overlap_ratios:
overlap = int(chunk_size * overlap_ratio)
# 评估当前参数的性能
score = evaluate_chunking_performance(
documents, chunk_size, overlap
)
if score > best_score:
best_score = score
best_params = {
'chunk_size': chunk_size,
'overlap': overlap,
'score': score
}
return best_params
5.4.3 质量评估
def evaluate_chunking_quality(chunks):
"""
评估分割质量
:param chunks: 分割后的文本块
:return: 质量指标
"""
metrics = {
'chunk_count': len(chunks),
'avg_chunk_length': np.mean([len(chunk.split()) for chunk in chunks]),
'length_variance': np.var([len(chunk.split()) for chunk in chunks]),
'min_length': min(len(chunk.split()) for chunk in chunks),
'max_length': max(len(chunk.split()) for chunk in chunks)
}
# 计算语义连贯性(简化版)
coherent_chunks = 0
for chunk in chunks:
sentences = sent_tokenize(chunk)
if len(sentences) <= 1 or sentences[-1].strip().endswith(('.', '!', '?')):
coherent_chunks += 1
metrics['coherence_ratio'] = coherent_chunks / len(chunks)
return metrics
5.5 最佳实践
- 保持语义完整性: 优先选择基于语义单位的分割策略
- 合理设置重叠: 通常设置为块大小的10-20%
- 考虑应用场景: 根据具体需求选择合适的分割粒度
- 动态调整: 根据文档类型自动选择分割策略
- 质量监控: 定期评估分割效果并进行优化
通过合理选择和实现文档分割策略,可以显著提高RAG系统的检索精度和回答质量。
六、主流嵌入模型排行榜
2024年文本嵌入模型排行榜(基于MTEB评测)
🏆 国际顶级模型(前5名)
1. text-embedding-3-large (OpenAI)
- MTEB Score: 64.6
- 特点: 支持可变维度输出,最大3072维
- 优势: 多语言支持优秀,语义理解能力强
- 适用场景: 企业应用、多语言环境
2. e5-mistral-7b-instruct (Microsoft)
- MTEB Score: 64.2
- 特点: 基于Mistral 7B,指令优化
- 优势: 开源模型,推理效率高
- 适用场景: 需要本地部署的场景
3. gte-large-en-v1.5 (Alibaba)
- MTEB Score: 63.8
- 特点: 专门针对英文优化的通用文本编码器
- 优势: 在英文任务上表现优异,开源
- 适用场景: 英文文本处理、检索增强
4. bge-large-en-v1.5 (BAAI)
- MTEB Score: 63.5
- 特点: 北京智源研究院开发,大规模训练
- 优势: 平衡性能和效率,开源免费
- 适用场景: 通用文本嵌入、学术研究
5. text-embedding-ada-002 (OpenAI)
- MTEB Score: 60.0
- 特点: 经典模型,稳定可靠
- 优势: API调用简单,成本相对较低
- 适用场景: 快速原型开发、中小规模应用
🥇 国内优秀模型(前5名)
1. bge-large-zh-v1.5 (BAAI)
- MTEB Score: 62.4(中文任务)
- 特点: 专门针对中文优化的嵌入模型
- 优势: 中文语义理解能力最强
- 适用场景: 中文问答、文档检索
2. m3e-base (LangChain)
- MTEB Score: 61.8(中文任务)
- 特点: 多语言混合训练,支持中英日韩
- 优势: 跨语言对齐效果好
- 适用场景: 多语言应用、国际化产品
3. text2vec-large-chinese (HuggingFace)
- MTEB Score: 60.5(中文任务)
- 特点: 基于ERNIE架构,中文优化
- 优势: 中文长文本处理能力优秀
- 适用场景: 中文文档处理、知识库构建
4. gte-base-zh-v1.5 (Alibaba)
- MTEB Score: 59.8(中文任务)
- 特点: 阿里巴巴通义千问系列
- 优势: 商业应用成熟,支持度高
- 适用场景: 电商、金融领域应用
5. uer-sbert-base-chinese (百度)
- MTEB Score: 58.9(中文任务)
- 特点: 基于ERNIE预训练,Sentence-BERT架构
- 优势: 句子级别语义匹配优秀
- 适用场景: 文本相似度、问答系统
📊 综合排行榜(Top 10)
| 排名 | 模型名称 | 发布者 | MTEB分数 | 类型 | 开源 |
|---|---|---|---|---|---|
| 1 | text-embedding-3-large | OpenAI | 64.6 | 商业 | ❌ |
| 2 | e5-mistral-7b-instruct | Microsoft | 64.2 | 开源 | ✅ |
| 3 | gte-large-en-v1.5 | Alibaba | 63.8 | 开源 | ✅ |
| 4 | bge-large-en-v1.5 | BAAI | 63.5 | 开源 | ✅ |
| 5 | bge-large-zh-v1.5 | BAAI | 62.4 | 开源 | ✅ |
| 6 | m3e-base | LangChain | 61.8 | 开源 | ✅ |
| 7 | text2vec-large-chinese | HuggingFace | 60.5 | 开源 | ✅ |
| 8 | text-embedding-ada-002 | OpenAI | 60.0 | 商业 | ❌ |
| 9 | gte-base-zh-v1.5 | Alibaba | 59.8 | 开源 | ✅ |
| 10 | uer-sbert-base-chinese | 百度 | 58.9 | 开源 | ✅ |
💡 选择建议
企业级应用
- 首选: text-embedding-3-large(商业API)
- 备选: e5-mistral-7b-instruct(开源可商用)
中文项目
- 首选: bge-large-zh-v1.5
- 备选: text2vec-large-chinese
开源项目
- 英文: bge-large-en-v1.5
- 多语言: m3e-base
成本敏感
- 英文: gte-base-en-v1.5
- 中文: uer-sbert-base-chinese
七、LangGraph中的RAG实现
LangGraph是一个基于LangChain的图状工作流框架,特别适合实现复杂的RAG应用。它通过定义节点和边来构建有向图,实现灵活的检索增强生成流程。
7.1 LangGraph RAG实现概述
7.1.1 什么是LangGraph?
LangGraph是LangChain生态系统中的一个组件,用于构建状态机驱动的应用程序。它将复杂的AI工作流表示为有向图,其中:
- 节点(Nodes): 执行特定任务的函数
- 边(Edges): 定义节点之间的连接和流转逻辑
- 状态(State): 在节点间传递的数据
7.1.2 LangGraph RAG的优势
- 可视化工作流: 清晰展示RAG的各个阶段
- 灵活控制: 可以根据条件动态调整执行路径
- 错误处理: 内置的错误处理和重试机制
- 可扩展性: 易于添加新的处理步骤
- 状态管理: 自动管理中间状态和上下文
7.2 LangGraph核心概念
7.2.1 基本组件
from typing import TypedDict, Annotated, Sequence
import operator
from langgraph.graph import Graph, StateGraph
from langchain_core.messages import BaseMessage
# 定义状态
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
query: str
context: str
answer: str
next_action: str
7.2.2 节点类型
- 检索节点: 负责从向量库中检索相关文档
- 生成节点: 基于检索结果生成回答
- 评估节点: 评估回答质量
- 路由节点: 决定下一步执行路径
7.3 基础RAG工作流实现
7.3.1 基础RAG图结构
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnablePassthrough
class RAGWorkflow:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
self.workflow = None
def setup_workflow(self):
"""设置RAG工作流"""
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("retrieve", self.retrieve_documents)
workflow.add_node("generate", self.generate_answer)
workflow.add_node("evaluate", self.evaluate_answer)
# 设置入口点
workflow.set_entry_point("retrieve")
# 添加边
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "evaluate")
# 条件边:根据评估结果决定是否结束或重试
workflow.add_conditional_edges(
"evaluate",
self.should_continue,
{
"end": END,
"retry": "generate"
}
)
self.workflow = workflow.compile()
return self.workflow
7.3.2 节点实现
def retrieve_documents(self, state: AgentState) -> AgentState:
"""检索相关文档"""
query = state["query"]
# 相似度检索
docs = self.vectorstore.similarity_search(query, k=3)
# 提取文档内容
context = "\n\n".join([doc.page_content for doc in docs])
return {
**state,
"context": context,
"next_action": "generate"
}
def generate_answer(self, state: AgentState) -> AgentState:
"""生成回答"""
query = state["query"]
context = state["context"]
# 构建提示
prompt = f"""基于以下信息回答问题:
背景信息:{context}
问题:{query}
请给出准确、详细的回答:"""
# 生成回答
response = self.llm.invoke(prompt)
return {
**state,
"answer": response.content,
"next_action": "evaluate"
}
def evaluate_answer(self, state: AgentState) -> AgentState:
"""评估回答质量"""
query = state["query"]
context = state["context"]
answer = state["answer"]
# 评估提示
evaluation_prompt = f"""评估以下回答的质量:
问题:{query}
背景信息:{context}
回答:{answer}
请评估:
1. 回答是否准确回答了问题
2. 回答是否基于提供的背景信息
3. 回答是否完整且详细
如果回答质量良好,返回"good",否则返回"poor":"""
evaluation = self.llm.invoke(evaluation_prompt)
return {
**state,
"evaluation": evaluation.content,
"next_action": "end" if "good" in evaluation.content.lower() else "retry"
}
def should_continue(self, state: AgentState) -> str:
"""决定是否继续"""
return state["next_action"]
7.4 高级RAG实现
7.4.1 多轮检索RAG
class MultiRoundRAGWorkflow:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
self.max_rounds = 3
def setup_workflow(self):
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("query_analysis", self.analyze_query)
workflow.add_node("retrieve", self.retrieve_documents)
workflow.add_node("assess_relevance", self.assess_relevance)
workflow.add_node("refine_query", self.refine_query)
workflow.add_node("generate", self.generate_answer)
workflow.add_node("evaluate", self.evaluate_answer)
# 设置入口点
workflow.set_entry_point("query_analysis")
# 添加边
workflow.add_edge("query_analysis", "retrieve")
workflow.add_edge("retrieve", "assess_relevance")
workflow.add_conditional_edges(
"assess_relevance",
self.needs_refinement,
{
"refine": "refine_query",
"generate": "generate"
}
)
workflow.add_edge("refine_query", "retrieve")
workflow.add_edge("generate", "evaluate")
workflow.add_conditional_edges(
"evaluate",
self.should_continue,
{
"end": END,
"retry": "retrieve"
}
)
return workflow.compile()
7.4.2 自问自答RAG(Self-Ask RAG)
class SelfAskRAGWorkflow:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def setup_workflow(self):
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("decompose_question", self.decompose_question)
workflow.add_node("answer_subquestion", self.answer_subquestion)
workflow.add_node("synthesize_answer", self.synthesize_answer)
# 设置入口点
workflow.set_entry_point("decompose_question")
# 添加边
workflow.add_edge("decompose_question", "answer_subquestion")
workflow.add_conditional_edges(
"answer_subquestion",
self.has_more_subquestions,
{
"continue": "answer_subquestion",
"synthesize": "synthesize_answer"
}
)
workflow.add_edge("synthesize_answer", END)
return workflow.compile()
7.5 完整代码示例
# 完整的LangGraph RAG实现
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
class CompleteRAGSystem:
def __init__(self, documents_path, openai_api_key):
self.openai_api_key = openai_api_key
self.setup_components(documents_path)
self.setup_workflow()
def setup_components(self, documents_path):
"""设置RAG组件"""
# 加载文档
loader = TextLoader(documents_path)
documents = loader.load()
# 文档分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = text_splitter.split_documents(documents)
# 向量化
embeddings = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
self.vectorstore = Chroma.from_documents(chunks, embeddings)
# 语言模型
self.llm = ChatOpenAI(
temperature=0,
openai_api_key=self.openai_api_key
)
def setup_workflow(self):
"""设置工作流"""
self.rag_workflow = MultiRoundRAGWorkflow(
self.vectorstore, self.llm
)
self.workflow = self.rag_workflow.setup_workflow()
def query(self, question):
"""执行查询"""
initial_state = {
"query": question,
"context": "",
"answer": "",
"next_action": ""
}
result = self.workflow.invoke(initial_state)
return result["answer"]
7.6 LangGraph RAG最佳实践
7.6.1 性能优化
- 缓存机制: 缓存中间结果避免重复计算
- 并行处理: 对独立子任务使用并行执行
- 结果缓存: 缓存常见查询的答案
7.6.2 错误处理
def safe_invoke_node(func):
"""安全执行节点的装饰器"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"节点执行错误: {e}")
# 返回安全状态或重试逻辑
return {"error": str(e), "next_action": "retry"}
return wrapper
7.6.3 监控和调试
class MonitoringRAGWorkflow:
def __init__(self):
self.execution_log = []
def log_execution(self, node_name, input_state, output_state):
"""记录执行日志"""
self.execution_log.append({
"node": node_name,
"input": input_state,
"output": output_state,
"timestamp": datetime.now()
})
LangGraph为RAG应用提供了强大的图状工作流支持,使得复杂的检索增强生成逻辑变得更加清晰、可控和可扩展。通过合理设计节点和流转逻辑,可以构建出高效、可靠的RAG系统。
总结
RAG技术通过将检索和生成相结合,有效解决了大语言模型的局限性,提供了更准确、可靠的信息服务。其工作流程清晰,实现相对简单,适用于多种应用场景。随着技术的不断发展,RAG将在更多领域发挥重要作用,成为构建智能问答系统的核心技术之一。
关键要点:
- RAG = 检索 + 增强 + 生成
- 工作流程包括文档处理和查询处理两个阶段
- 技术栈成熟,有多个开源框架支持
- 适用于需要准确、最新信息的场景
- 具备良好的可扩展性和可控性
- 文档分割: 选择合适的分割策略对RAG性能至关重要
- 嵌入模型选择: 根据具体需求选择合适的嵌入模型,平衡性能、成本和部署便利性
- LangGraph: 提供强大的图状工作流支持,实现复杂的RAG应用逻辑
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)