RAG技术详解

目录


一、RAG技术概述

RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合了信息检索和语言模型生成的混合AI技术。它通过在生成回答之前,从外部知识库中检索相关信息来增强语言模型的回答质量。

核心概念

RAG的核心思想

  • 检索(Retrieval):从大规模文档库中找到与问题相关的信息片段
  • 增强(Augmented):将检索到的相关信息作为上下文提供给语言模型
  • 生成(Generation):语言模型基于检索到的信息生成更准确、更丰富的回答

为什么需要RAG

  • 解决大语言模型的"幻觉"问题
  • 提供最新、准确的信息
  • 减少模型训练成本
  • 实现知识库的动态更新

二、RAG工作流程

1. 整体架构

RAG系统通常包含以下核心组件:

用户查询 → 查询编码 → 文档检索 → 结果重排序 → 上下文构建 → 回答生成 → 输出生成

2. 详细工作流程

阶段一:文档处理(离线)
  1. 文档分割(Chunking)

    • 将大文档切分成小块(通常512-1024 tokens)
    • 考虑语义连贯性,避免在关键信息处断开
    • 详见第五章:文档分割的实现
  2. 向量化(Embedding)

    • 使用嵌入模型将文本转换为向量
    • 常用模型:text-embedding-ada-002、bge-large等
  3. 向量存储

    • 将向量存储在向量数据库中
    • 支持高效的相似度搜索
阶段二:查询处理(在线)
  1. 查询编码

    • 将用户问题转换为向量表示
    • 与文档块使用相同的嵌入模型
  2. 相似度检索

    • 在向量数据库中查找最相关的文档块
    • 通常返回Top-K个结果(K=3-10)
  3. 重排序(可选)

    • 使用交叉编码器对检索结果重新排序
    • 提高检索准确性
  4. 上下文构建

    • 将检索到的文档块与原始查询组合
    • 控制总长度,避免超出模型限制
  5. 回答生成

    • 使用语言模型基于增强的上下文生成回答
    • 可以采用不同的提示模板

3. 工作流程图

用户查询

查询编码

向量数据库

文档库

文档分块

向量化

相似度检索

结果重排序

上下文构建

回答生成

最终回答


三、具体实现方案

1. 技术栈选择

核心组件
  • 嵌入模型:OpenAI text-embedding-ada-002、BGE系列、Sentence-BERT
  • 向量数据库:Pinecone、Weaviate、Chroma、FAISS、Milvus
  • 语言模型:GPT-3.5/4、Claude、Llama 2/3、ChatGLM
  • 框架:LangChain、LlamaIndex

2. 代码实现示例

基础RAG实现(Python)
import openai
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

# 1. 文档处理和向量存储
def setup_rag_system(documents):
    # 文档分块
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200
    )
    chunks = text_splitter.split_documents(documents)

    # 向量化
    embeddings = OpenAIEmbeddings()

    # 向量存储
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings
    )

    return vectorstore

# 2. RAG查询处理
def rag_query(vectorstore, query):
    # 创建检索器
    retriever = vectorstore.as_retriever(
        search_kwargs={"k": 3}
    )

    # 创建RAG链
    qa_chain = RetrievalQA.from_chain_type(
        llm=OpenAI(temperature=0),
        chain_type="stuff",
        retriever=retriever
    )

    # 执行查询
    result = qa_chain({"query": query})
    return result["result"]
高级实现(带重排序)
from sentence_transformers import CrossEncoder

class AdvancedRAG:
    def __init__(self, vectorstore, cross_encoder_model):
        self.vectorstore = vectorstore
        self.cross_encoder = CrossEncoder(cross_encoder_model)

    def rerank_results(self, query, retrieved_docs):
        # 构建查询-文档对
        query_doc_pairs = [(query, doc.page_content) for doc in retrieved_docs]

        # 重排序
        scores = self.cross_encoder.predict(query_doc_pairs)

        # 按分数排序
        reranked_docs = [doc for _, doc in sorted(
            zip(scores, retrieved_docs),
            key=lambda x: x[0],
            reverse=True
        )]

        return reranked_docs

    def query(self, question):
        # 初步检索
        docs = self.vectorstore.similarity_search(question, k=10)

        # 重排序
        reranked_docs = self.rerank_results(question, docs)

        # 构建上下文
        context = "\n\n".join([doc.page_content for doc in reranked_docs[:3]])

        # 生成回答
        prompt = f"""基于以下信息回答问题:

        背景信息:{context}

        问题:{question}

        请给出准确、详细的回答:"""

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )

        return response.choices[0].message.content

