一、RAG初识:

RAG(Retrieval-Augmented Generation,检索增强生成)是一种将 信息检索与文本生成 相结合的技术框架。它通过以下流程解决大模型(LLM)的“知识盲区”问题:

用户问题->从知识库检索相关文档->将文档作为上下文输入LLM->生成精准答案

二、核心流程:

1.用户提问:XXX公司XXXX年销售总额是多少?

首先拆分关键词、理解句意

2.知识库检索:使用向量数据库,检索相似文档,再返回相关片段

3.提示词工程:

基于以下信息回答问题:

<检索到的文档1>

<检索到的文档2>

...

问题:{用户的问题}

答案:

4.生成答案

三、RAG关键组件:

1.文本加载器:用于解析PDF/Word等原始文件

2.嵌入模型:用户将文本转化为向量

3.向量数据库:用于储存或快速检索向量

4.大语言模型:用于生成答案

5.提示词管理器:动态构建上下文提示

demo1(调用大模型):

1.创建项目-2.添加notebook:uv add notebook-3.安装依赖,安装OpenAI的依赖:uv add openai

4.初始化客户端:

load_dotenv()
API_KEY=os.getenv("DEEPSEEK_API_KEY")
#创建OpenAI客户端
client=OpenAI(
    api_key=API_KEY,#api_key,从环境变量中获取api_key
    base_url="https://api.deepseek.com"#路径
)
#补充api_key不能直接写死,不能是硬编码,应该从环境变量中获取
#读取环境变量需要用到python-dotenv(做环境管理的一个包),需要先安装依赖:uv add python-dotenv
#再创建.env文件,并写入API_KEY:API_KEY=
#再从环境变量中获取api_key
#补充api_key不能直接写死,不能是硬编码,应该从环境变量中获取
#读取环境变量需要用到python-dotenv(做环境管理的一个包),需要先安装依赖:uv add python-dotenv
#再创建.env文件,并写入API_KEY:API_KEY=
#再从环境变量中获取api_key

5.访问大模型

print("...正在调用大模型...")
response=client.chat.completions.create(
    model="deepseek-chat",#模型类型
    messages=[
        {"role":"system","content":"你是ai"},#system系统设定
        {"role":"user","content":"你是谁"},#user用户回答
    ],
    stream=False,#阻塞式
    temperature=0.9#自由度
)
print("...成功调用大模型...")

demo2(向量化):

选用的m3e模型,该模型主要针对中文文本进行向量化处理

#首先得到一个文档,将文档存入faiss这个库中
#数据
documents=[

]

#加载模型(先检查本地是否存在)
local_model_path= '../local_m3e_model'
if os.path.exists(local_model_path):
    print(f"从本地加载模型:{local_model_path}")
    model=SentenceTransformer(local_model_path)
else:
    print(f"本地模型不存在,从网络加载:moka-ai/m3e_model")
    model=SentenceTransformer('moka-ai/m3e_base')
    print(f"保存模型到本地:{local_model_path}")
    model.save(local_model_path)

#打印加载成功
print("模型加载成功!\n")

#将文档转换为向量
embeddings=model.encode(documents,convert_to_numpy=True)

#使用FAISS创建索引
index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(embeddings)

#查询最相似的都向量
query="界面新闻"
query_embedding=model.encode(query,convert_to_numpy=True)
D,I=index.search(query_embedding,k=2) #k=2:返回两个距离最近的

print(D)
print(I)

该小demo目的是为了后面将文档进行向量化,以便做检索

demo3(文档分块):

