3.RAG
一、RAG初识:
RAG(Retrieval-Augmented Generation,检索增强生成)是一种将 信息检索与文本生成 相结合的技术框架。它通过以下流程解决大模型(LLM)的“知识盲区”问题:
用户问题->从知识库检索相关文档->将文档作为上下文输入LLM->生成精准答案
二、核心流程:
1.用户提问:XXX公司XXXX年销售总额是多少?
首先拆分关键词、理解句意
2.知识库检索:使用向量数据库,检索相似文档,再返回相关片段
3.提示词工程:
基于以下信息回答问题:
<检索到的文档1>
<检索到的文档2>
...
问题:{用户的问题}
答案:
4.生成答案
三、RAG关键组件:
1.文本加载器:用于解析PDF/Word等原始文件
2.嵌入模型:用户将文本转化为向量
3.向量数据库:用于储存或快速检索向量
4.大语言模型:用于生成答案
5.提示词管理器:动态构建上下文提示
demo1(调用大模型):
1.创建项目-2.添加notebook:uv add notebook-3.安装依赖,安装OpenAI的依赖:uv add openai
4.初始化客户端:
load_dotenv()
API_KEY=os.getenv("DEEPSEEK_API_KEY")
#创建OpenAI客户端
client=OpenAI(
api_key=API_KEY,#api_key,从环境变量中获取api_key
base_url="https://api.deepseek.com"#路径
)
#补充api_key不能直接写死,不能是硬编码,应该从环境变量中获取
#读取环境变量需要用到python-dotenv(做环境管理的一个包),需要先安装依赖:uv add python-dotenv
#再创建.env文件,并写入API_KEY:API_KEY=
#再从环境变量中获取api_key
#补充api_key不能直接写死,不能是硬编码,应该从环境变量中获取 #读取环境变量需要用到python-dotenv(做环境管理的一个包),需要先安装依赖:uv add python-dotenv #再创建.env文件,并写入API_KEY:API_KEY= #再从环境变量中获取api_key
5.访问大模型
print("...正在调用大模型...")
response=client.chat.completions.create(
model="deepseek-chat",#模型类型
messages=[
{"role":"system","content":"你是ai"},#system系统设定
{"role":"user","content":"你是谁"},#user用户回答
],
stream=False,#阻塞式
temperature=0.9#自由度
)
print("...成功调用大模型...")
demo2(向量化):
选用的m3e模型,该模型主要针对中文文本进行向量化处理
#首先得到一个文档,将文档存入faiss这个库中
#数据
documents=[
]
#加载模型(先检查本地是否存在)
local_model_path= '../local_m3e_model'
if os.path.exists(local_model_path):
print(f"从本地加载模型:{local_model_path}")
model=SentenceTransformer(local_model_path)
else:
print(f"本地模型不存在,从网络加载:moka-ai/m3e_model")
model=SentenceTransformer('moka-ai/m3e_base')
print(f"保存模型到本地:{local_model_path}")
model.save(local_model_path)
#打印加载成功
print("模型加载成功!\n")
#将文档转换为向量
embeddings=model.encode(documents,convert_to_numpy=True)
#使用FAISS创建索引
index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(embeddings)
#查询最相似的都向量
query="界面新闻"
query_embedding=model.encode(query,convert_to_numpy=True)
D,I=index.search(query_embedding,k=2) #k=2:返回两个距离最近的
print(D)
print(I)
该小demo目的是为了后面将文档进行向量化,以便做检索
demo3(文档分块):
文档分块策略: 将长文档切割为小块(chunks)以提高检索精度和效果 策略: 1.固定长度分块 2.按语义分块(如句子、段落) 3.确保块之间有一定重
def chunk_document(text,max_chars=500,overlap=100):
'''
将长文档切分为交小的块
参数:
:param text:要切分的文本
:param max_chars: 每个块的最大字符数
:param overlap: 相邻块之间的重叠字符数
:return chunk: 切分后的文本块列表
'''
#如果文本长度小于最大长度,直接返回
if len(text)<=max_chars:
return [text]
chunks=[]
start=0
while start<len(text):
#确定当前块的结束位置
end=start+max_chars
#如果没有达到文本末尾,尝试在句子边界切分
if end<len(text):
#在结束位置查找最近的句子结束标记
sentence_ends=[
m.end() for m in re.finditer(r'[。!?.!?]\s*',text[start:end])
]
if sentence_ends:#如果找到句子结束标记,在最后一个句子结束处切分
end=start+sentence_ends[-1]
else:#如果没有找到,尝试在单词或标点处切分
last_space=text[start:end].rfind(' ')
last_punct=max(text[start:end].rfind(' '),text[end:last_space].rfind(' '))
cut_point=max(last_space,last_punct)
if cut_point>0:#如果找到了合适的切分点
end=start+cut_point+1
#添加当前块到结果列表
chunks.append(text[start:end])
#移动开始位置,考虑重叠
start=end-overlap
#确保开始位置不会后退
if start<0:
start=0
#避免无限循环
if start<len(text):
break
return chunks
该demo用于处理长文档,并且保证文档内容的连贯性以及完整性
demo4(处理不同格式的文档):
# 尝试导入不可能存在的包
try:
import docx # 处理word文档的一个库
DOCX_AVAILABLE = True
except ImportError:
DOCX_AVAILABLE = False
print("警告:python-docx未安装,无法处理.docx文件")
try:
import PyPDF2
PyPDF2_AVAILABLE = True
except ImportError:
PyPDF2_AVAILABLE = False
print("警告:PyPDF2未安装,无法处理.pdf文件")
def clean_text(text):
'''
清理文本,清除多余的空格和换行
'''
if not text:
return ""
# 替换多个换行为空格
text = re.sub(r'\n+', ' ', text)
# 替换多个空格/制表符为单个空格
text = re.sub(r'\s+', ' ', text)
# 去除首尾空白
return text.strip()
def load_text_file(file_path):
'''加载文本文件(.txt, .md, .csv等)'''
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
for enc in encodings:
try:
with open(file_path, 'r', encoding=enc) as f:
content = f.read()
return clean_text(content), None
except UnicodeDecodeError:
continue
except Exception as e:
return None, f"无法读取文本文件 {file_path}: {str(e)}"
return None, f"所有编码均无法读取 {file_path}"
def load_pdf_file(file_path):
'''加载PDF文件'''
if not PyPDF2_AVAILABLE:
return None, "PyPDF2包未安装,无法处理.pdf文件"
try:
content = ''
with open(file_path, 'rb') as f:
# 兼容新版 PyPDF2
pdf_reader = PyPDF2.PdfReader(f)
for page in pdf_reader.pages:
page_content = page.extract_text()
if page_content:
content += page_content + '\n'
if not content.strip():
return None, f"PDF文件 {file_path} 未能提取到文本内容"
return clean_text(content), None
except Exception as e:
return None, f"读取PDF文件 {file_path} 时出错: {str(e)}"
def load_docx_file(file_path):
'''加载Word文档 (.docx)'''
if not DOCX_AVAILABLE:
return None, "python-docx未安装,无法处理.docx文件"
try:
doc = docx.Document(file_path)
content = '\n'.join([para.text for para in doc.paragraphs])
if not content.strip():
return None, f"DOCX文件 {file_path} 无文本内容"
return clean_text(content), None
except Exception as e:
return None, f"读取DOCX文件 {file_path} 时出错: {str(e)}"
def load_markdown_file(file_path):
'''加载markdown文件'''
try:
with open(file_path, 'r', encoding='utf-8') as f:
md_content = f.read()
# 将markdown转换为HTML,然后提取纯文本
html = markdown.markdown(md_content)
soup = BeautifulSoup(html, 'html.parser')
content = soup.get_text()
return clean_text(content), None
except Exception as e:
return None, f"读取markdown文件 {file_path} 时出错: {str(e)}"
def load_excel_file(file_path):
'''加载excel文件 (.xlsx, .xls)'''
try:
# 读取所有sheet并合并
excel_file = pd.ExcelFile(file_path)
all_content = []
for sheet_name in excel_file.sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# 去掉全空的行/列
df = df.dropna(how='all', axis=0).dropna(how='all', axis=1)
if df.empty:
continue
sheet_text = f"【Sheet: {sheet_name}】\n"
sheet_text += df.to_string(index=False, na_rep="")
all_content.append(sheet_text)
if not all_content:
return None, "Excel文件内容为空"
content = "\n\n".join(all_content)
return clean_text(content), None
except Exception as e:
return None, f"读取Excel文件 {file_path} 时出错: {str(e)}"
def load_documents_from_directory(directory_path, file_types=None):
'''
从指定目录加载多种类型的文档
:param directory_path: 文档所在目录路径
:param file_types: 文件类型列表,如['txt','pdf','docx','md','xlsx']
如果为None,则加载所有支持的类型
:return: documents 文档内容列表
doc_sources 文档来源路径列表
errors 错误信息列表
'''
# 支持的文件后缀(自动识别)
SUPPORTED_EXT = {
'txt': ['.txt'],
'pdf': ['.pdf'],
'docx': ['.docx'],
'md': ['.md', '.markdown'],
'excel': ['.xlsx', '.xls']
}
if file_types is None:
file_types = list(SUPPORTED_EXT.keys())
documents = []
doc_sources = []
errors = []
if not os.path.isdir(directory_path):
errors.append(f"目录不存在:{directory_path}")
return documents, doc_sources, errors
# 遍历所有文件
all_files = glob.glob(os.path.join(directory_path, '*'))
for file_path in all_files:
if not os.path.isfile(file_path):
continue
content, error = None, None
ext = os.path.splitext(file_path)[1].lower()
# 根据后缀分发处理函数
if ext == '.txt':
content, error = load_text_file(file_path)
elif ext == '.pdf':
content, error = load_pdf_file(file_path)
elif ext == '.docx':
content, error = load_docx_file(file_path)
elif ext in ['.md', '.markdown']:
content, error = load_markdown_file(file_path)
elif ext in ['.xlsx', '.xls']:
content, error = load_excel_file(file_path)
else:
continue # 不支持的格式跳过
# 处理结果
if content:
documents.append(content)
doc_sources.append(file_path)
print(f" 成功加载:{os.path.basename(file_path)}")
if error:
errors.append(error)
print(f" {error}")
if not documents:
errors.append(f"在目录中未找到可处理的文档:{directory_path}")
return documents, doc_sources, errors
该demo用于处理不同格式的文档
project:
将以上4个demo整合优化后,以下就是一个简单的rag:
import os
import re
import faiss
import numpy as np
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from sympy.printing import preview
from file_utils import load_documents_from_directory
#加载文档
documents,sources,errors=load_documents_from_directory("./docs")#load_documents_from_directory是从本地
#显示结果
print(f"\n成功加载{len(documents)}个文档:")
for i,(doc,source) in enumerate(zip(documents,sources)):
print(f"\n文档{i+1}来源:{source}")
#只显示文档的开头部分
preview=doc[:200]+"..."if len(doc)>200 else doc
print(f"内容预览{preview}")
#如果有错误,显示错误信息
if errors:
print("\n加载过程中出现以下错误:")
for error in errors:
print(f"- {error}")
#调用m3e模型,将文档转化为向量数组,方便存储和调用
def get_embeddings(text):
embeddings=model.encode(text,normalize_embeddings=True)
return np.array(embeddings)
#加载m3e模型
local_model_path='local_m3e_model'
if os.path.exists(local_model_path):
print(f"从本地加载模型:{local_model_path}")
model=SentenceTransformer(local_model_path)
else:
print(f"本地模型不存在,从网络加载:moka-ai/m3e_base")
model=SentenceTransformer('moka-ai/m3e_base')
#保存到本地,以便下次使用
print(f"保存模型到本地:{local_model_path}")
model.save(local_model_path)
#打印加载成功
print("模型加载成功!\n")
#文档分块
def chunk_document(text,max_chars=500,overlap=100):
'''
将长文档切分为交小的块
参数:
:param text:要切分的文本
:param max_chars: 每个块的最大字符数
:param overlap: 相邻块之间的重叠字符数
:return chunk: 切分后的文本块列表
'''
#如果文本长度小于最大长度,直接返回
if len(text)<=max_chars:
return [text]
chunks=[]
start=0
while start<len(text):
#确定当前块的结束位置
end=start+max_chars
#如果没有达到文本末尾,尝试在句子边界切分
if end<len(text):
#在结束位置查找最近的句子结束标记
sentence_ends=[
m.end() for m in re.finditer(r'[。!?.!?]\s*',text[start:end])
]
if sentence_ends:#如果找到句子结束标记,在最后一个句子结束处切分
end=start+sentence_ends[-1]
else:#如果没有找到,尝试在单词或标点处切分
last_space=text[start:end].rfind(' ')
last_punct=max(text[start:end].rfind(' '),text[end:last_space].rfind(' '))
cut_point=max(last_space,last_punct)
if cut_point>0:#如果找到了合适的切分点
end=start+cut_point+1
#添加当前块到结果列表
chunks.append(text[start:end])
#移动开始位置,考虑重叠
start=end-overlap
#确保开始位置不会后退
if start<0:
start=0
#避免无限循环
if start<len(text):
break
return chunks
#文档和chunk的映射关系
document_to_chunks={}
chunks_to_docunment={}
all_chunks=[]
#索引文件路径
index_file_path="m3e_faiss_index.bin"
chunks_map_path="chunks_mapping.npy"
#判断是否已存在索引文件
if os.path.exists(index_file_path) and os.path.exists(chunks_map_path):
print(f"从本地加载索引和映射:{index_file_path,chunks_map_path}")
index=faiss.read_index(index_file_path)
#加载映射关系
mapping_data=np.load(chunks_map_path,allow_pickle=True).item()
document_to_chunks=mapping_data["doc_to_chunks"]
chunks_to_docunment=mapping_data["chunks_to_doc"]
all_chunks=mapping_data["all_chunks"]
else:
print("本地索引不存在,创建新索引")
#处理文档并分块
for doc_id,doc in enumerate(documents):#doc_id文档编号,doc文档内容,对内存中的文档进行遍历
#对长文档进行分块
chunks=chunk_document(doc)#调用分片函数,将文档切开
#存储映射关系
document_to_chunks[doc_id]=[]
for chunk in chunks:
chunk_id=len(all_chunks)
all_chunks.append(chunk)
document_to_chunks[doc_id].append(chunk_id)#文档和区域块的对应关系
chunks_to_docunment[chunk_id]=doc_id#区域块和文档的对应关系
#生成文档块嵌入
chunk_embeddings=get_embeddings(all_chunks)#将所有的文档块进行向量化
#初始化FAISS索引
dimension=chunk_embeddings.shape[1]
index=faiss.IndexFlatL2(dimension)
index.add(chunk_embeddings)#所有的文档块存到索引中
#保存索引
faiss.write_index(index,index_file_path)
#保存映射关系,用字典保存
mapping_data={
'doc_to_chunks':document_to_chunks,#文档->块
'chunks_to_doc':chunks_to_docunment,#块->文档
'all_chunks':all_chunks,
}
np.save(chunks_map_path,mapping_data)#将字典也保存下来
print(f"索引创建并保存成功:{index_file_path}\n")
#检索函数
def retrieve_docs(query,index,k=5):
'''
检索最相关的文档
:param query:查询文本
:param index: FAISS索引
:param k: 返回的相关chunk数量
:return: 按相关性排序的原始文档列表
'''
query_embedding=get_embeddings([query])
distances,chunk_indices=index.search(query_embedding,k=k)
#获取包含这些chunks的原始文档ID
retrieved_doc_ids=set()
retrieved_chunks=[]
for chunk_idx in chunk_indices[0]:
if chunk_idx >=0 and chunk_idx<len(all_chunks):#确保索引有效
doc_id=chunks_to_docunment.get(int(chunk_idx))
if doc_id is not None:
retrieved_doc_ids.add(doc_id)
retrieved_chunks.append((doc_id,all_chunks[int(chunk_idx)]))
#获取原始文档
retrieved_docs=[documents[doc_id] for doc_id in retrieved_doc_ids]
#返回文档和对应的相关块
return retrieved_docs,retrieved_chunks
#deepseek
client=OpenAI(
api_key="sk-e0274bea43e445179235ddfbb87e5ada",
base_url="https://api.deepseek.com")
def generate_answer(query,retrieved_docs,retrieved_chunks):
'''
基于检索到的文档生成回答
:param query: 用户查询
:param retrieved_docs:检索到的完整文档
:param retrieved_chunks: 检索到的文档块
:return: 生成回答
'''
#构建上下文,包含原始文档和相关块
context="原始文档:\n"+"\n".join(retrieved_docs)
#添加相关块信息
context+="\n\n相关文本块:\n"
for doc_id,chunk in retrieved_chunks:
context+=f"[文档{doc_id}]{chunk}\n"
prompt=f"上下文信息:\n{context}\n\n问题:{query}\n请基于上下文信息回答问题:"
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个专业的问答助手。请仅基于提供的上下文信息回答问题。"},
{"role": "user", "content": prompt},
],
stream=False,
)
return response.choices[0].message.content
def main():
#测试
query=""
print("Query:",query)
retrieved_docs,retrieved_chunks=retrieve_docs(query,index)
#打印检索到的文档
print("检索到的文档:")
for doc in retrieved_docs:
print(doc)
#打印检索到的文本块
print("\n检索到的文本块:")
for doc_id,chunk in retrieved_chunks:
print(f"[文档{doc_id}]{chunk}")
answer=generate_answer(query,retrieved_docs,retrieved_chunks)
print("\nAnswer:",answer)
if __name__=="__main__":
main()
思考:
但是在实际落地场景中,往往会存在检索准确率低,噪音干扰多,召回完整性,专业性不够,导致LLM幻觉严重的问题。
知识库文档越来越多以后,检索噪音大,召回准确率不高
召回不全,完整性不够
召回和用户问题意图相关性不大
只能回答静态数据,无法动态获取知识,导致答疑应用比较呆,比较笨。
优化思路:
1.知识加载:
目的:对文档进行精确的解析
优化建议:
(1)将docx、txt优先处理为pdf或markdown格式。为什么?因为这两种格式的结构信息(标题、段落、表格)更规范,工具更容易识别,能减少文本提取错误。
(2)提取文本中的表格信息。表格是结构化数据,直接当成文本处理会丢失行列关系、数字含义。优化方案是把表格转成 Markdown 格式,甚至提取成键值对
(3)Markdown 和 PDF 自带标题层级(比如 H1、H2、H3),处理时要保留下来。后续可以基于标题层级做 “层级索引”,比如用户问 “XXX 公司XXXX年的销售总额”,可以先定位到 “XXXX年” 这个二级标题下的所有内容,大幅缩小检索范围,减少噪音。
(4)把图片链接、公式也转成 Markdown 格式,避免这些信息被丢弃。
2.Chunk尽量保持完整:
(Chunk 就是你把长文档切分成的小块文本,是 RAG 里用来做向量检索的最小单位。)
(1)图片 + 表格单独抽取成 Chunk。把文档里的图片、表格,不跟正文混在一起切分,而是单独抽出来作为独立的 Chunk,同时把图片 / 表格的标题、说明存到 Chunk 的 metadata(元数据)里。
(2)按标题层级 / Markdown Header 拆分,保留Chunk的完整性
(3)如果文档里有固定的自定义分隔符(比如 ---、###、章节分隔线等),可以基于这些分隔符来切分。
3.多元化信息抽取
4.知识处理工作流
5.静态知识RAG优化
6.动态知识RAG优化
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)