3. 部署架构

简单部署
前端 → API服务 → RAG引擎 → 向量数据库
                      ↓
                 语言模型API
生产级部署
负载均衡器 → 多个API实例 → RAG服务集群
                             ↓
                    Redis缓存层
                             ↓
                   向量数据库集群
                             ↓
                   语言模型服务

4. 性能优化

缓存策略
import redis
import hashlib

class CachedRAG:
    def __init__(self, rag_system, redis_client):
        self.rag_system = rag_system
        self.redis = redis_client

    def get_cache_key(self, query):
        return hashlib.md5(query.encode()).hexdigest()

    def query(self, question):
        cache_key = self.get_cache_key(question)

        # 检查缓存
        cached_result = self.redis.get(cache_key)
        if cached_result:
            return cached_result.decode()

        # 执行RAG查询
        result = self.rag_system.query(question)

        # 缓存结果
        self.redis.setex(cache_key, 3600, result)  # 缓存1小时

        return result
批处理和异步
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def batch_rag_queries(queries, rag_system):
    async def process_query(query):
        loop = asyncio.get_event_loop()
        with ThreadPoolExecutor() as executor:
            result = await loop.run_in_executor(
                executor,
                rag_system.query,
                query
            )
        return result

    tasks = [process_query(query) for query in queries]
    results = await asyncio.gather(*tasks)
    return results

四、应用场景和优势

1. 主要应用场景

企业知识库问答
  • 场景:员工查询公司政策、流程文档
  • 优势:准确、一致,避免信息过时
  • 案例:HR政策问答、技术文档查询
客服系统
  • 场景:产品支持、故障排查
  • 优势:基于最新产品文档提供准确答案
  • 案例:技术产品客服、金融服务咨询
医疗健康
  • 场景:医学文献查询、诊断辅助
  • 优势:基于权威医学资料提供信息
  • 案例:临床决策支持、医学研究
法律合规
  • 场景:法律条文查询、合规检查
  • 优势:确保信息准确性和时效性
  • 案例:法律研究、合规审查
教育培训
  • 场景:课程答疑、学习材料查询
  • 优势:基于教材内容提供准确解释
  • 案例:在线学习平台、企业培训

2. RAG的优势

准确性提升
  • 减少幻觉:基于真实文档生成回答
  • 事实核查:可追溯到原始信息来源
  • 时效性强:知识库可随时更新
成本效益
  • 降低训练成本:无需重新训练大模型
  • 灵活更新:知识库独立于模型更新
  • 资源优化:可按需检索,减少计算开销
可控性和透明度
  • 内容控制:可限制检索源和生成内容
  • 审计追踪:可追踪回答的信息来源
  • 质量保证:可实施质量过滤和验证
可扩展性
  • 多源整合:可整合多个知识源
  • 领域适配:易于适配不同专业领域
  • 规模弹性:支持从小规模到大规模部署

3. 挑战和解决方案

检索质量
  • 挑战:检索到不相关或低质量文档
  • 解决方案
    • 优化文档分块策略
    • 使用重排序模型
    • 多阶段检索(粗筛+精筛)
上下文长度限制
  • 挑战:大模型上下文窗口限制
  • 解决方案
    • 智能文档摘要
    • 分层检索策略
    • 动态上下文选择
性能和延迟
  • 挑战:检索和生成的延迟
  • 解决方案
    • 结果缓存
    • 异步处理
    • 向量数据库优化
知识更新
  • 挑战:保持知识库的时效性
  • 解决方案
    • 自动化文档监控
    • 增量更新机制
    • 版本控制和回滚

4. 未来发展趋势

  1. 多模态RAG:支持图像、音频等多媒体内容
  2. 自适应RAG:根据查询类型自动调整检索策略
  3. 联邦RAG:跨多个数据源的隐私保护检索
  4. 实时RAG:支持流式数据实时检索和更新

五、文档分割的实现

文档分割(Document Chunking)是RAG系统中的关键环节,直接影响检索质量和最终回答的准确性。

5.1 文档分割的重要性

为什么需要文档分割?
  1. 模型限制: 大语言模型的上下文窗口有限(通常4K-32K tokens)
  2. 检索精度: 小块文档更容易实现精准匹配
  3. 计算效率: 减少不必要的计算开销
  4. 语义完整: 保持文本片段的语义连贯性