文档分块策略:
将长文档切割为小块(chunks)以提高检索精度和效果
策略:
1.固定长度分块
2.按语义分块(如句子、段落)
3.确保块之间有一定重
def chunk_document(text,max_chars=500,overlap=100):
    '''
    将长文档切分为交小的块
    参数:
    :param text:要切分的文本
    :param max_chars: 每个块的最大字符数
    :param overlap: 相邻块之间的重叠字符数
    :return chunk: 切分后的文本块列表
    '''
    #如果文本长度小于最大长度,直接返回
    if len(text)<=max_chars:
        return [text]

    chunks=[]
    start=0

    while start<len(text):
        #确定当前块的结束位置
        end=start+max_chars

        #如果没有达到文本末尾,尝试在句子边界切分
        if end<len(text):
            #在结束位置查找最近的句子结束标记
            sentence_ends=[
                m.end() for m in re.finditer(r'[。!?.!?]\s*',text[start:end])
            ]

            if sentence_ends:#如果找到句子结束标记,在最后一个句子结束处切分
                end=start+sentence_ends[-1]
            else:#如果没有找到,尝试在单词或标点处切分
                last_space=text[start:end].rfind(' ')
                last_punct=max(text[start:end].rfind(' '),text[end:last_space].rfind(' '))
                cut_point=max(last_space,last_punct)

                if cut_point>0:#如果找到了合适的切分点
                    end=start+cut_point+1

        #添加当前块到结果列表
        chunks.append(text[start:end])

        #移动开始位置,考虑重叠
        start=end-overlap

        #确保开始位置不会后退
        if start<0:
            start=0

        #避免无限循环
        if start<len(text):
            break

    return chunks

该demo用于处理长文档,并且保证文档内容的连贯性以及完整性

demo4(处理不同格式的文档):

# 尝试导入不可能存在的包
try:
    import docx  # 处理word文档的一个库
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    print("警告:python-docx未安装,无法处理.docx文件")

try:
    import PyPDF2
    PyPDF2_AVAILABLE = True
except ImportError:
    PyPDF2_AVAILABLE = False
    print("警告:PyPDF2未安装,无法处理.pdf文件")


def clean_text(text):
    '''
    清理文本,清除多余的空格和换行
    '''
    if not text:
        return ""
    # 替换多个换行为空格
    text = re.sub(r'\n+', ' ', text)
    # 替换多个空格/制表符为单个空格
    text = re.sub(r'\s+', ' ', text)
    # 去除首尾空白
    return text.strip()


def load_text_file(file_path):
    '''加载文本文件(.txt, .md, .csv等)'''
    encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
    for enc in encodings:
        try:
            with open(file_path, 'r', encoding=enc) as f:
                content = f.read()
            return clean_text(content), None
        except UnicodeDecodeError:
            continue
        except Exception as e:
            return None, f"无法读取文本文件 {file_path}: {str(e)}"
    return None, f"所有编码均无法读取 {file_path}"


def load_pdf_file(file_path):
    '''加载PDF文件'''
    if not PyPDF2_AVAILABLE:
        return None, "PyPDF2包未安装,无法处理.pdf文件"

    try:
        content = ''
        with open(file_path, 'rb') as f:
            # 兼容新版 PyPDF2
            pdf_reader = PyPDF2.PdfReader(f)
            for page in pdf_reader.pages:
                page_content = page.extract_text()
                if page_content:
                    content += page_content + '\n'

        if not content.strip():
            return None, f"PDF文件 {file_path} 未能提取到文本内容"

        return clean_text(content), None
    except Exception as e:
        return None, f"读取PDF文件 {file_path} 时出错: {str(e)}"


def load_docx_file(file_path):
    '''加载Word文档 (.docx)'''
    if not DOCX_AVAILABLE:
        return None, "python-docx未安装,无法处理.docx文件"

    try:
        doc = docx.Document(file_path)
        content = '\n'.join([para.text for para in doc.paragraphs])
        if not content.strip():
            return None, f"DOCX文件 {file_path} 无文本内容"
        return clean_text(content), None
    except Exception as e:
        return None, f"读取DOCX文件 {file_path} 时出错: {str(e)}"


def load_markdown_file(file_path):
    '''加载markdown文件'''
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            md_content = f.read()

        # 将markdown转换为HTML,然后提取纯文本
        html = markdown.markdown(md_content)
        soup = BeautifulSoup(html, 'html.parser')
        content = soup.get_text()

        return clean_text(content), None
    except Exception as e:
        return None, f"读取markdown文件 {file_path} 时出错: {str(e)}"


def load_excel_file(file_path):
    '''加载excel文件 (.xlsx, .xls)'''
    try:
        # 读取所有sheet并合并
        excel_file = pd.ExcelFile(file_path)
        all_content = []

        for sheet_name in excel_file.sheet_names:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            # 去掉全空的行/列
            df = df.dropna(how='all', axis=0).dropna(how='all', axis=1)
            if df.empty:
                continue

            sheet_text = f"【Sheet: {sheet_name}】\n"
            sheet_text += df.to_string(index=False, na_rep="")
            all_content.append(sheet_text)

        if not all_content:
            return None, "Excel文件内容为空"

        content = "\n\n".join(all_content)
        return clean_text(content), None

    except Exception as e:
        return None, f"读取Excel文件 {file_path} 时出错: {str(e)}"


def load_documents_from_directory(directory_path, file_types=None):
    '''
    从指定目录加载多种类型的文档
    :param directory_path: 文档所在目录路径
    :param file_types: 文件类型列表,如['txt','pdf','docx','md','xlsx']
                       如果为None,则加载所有支持的类型
    :return: documents 文档内容列表
             doc_sources 文档来源路径列表
             errors 错误信息列表
    '''
    # 支持的文件后缀(自动识别)
    SUPPORTED_EXT = {
        'txt': ['.txt'],
        'pdf': ['.pdf'],
        'docx': ['.docx'],
        'md': ['.md', '.markdown'],
        'excel': ['.xlsx', '.xls']
    }

    if file_types is None:
        file_types = list(SUPPORTED_EXT.keys())

    documents = []
    doc_sources = []
    errors = []

    if not os.path.isdir(directory_path):
        errors.append(f"目录不存在:{directory_path}")
        return documents, doc_sources, errors

    # 遍历所有文件
    all_files = glob.glob(os.path.join(directory_path, '*'))
    for file_path in all_files:
        if not os.path.isfile(file_path):
            continue

        content, error = None, None
        ext = os.path.splitext(file_path)[1].lower()

        # 根据后缀分发处理函数
        if ext == '.txt':
            content, error = load_text_file(file_path)
        elif ext == '.pdf':
            content, error = load_pdf_file(file_path)
        elif ext == '.docx':
            content, error = load_docx_file(file_path)
        elif ext in ['.md', '.markdown']:
            content, error = load_markdown_file(file_path)
        elif ext in ['.xlsx', '.xls']:
            content, error = load_excel_file(file_path)
        else:
            continue  # 不支持的格式跳过

        # 处理结果
        if content:
            documents.append(content)
            doc_sources.append(file_path)
            print(f" 成功加载:{os.path.basename(file_path)}")
        if error:
            errors.append(error)
            print(f" {error}")

    if not documents:
        errors.append(f"在目录中未找到可处理的文档:{directory_path}")

    return documents, doc_sources, errors

该demo用于处理不同格式的文档

project:

将以上4个demo整合优化后,以下就是一个简单的rag:

import os
import re

import faiss
import numpy as np
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from sympy.printing import preview

from file_utils import load_documents_from_directory

#加载文档
documents,sources,errors=load_documents_from_directory("./docs")#load_documents_from_directory是从本地

#显示结果
print(f"\n成功加载{len(documents)}个文档:")
for i,(doc,source) in enumerate(zip(documents,sources)):
    print(f"\n文档{i+1}来源:{source}")
    #只显示文档的开头部分
    preview=doc[:200]+"..."if len(doc)>200 else doc
    print(f"内容预览{preview}")

#如果有错误,显示错误信息
if errors:
    print("\n加载过程中出现以下错误:")
    for error in errors:
        print(f"- {error}")

#调用m3e模型,将文档转化为向量数组,方便存储和调用
def get_embeddings(text):
    embeddings=model.encode(text,normalize_embeddings=True)
    return np.array(embeddings)

#加载m3e模型
local_model_path='local_m3e_model'
if os.path.exists(local_model_path):
    print(f"从本地加载模型:{local_model_path}")
    model=SentenceTransformer(local_model_path)
else:
    print(f"本地模型不存在,从网络加载:moka-ai/m3e_base")
    model=SentenceTransformer('moka-ai/m3e_base')
    #保存到本地,以便下次使用
    print(f"保存模型到本地:{local_model_path}")
    model.save(local_model_path)

#打印加载成功
print("模型加载成功!\n")