分割不当的后果
  • 关键信息被截断,影响语义理解
  • 检索到不相关的内容,降低回答质量
  • 重要上下文丢失,导致回答不完整

5.2 常见分割策略

5.2.1 固定长度分割

原理: 按照固定的token或字符数进行分割

优点: 简单高效,易于实现
缺点: 可能切断语义连贯的句子

def fixed_size_chunking(text, chunk_size=512, overlap=50):
    """
    固定长度文档分割
    :param text: 输入文本
    :param chunk_size: 每个块的大小
    :param overlap: 重叠部分大小
    :return: 分割后的文本块列表
    """
    chunks = []
    start = 0
    
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        
        # 移动起始位置,考虑重叠
        start = end - overlap
        
        # 避免无限循环
        if start >= len(text):
            break
    
    return chunks
5.2.2 基于句子的分割

原理: 以句子为基本单位进行分割

优点: 保持句子完整性,语义更连贯
缺点: 单句可能过长,需要二次处理

import nltk
from nltk.tokenize import sent_tokenize

def sentence_based_chunking(text, max_tokens=512, overlap_sentences=1):
    """
    基于句子的文档分割
    :param text: 输入文本
    :param max_tokens: 最大token数
    :param overlap_sentences: 重叠句子数
    :return: 分割后的文本块列表
    """
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_tokens = 0
    
    for i, sentence in enumerate(sentences):
        sentence_tokens = len(sentence.split())
        
        # 如果当前句子单独就超过限制,需要特殊处理
        if sentence_tokens > max_tokens:
            # 如果当前块不为空,先保存
            if current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_tokens = 0
            
            # 对长句子进行进一步分割
            sub_chunks = fixed_size_chunking(sentence, max_tokens, max_tokens // 4)
            chunks.extend(sub_chunks)
            continue
        
        # 如果添加当前句子会超出限制
        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            
            # 保留重叠的句子
            overlap_start = max(0, len(current_chunk) - overlap_sentences)
            current_chunk = current_chunk[overlap_start:]
            current_tokens = sum(len(s.split()) for s in current_chunk)
        
        current_chunk.append(sentence)
        current_tokens += sentence_tokens
    
    # 添加最后一个块
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    
    return chunks
5.2.3 基于段落的分割

原理: 以自然段落为单位进行分割

优点: 保持段落语义完整性
缺点: 段落长度差异大

def paragraph_based_chunking(text, max_tokens=512, overlap_paragraphs=1):
    """
    基于段落的文档分割
    :param text: 输入文本
    :param max_tokens: 最大token数
    :param overlap_paragraphs: 重叠段落数
    :return: 分割后的文本块列表
    """
    # 按段落分割(空行分隔)
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    chunks = []
    current_chunk = []
    current_tokens = 0
    
    for i, paragraph in enumerate(paragraphs):
        paragraph_tokens = len(paragraph.split())
        
        # 如果当前段落单独就超过限制
        if paragraph_tokens > max_tokens:
            # 如果当前块不为空,先保存
            if current_chunk:
                chunks.append("\n\n".join(current_chunk))
                current_chunk = []
                current_tokens = 0
            
            # 对长段落进行句子级分割
            sub_chunks = sentence_based_chunking(paragraph, max_tokens, 1)
            chunks.extend(sub_chunks)
            continue
        
        # 如果添加当前段落会超出限制
        if current_tokens + paragraph_tokens > max_tokens and current_chunk:
            chunks.append("\n\n".join(current_chunk))
            
            # 保留重叠的段落
            overlap_start = max(0, len(current_chunk) - overlap_paragraphs)
            current_chunk = current_chunk[overlap_start:]
            current_tokens = sum(len(p.split()) for p in current_chunk)
        
        current_chunk.append(paragraph)
        current_tokens += paragraph_tokens
    
    # 添加最后一个块
    if current_chunk:
        chunks.append("\n\n".join(current_chunk))
    
    return chunks
5.2.4 语义感知分割

原理: 基于文本语义结构进行智能分割

优点: 保持语义完整性,提高检索质量
缺点: 实现复杂,计算开销大

from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticChunker:
    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
    
    def get_embeddings(self, texts):
        """获取文本的嵌入向量"""
        inputs = self.tokenizer(texts, padding=True, truncation=True, 
                              return_tensors='pt', max_length=512)
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            embeddings = outputs.last_hidden_state[:, 0, :].numpy()
        
        return embeddings
    
    def semantic_chunking(self, text, max_tokens=512, similarity_threshold=0.7):
        """
        语义感知的文档分割
        :param text: 输入文本
        :param max_tokens: 最大token数
        :param similarity_threshold: 语义相似度阈值
        :return: 分割后的文本块列表
        """
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for i, sentence in enumerate(sentences):
            sentence_tokens = len(sentence.split())
            
            # 如果当前句子单独就超过限制
            if sentence_tokens > max_tokens:
                if current_chunk:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = []
                    current_tokens = 0
                
                # 对长句子进行固定长度分割
                sub_chunks = fixed_size_chunking(sentence, max_tokens, max_tokens // 4)
                chunks.extend(sub_chunks)
                continue
            
            # 检查语义相似度
            should_break = False
            if len(current_chunk) >= 2:  # 至少有2个句子才检查
                current_text = " ".join(current_chunk[-2:])  # 取最后两个句子
                combined_text = current_text + " " + sentence
                
                embeddings = self.get_embeddings([current_text, sentence])
                similarity = cosine_similarity(
                    embeddings[0].reshape(1, -1),
                    embeddings[1].reshape(1, -1)
                )[0][0]
                
                # 如果相似度太低,建议在此处分隔
                if similarity < similarity_threshold:
                    should_break = True
            
            # 如果添加当前句子会超出限制或语义不连贯
            if (current_tokens + sentence_tokens > max_tokens or should_break) and current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_tokens = 0
            
            current_chunk.append(sentence)
            current_tokens += sentence_tokens
        
        # 添加最后一个块
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        
        return chunks
5.2.5 基于标题层次的分割

原理: 利用文档的标题结构进行分割

优点: 符合文档自然结构,便于导航
缺点: 需要文档有清晰的标题结构

import re

def heading_based_chunking(text, max_tokens=512):
    """
    基于标题层次的文档分割
    :param text: 输入文本
    :param max_tokens: 最大token数
    :return: 分割后的文本块列表
    """
    # 匹配常见的标题格式
    heading_patterns = [
        r'^#+\s+.+$',  # Markdown标题
        r'^\d+\.\s+.+$',  # 数字编号
        r'^[A-Z][^.!?]*[:.]',  # 大写字母开头
    ]
    
    lines = text.split('\n')
    chunks = []
    current_chunk = []
    current_tokens = 0
    
    for line in lines:
        line = line.strip()
        if not line:
            continue
        
        # 检查是否是标题
        is_heading = any(re.match(pattern, line, re.MULTILINE) for pattern in heading_patterns)
        line_tokens = len(line.split())
        
        # 如果是标题且当前块不为空,保存当前块
        if is_heading and current_chunk and current_tokens > 0:
            chunks.append("\n".join(current_chunk))
            current_chunk = []
            current_tokens = 0
        
        # 如果添加当前行会超出限制
        if current_tokens + line_tokens > max_tokens and current_chunk:
            chunks.append("\n".join(current_chunk))
            current_chunk = []
            current_tokens = 0
        
        current_chunk.append(line)
        current_tokens += line_tokens
    
    # 添加最后一个块
    if current_chunk:
        chunks.append("\n".join(current_chunk))
    
    return chunks

5.3 高级分割策略

5.3.1 递归分割
class RecursiveChunker:
    def __init__(self, chunk_sizes=[1000, 500, 250]):
        self.chunk_sizes = chunk_sizes
    
    def recursive_chunk(self, text, level=0):
        """
        递归文档分割
        :param text: 输入文本
        :param level: 当前递归层级
        :return: 分割后的文本块列表
        """
        if level >= len(self.chunk_sizes):
            return [text]
        
        chunk_size = self.chunk_sizes[level]
        
        # 尝试当前层级的分割策略
        if level == 0:
            chunks = paragraph_based_chunking(text, chunk_size)
        elif level == 1:
            chunks = sentence_based_chunking(text, chunk_size)
        else:
            chunks = fixed_size_chunking(text, chunk_size)
        
        # 如果块的数量没有减少,进入下一层级
        if len(chunks) <= 1 and level < len(self.chunk_sizes) - 1:
            return self.recursive_chunk(text, level + 1)
        
        # 递归处理每个块
        final_chunks = []
        for chunk in chunks:
            if len(chunk.split()) > self.chunk_sizes[-1]:
                sub_chunks = self.recursive_chunk(chunk, level + 1)
                final_chunks.extend(sub_chunks)
            else:
                final_chunks.append(chunk)
        
        return final_chunks
5.3.2 智能分割
class IntelligentChunker:
    def __init__(self, max_tokens=512):
        self.max_tokens = max_tokens
    
    def analyze_document_structure(self, text):
        """分析文档结构,选择最佳分割策略"""
        lines = text.split('\n')
        
        # 检查是否有标题结构
        has_headings = any(line.strip().startswith('#') for line in lines)
        
        # 检查段落长度分布
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        avg_paragraph_length = sum(len(p.split()) for p in paragraphs) / len(paragraphs) if paragraphs else 0
        
        # 检查句子长度分布
        sentences = sent_tokenize(text)
        avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
        
        return {
            'has_headings': has_headings,
            'avg_paragraph_length': avg_paragraph_length,
            'avg_sentence_length': avg_sentence_length,
            'paragraph_count': len(paragraphs),
            'sentence_count': len(sentences)
        }
    
    def intelligent_chunk(self, text):
        """智能选择分割策略"""
        structure = self.analyze_document_structure(text)
        
        # 如果有清晰的标题结构,使用标题分割
        if structure['has_headings']:
            return heading_based_chunking(text, self.max_tokens)
        
        # 如果段落平均长度适中,使用段落分割
        elif structure['avg_paragraph_length'] <= self.max_tokens * 0.8:
            return paragraph_based_chunking(text, self.max_tokens)
        
        # 如果句子较短,使用句子分割
        elif structure['avg_sentence_length'] <= self.max_tokens * 0.6:
            return sentence_based_chunking(text, self.max_tokens)
        
        # 否则使用递归分割
        else:
            recursive_chunker = RecursiveChunker([self.max_tokens, self.max_tokens//2, self.max_tokens//4])
            return recursive_chunker.recursive_chunk(text)

5.4 实际应用建议

5.4.1 选择合适的策略

技术文档: 使用基于标题的分割
学术论文: 使用段落分割
对话数据: 使用句子分割
长篇文章: 使用递归分割
多语言内容: 使用语义感知分割

5.4.2 参数调优
def optimize_chunking_parameters(documents, target_retrieval_score=0.8):
    """
    优化分割参数
    :param documents: 文档集合
    :param target_retrieval_score: 目标检索分数
    :return: 最优参数组合
    """
    best_params = None
    best_score = 0
    
    # 测试不同的参数组合
    chunk_sizes = [256, 512, 768, 1024]
    overlap_ratios = [0.1, 0.2, 0.3]
    
    for chunk_size in chunk_sizes:
        for overlap_ratio in overlap_ratios:
            overlap = int(chunk_size * overlap_ratio)
            
            # 评估当前参数的性能
            score = evaluate_chunking_performance(
                documents, chunk_size, overlap
            )
            
            if score > best_score:
                best_score = score
                best_params = {
                    'chunk_size': chunk_size,
                    'overlap': overlap,
                    'score': score
                }
    
    return best_params
5.4.3 质量评估
def evaluate_chunking_quality(chunks):
    """
    评估分割质量
    :param chunks: 分割后的文本块
    :return: 质量指标
    """
    metrics = {
        'chunk_count': len(chunks),
        'avg_chunk_length': np.mean([len(chunk.split()) for chunk in chunks]),
        'length_variance': np.var([len(chunk.split()) for chunk in chunks]),
        'min_length': min(len(chunk.split()) for chunk in chunks),
        'max_length': max(len(chunk.split()) for chunk in chunks)
    }
    
    # 计算语义连贯性(简化版)
    coherent_chunks = 0
    for chunk in chunks:
        sentences = sent_tokenize(chunk)
        if len(sentences) <= 1 or sentences[-1].strip().endswith(('.', '!', '?')):
            coherent_chunks += 1
    
    metrics['coherence_ratio'] = coherent_chunks / len(chunks)
    
    return metrics

5.5 最佳实践

  1. 保持语义完整性: 优先选择基于语义单位的分割策略
  2. 合理设置重叠: 通常设置为块大小的10-20%
  3. 考虑应用场景: 根据具体需求选择合适的分割粒度
  4. 动态调整: 根据文档类型自动选择分割策略
  5. 质量监控: 定期评估分割效果并进行优化

通过合理选择和实现文档分割策略,可以显著提高RAG系统的检索精度和回答质量。


六、主流嵌入模型排行榜

2024年文本嵌入模型排行榜(基于MTEB评测)

🏆 国际顶级模型(前5名)
1. text-embedding-3-large (OpenAI)
  • MTEB Score: 64.6
  • 特点: 支持可变维度输出,最大3072维
  • 优势: 多语言支持优秀,语义理解能力强
  • 适用场景: 企业应用、多语言环境
2. e5-mistral-7b-instruct (Microsoft)
  • MTEB Score: 64.2
  • 特点: 基于Mistral 7B,指令优化
  • 优势: 开源模型,推理效率高
  • 适用场景: 需要本地部署的场景
3. gte-large-en-v1.5 (Alibaba)
  • MTEB Score: 63.8
  • 特点: 专门针对英文优化的通用文本编码器
  • 优势: 在英文任务上表现优异,开源
  • 适用场景: 英文文本处理、检索增强
4. bge-large-en-v1.5 (BAAI)
  • MTEB Score: 63.5
  • 特点: 北京智源研究院开发,大规模训练
  • 优势: 平衡性能和效率,开源免费
  • 适用场景: 通用文本嵌入、学术研究
5. text-embedding-ada-002 (OpenAI)
  • MTEB Score: 60.0
  • 特点: 经典模型,稳定可靠
  • 优势: API调用简单,成本相对较低
  • 适用场景: 快速原型开发、中小规模应用
🥇 国内优秀模型(前5名)
1. bge-large-zh-v1.5 (BAAI)
  • MTEB Score: 62.4(中文任务)
  • 特点: 专门针对中文优化的嵌入模型
  • 优势: 中文语义理解能力最强
  • 适用场景: 中文问答、文档检索
2. m3e-base (LangChain)
  • MTEB Score: 61.8(中文任务)
  • 特点: 多语言混合训练,支持中英日韩
  • 优势: 跨语言对齐效果好
  • 适用场景: 多语言应用、国际化产品
3. text2vec-large-chinese (HuggingFace)
  • MTEB Score: 60.5(中文任务)
  • 特点: 基于ERNIE架构,中文优化
  • 优势: 中文长文本处理能力优秀
  • 适用场景: 中文文档处理、知识库构建
4. gte-base-zh-v1.5 (Alibaba)
  • MTEB Score: 59.8(中文任务)
  • 特点: 阿里巴巴通义千问系列
  • 优势: 商业应用成熟,支持度高
  • 适用场景: 电商、金融领域应用
5. uer-sbert-base-chinese (百度)
  • MTEB Score: 58.9(中文任务)
  • 特点: 基于ERNIE预训练,Sentence-BERT架构
  • 优势: 句子级别语义匹配优秀
  • 适用场景: 文本相似度、问答系统
📊 综合排行榜(Top 10)
排名 模型名称 发布者 MTEB分数 类型 开源
1 text-embedding-3-large OpenAI 64.6 商业
2 e5-mistral-7b-instruct Microsoft 64.2 开源
3 gte-large-en-v1.5 Alibaba 63.8 开源
4 bge-large-en-v1.5 BAAI 63.5 开源
5 bge-large-zh-v1.5 BAAI 62.4 开源
6 m3e-base LangChain 61.8 开源
7 text2vec-large-chinese HuggingFace 60.5 开源
8 text-embedding-ada-002 OpenAI 60.0 商业
9 gte-base-zh-v1.5 Alibaba 59.8 开源
10 uer-sbert-base-chinese 百度 58.9 开源
💡 选择建议

企业级应用

  • 首选: text-embedding-3-large(商业API)
  • 备选: e5-mistral-7b-instruct(开源可商用)

中文项目

  • 首选: bge-large-zh-v1.5
  • 备选: text2vec-large-chinese

开源项目

  • 英文: bge-large-en-v1.5
  • 多语言: m3e-base

成本敏感

  • 英文: gte-base-en-v1.5
  • 中文: uer-sbert-base-chinese

七、LangGraph中的RAG实现

LangGraph是一个基于LangChain的图状工作流框架,特别适合实现复杂的RAG应用。它通过定义节点和边来构建有向图,实现灵活的检索增强生成流程。

7.1 LangGraph RAG实现概述

7.1.1 什么是LangGraph?

LangGraph是LangChain生态系统中的一个组件,用于构建状态机驱动的应用程序。它将复杂的AI工作流表示为有向图,其中:

  • 节点(Nodes): 执行特定任务的函数
  • 边(Edges): 定义节点之间的连接和流转逻辑
  • 状态(State): 在节点间传递的数据
7.1.2 LangGraph RAG的优势
  1. 可视化工作流: 清晰展示RAG的各个阶段
  2. 灵活控制: 可以根据条件动态调整执行路径
  3. 错误处理: 内置的错误处理和重试机制
  4. 可扩展性: 易于添加新的处理步骤
  5. 状态管理: 自动管理中间状态和上下文

7.2 LangGraph核心概念

7.2.1 基本组件
from typing import TypedDict, Annotated, Sequence
import operator
from langgraph.graph import Graph, StateGraph
from langchain_core.messages import BaseMessage

# 定义状态
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    query: str
    context: str
    answer: str
    next_action: str
7.2.2 节点类型
  1. 检索节点: 负责从向量库中检索相关文档
  2. 生成节点: 基于检索结果生成回答
  3. 评估节点: 评估回答质量
  4. 路由节点: 决定下一步执行路径

7.3 基础RAG工作流实现

7.3.1 基础RAG图结构
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnablePassthrough

class RAGWorkflow:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        self.workflow = None
    
    def setup_workflow(self):
        """设置RAG工作流"""
        workflow = StateGraph(AgentState)
        
        # 添加节点
        workflow.add_node("retrieve", self.retrieve_documents)
        workflow.add_node("generate", self.generate_answer)
        workflow.add_node("evaluate", self.evaluate_answer)
        
        # 设置入口点
        workflow.set_entry_point("retrieve")
        
        # 添加边
        workflow.add_edge("retrieve", "generate")
        workflow.add_edge("generate", "evaluate")
        
        # 条件边:根据评估结果决定是否结束或重试
        workflow.add_conditional_edges(
            "evaluate",
            self.should_continue,
            {
                "end": END,
                "retry": "generate"
            }
        )
        
        self.workflow = workflow.compile()
        return self.workflow
7.3.2 节点实现
def retrieve_documents(self, state: AgentState) -> AgentState:
    """检索相关文档"""
    query = state["query"]
    
    # 相似度检索
    docs = self.vectorstore.similarity_search(query, k=3)
    
    # 提取文档内容
    context = "\n\n".join([doc.page_content for doc in docs])
    
    return {
        **state,
        "context": context,
        "next_action": "generate"
    }

def generate_answer(self, state: AgentState) -> AgentState:
    """生成回答"""
    query = state["query"]
    context = state["context"]
    
    # 构建提示
    prompt = f"""基于以下信息回答问题:
    
背景信息:{context}

问题:{query}

请给出准确、详细的回答:"""
    
    # 生成回答
    response = self.llm.invoke(prompt)
    
    return {
        **state,
        "answer": response.content,
        "next_action": "evaluate"
    }

def evaluate_answer(self, state: AgentState) -> AgentState:
    """评估回答质量"""
    query = state["query"]
    context = state["context"]
    answer = state["answer"]
    
    # 评估提示
    evaluation_prompt = f"""评估以下回答的质量:

问题:{query}
背景信息:{context}
回答:{answer}

请评估:
1. 回答是否准确回答了问题
2. 回答是否基于提供的背景信息
3. 回答是否完整且详细

如果回答质量良好,返回"good",否则返回"poor":"""
    
    evaluation = self.llm.invoke(evaluation_prompt)
    
    return {
        **state,
        "evaluation": evaluation.content,
        "next_action": "end" if "good" in evaluation.content.lower() else "retry"
    }

def should_continue(self, state: AgentState) -> str:
    """决定是否继续"""
    return state["next_action"]

7.4 高级RAG实现

7.4.1 多轮检索RAG
class MultiRoundRAGWorkflow:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        self.max_rounds = 3
    
    def setup_workflow(self):
        workflow = StateGraph(AgentState)
        
        # 添加节点
        workflow.add_node("query_analysis", self.analyze_query)
        workflow.add_node("retrieve", self.retrieve_documents)
        workflow.add_node("assess_relevance", self.assess_relevance)
        workflow.add_node("refine_query", self.refine_query)
        workflow.add_node("generate", self.generate_answer)
        workflow.add_node("evaluate", self.evaluate_answer)
        
        # 设置入口点
        workflow.set_entry_point("query_analysis")
        
        # 添加边
        workflow.add_edge("query_analysis", "retrieve")
        workflow.add_edge("retrieve", "assess_relevance")
        workflow.add_conditional_edges(
            "assess_relevance",
            self.needs_refinement,
            {
                "refine": "refine_query",
                "generate": "generate"
            }
        )
        workflow.add_edge("refine_query", "retrieve")
        workflow.add_edge("generate", "evaluate")
        workflow.add_conditional_edges(
            "evaluate",
            self.should_continue,
            {
                "end": END,
                "retry": "retrieve"
            }
        )
        
        return workflow.compile()
7.4.2 自问自答RAG(Self-Ask RAG)
class SelfAskRAGWorkflow:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
    
    def setup_workflow(self):
        workflow = StateGraph(AgentState)
        
        # 添加节点
        workflow.add_node("decompose_question", self.decompose_question)
        workflow.add_node("answer_subquestion", self.answer_subquestion)
        workflow.add_node("synthesize_answer", self.synthesize_answer)
        
        # 设置入口点
        workflow.set_entry_point("decompose_question")
        
        # 添加边
        workflow.add_edge("decompose_question", "answer_subquestion")
        workflow.add_conditional_edges(
            "answer_subquestion",
            self.has_more_subquestions,
            {
                "continue": "answer_subquestion",
                "synthesize": "synthesize_answer"
            }
        )
        workflow.add_edge("synthesize_answer", END)
        
        return workflow.compile()

7.5 完整代码示例

# 完整的LangGraph RAG实现
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader

class CompleteRAGSystem:
    def __init__(self, documents_path, openai_api_key):
        self.openai_api_key = openai_api_key
        self.setup_components(documents_path)
        self.setup_workflow()
    
    def setup_components(self, documents_path):
        """设置RAG组件"""
        # 加载文档
        loader = TextLoader(documents_path)
        documents = loader.load()
        
        # 文档分割
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.split_documents(documents)
        
        # 向量化
        embeddings = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
        self.vectorstore = Chroma.from_documents(chunks, embeddings)
        
        # 语言模型
        self.llm = ChatOpenAI(
            temperature=0,
            openai_api_key=self.openai_api_key
        )
    
    def setup_workflow(self):
        """设置工作流"""
        self.rag_workflow = MultiRoundRAGWorkflow(
            self.vectorstore, self.llm
        )
        self.workflow = self.rag_workflow.setup_workflow()
    
    def query(self, question):
        """执行查询"""
        initial_state = {
            "query": question,
            "context": "",
            "answer": "",
            "next_action": ""
        }
        
        result = self.workflow.invoke(initial_state)
        return result["answer"]

7.6 LangGraph RAG最佳实践

7.6.1 性能优化
  1. 缓存机制: 缓存中间结果避免重复计算
  2. 并行处理: 对独立子任务使用并行执行
  3. 结果缓存: 缓存常见查询的答案
7.6.2 错误处理
def safe_invoke_node(func):
    """安全执行节点的装饰器"""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"节点执行错误: {e}")
            # 返回安全状态或重试逻辑
            return {"error": str(e), "next_action": "retry"}
    return wrapper
7.6.3 监控和调试
class MonitoringRAGWorkflow:
    def __init__(self):
        self.execution_log = []
    
    def log_execution(self, node_name, input_state, output_state):
        """记录执行日志"""
        self.execution_log.append({
            "node": node_name,
            "input": input_state,
            "output": output_state,
            "timestamp": datetime.now()
        })

LangGraph为RAG应用提供了强大的图状工作流支持,使得复杂的检索增强生成逻辑变得更加清晰、可控和可扩展。通过合理设计节点和流转逻辑,可以构建出高效、可靠的RAG系统。


总结

RAG技术通过将检索和生成相结合,有效解决了大语言模型的局限性,提供了更准确、可靠的信息服务。其工作流程清晰,实现相对简单,适用于多种应用场景。随着技术的不断发展,RAG将在更多领域发挥重要作用,成为构建智能问答系统的核心技术之一。

关键要点

  • RAG = 检索 + 增强 + 生成
  • 工作流程包括文档处理和查询处理两个阶段
  • 技术栈成熟,有多个开源框架支持
  • 适用于需要准确、最新信息的场景
  • 具备良好的可扩展性和可控性
  • 文档分割: 选择合适的分割策略对RAG性能至关重要
  • 嵌入模型选择: 根据具体需求选择合适的嵌入模型,平衡性能、成本和部署便利性
  • LangGraph: 提供强大的图状工作流支持,实现复杂的RAG应用逻辑
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