#文档分块
def chunk_document(text,max_chars=500,overlap=100):
    '''
    将长文档切分为交小的块
    参数:
    :param text:要切分的文本
    :param max_chars: 每个块的最大字符数
    :param overlap: 相邻块之间的重叠字符数
    :return chunk: 切分后的文本块列表
    '''
    #如果文本长度小于最大长度,直接返回
    if len(text)<=max_chars:
        return [text]

    chunks=[]
    start=0

    while start<len(text):
        #确定当前块的结束位置
        end=start+max_chars

        #如果没有达到文本末尾,尝试在句子边界切分
        if end<len(text):
            #在结束位置查找最近的句子结束标记
            sentence_ends=[
                m.end() for m in re.finditer(r'[。!?.!?]\s*',text[start:end])
            ]

            if sentence_ends:#如果找到句子结束标记,在最后一个句子结束处切分
                end=start+sentence_ends[-1]
            else:#如果没有找到,尝试在单词或标点处切分
                last_space=text[start:end].rfind(' ')
                last_punct=max(text[start:end].rfind(' '),text[end:last_space].rfind(' '))
                cut_point=max(last_space,last_punct)

                if cut_point>0:#如果找到了合适的切分点
                    end=start+cut_point+1

        #添加当前块到结果列表
        chunks.append(text[start:end])

        #移动开始位置,考虑重叠
        start=end-overlap

        #确保开始位置不会后退
        if start<0:
            start=0

        #避免无限循环
        if start<len(text):
            break

    return chunks

#文档和chunk的映射关系
document_to_chunks={}
chunks_to_docunment={}
all_chunks=[]

#索引文件路径
index_file_path="m3e_faiss_index.bin"
chunks_map_path="chunks_mapping.npy"

#判断是否已存在索引文件
if os.path.exists(index_file_path) and os.path.exists(chunks_map_path):
    print(f"从本地加载索引和映射:{index_file_path,chunks_map_path}")
    index=faiss.read_index(index_file_path)
    #加载映射关系
    mapping_data=np.load(chunks_map_path,allow_pickle=True).item()
    document_to_chunks=mapping_data["doc_to_chunks"]
    chunks_to_docunment=mapping_data["chunks_to_doc"]
    all_chunks=mapping_data["all_chunks"]
else:
    print("本地索引不存在,创建新索引")
    #处理文档并分块
    for doc_id,doc in enumerate(documents):#doc_id文档编号,doc文档内容,对内存中的文档进行遍历
        #对长文档进行分块
        chunks=chunk_document(doc)#调用分片函数,将文档切开

        #存储映射关系
        document_to_chunks[doc_id]=[]
        for chunk in chunks:
            chunk_id=len(all_chunks)
            all_chunks.append(chunk)
            document_to_chunks[doc_id].append(chunk_id)#文档和区域块的对应关系
            chunks_to_docunment[chunk_id]=doc_id#区域块和文档的对应关系

    #生成文档块嵌入
    chunk_embeddings=get_embeddings(all_chunks)#将所有的文档块进行向量化

    #初始化FAISS索引
    dimension=chunk_embeddings.shape[1]
    index=faiss.IndexFlatL2(dimension)
    index.add(chunk_embeddings)#所有的文档块存到索引中

    #保存索引
    faiss.write_index(index,index_file_path)

    #保存映射关系,用字典保存
    mapping_data={
        'doc_to_chunks':document_to_chunks,#文档->块
        'chunks_to_doc':chunks_to_docunment,#块->文档
        'all_chunks':all_chunks,
    }
    np.save(chunks_map_path,mapping_data)#将字典也保存下来

    print(f"索引创建并保存成功:{index_file_path}\n")

#检索函数
def retrieve_docs(query,index,k=5):
    '''
    检索最相关的文档
    :param query:查询文本
    :param index: FAISS索引
    :param k: 返回的相关chunk数量
    :return: 按相关性排序的原始文档列表
    '''
    query_embedding=get_embeddings([query])
    distances,chunk_indices=index.search(query_embedding,k=k)

    #获取包含这些chunks的原始文档ID
    retrieved_doc_ids=set()
    retrieved_chunks=[]

    for chunk_idx in chunk_indices[0]:
        if chunk_idx >=0 and chunk_idx<len(all_chunks):#确保索引有效
            doc_id=chunks_to_docunment.get(int(chunk_idx))
            if doc_id is not None:
                retrieved_doc_ids.add(doc_id)
                retrieved_chunks.append((doc_id,all_chunks[int(chunk_idx)]))

    #获取原始文档
    retrieved_docs=[documents[doc_id] for doc_id in retrieved_doc_ids]

    #返回文档和对应的相关块
    return retrieved_docs,retrieved_chunks

#deepseek

client=OpenAI(
    api_key="sk-e0274bea43e445179235ddfbb87e5ada",
    base_url="https://api.deepseek.com")

def generate_answer(query,retrieved_docs,retrieved_chunks):
    '''
    基于检索到的文档生成回答
    :param query: 用户查询
    :param retrieved_docs:检索到的完整文档
    :param retrieved_chunks: 检索到的文档块
    :return: 生成回答
    '''
    #构建上下文,包含原始文档和相关块
    context="原始文档:\n"+"\n".join(retrieved_docs)

    #添加相关块信息
    context+="\n\n相关文本块:\n"
    for doc_id,chunk in retrieved_chunks:
        context+=f"[文档{doc_id}]{chunk}\n"

    prompt=f"上下文信息:\n{context}\n\n问题:{query}\n请基于上下文信息回答问题:"

    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": "你是一个专业的问答助手。请仅基于提供的上下文信息回答问题。"},
            {"role": "user", "content": prompt},
        ],
        stream=False,
    )
    return response.choices[0].message.content

def main():
    #测试
    query=""
    print("Query:",query)
    retrieved_docs,retrieved_chunks=retrieve_docs(query,index)

    #打印检索到的文档
    print("检索到的文档:")
    for doc in retrieved_docs:
        print(doc)

    #打印检索到的文本块
    print("\n检索到的文本块:")
    for doc_id,chunk in retrieved_chunks:
        print(f"[文档{doc_id}]{chunk}")

    answer=generate_answer(query,retrieved_docs,retrieved_chunks)
    print("\nAnswer:",answer)

if __name__=="__main__":
    main()

思考:

但是在实际落地场景中,往往会存在检索准确率低,噪音干扰多,召回完整性,专业性不够,导致LLM幻觉严重的问题。

知识库文档越来越多以后,检索噪音大,召回准确率不高
召回不全,完整性不够
召回和用户问题意图相关性不大
只能回答静态数据,无法动态获取知识,导致答疑应用比较呆,比较笨。

优化思路:

1.知识加载:

目的:对文档进行精确的解析

优化建议:

(1)将docx、txt优先处理为pdf或markdown格式。为什么?因为这两种格式的结构信息(标题、段落、表格)更规范,工具更容易识别,能减少文本提取错误。

(2)提取文本中的表格信息。表格是结构化数据,直接当成文本处理会丢失行列关系、数字含义。优化方案是把表格转成 Markdown 格式,甚至提取成键值对

(3)Markdown 和 PDF 自带标题层级(比如 H1、H2、H3),处理时要保留下来。后续可以基于标题层级做 “层级索引”,比如用户问 “XXX 公司XXXX年的销售总额”,可以先定位到 “XXXX年” 这个二级标题下的所有内容,大幅缩小检索范围,减少噪音。

(4)把图片链接、公式也转成 Markdown 格式,避免这些信息被丢弃。

2.Chunk尽量保持完整:

(Chunk 就是你把长文档切分成的小块文本,是 RAG 里用来做向量检索的最小单位。)

(1)图片 + 表格单独抽取成 Chunk。把文档里的图片、表格,不跟正文混在一起切分,而是单独抽出来作为独立的 Chunk,同时把图片 / 表格的标题、说明存到 Chunk 的 metadata(元数据)里。

(2)按标题层级 / Markdown Header 拆分,保留Chunk的完整性

(3)如果文档里有固定的自定义分隔符(比如 ---###、章节分隔线等),可以基于这些分隔符来切分。

3.多元化信息抽取

4.知识处理工作流

5.静态知识RAG优化

6.动态知识RAG优化

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