3.文档分段、切块策略
·
一、文档分块核心认知
先搞懂:「分段」和「切块」到底是什么?有什么区别?
| 概念 | 大白话解释 | 核心作用 |
|---|---|---|
| 文档分段 | 把 PDF 提取出来的超长文本,按天然的语义结构(换行、段落、标题、空行)拆分成一个个独立的「段落」 | 先把文本按人的阅读习惯拆成逻辑单元,避免把不相关的内容混在一起,为后续切块做准备 |
| 文档切块(Chunking) | 把拆分好的长段落,再按固定的长度规则拆分成更短的「文本块」,适配向量模型的输入长度限制 | 解决长文本无法输入向量模型的问题,同时保证每个文本块的语义完整性,是 RAG 系统的核心基础环节 |
为什么必须先「分段」再「切块」?
直接对长文本切块,会有一个核心问题:把两个完全不相关的段落切到同一个文本块里,导致向量模型生成的向量语义混乱,检索效果极差
# -*- coding: utf-8 -*-
"""
@Created on : 2026/6/4 16:05
@creator : er_nao
@File :day_87.py
@Description :文档分段、切块策略
"""
import pdfplumber
import json
# -------------------------- 请替换为您的PDF文件路径 --------------------------
pdf_file_path = "C:\\Users\\hp\\Desktop\\NLP学习数据\\arctle2.pdf" # 替换为您的PDF文件路径
output_chunk_path = "C:\\Users\\hp\\Desktop\\NLP学习数据\\文档分块结果.json" # 分块结果保存路径
# -----------------------------------------------------------------------------
# ====================== 第一步:PDF文本提取(复用Day85代码,巩固知识点) ======================
def extract_full_text_from_pdf(pdf_path):
"""
提取PDF全量文本内容,返回拼接后的完整长文本
:param pdf_path: PDF文件路径
:return: 完整长文本字符串
"""
with pdfplumber.open(pdf_path) as pdf:
full_text_list = []
for page in pdf.pages:
page_text = page.extract_text(
x_tolerance=0.5, y_tolerance=0.5,keep_blank_chars=False
)
if page_text.strip():
full_text_list.append(page_text)
# 拼接所有页面的文本,用换行符分隔
full_text = "\n".join(full_text_list)
print(f"文本提取完成,总文本上数:{len(full_text)}")
return full_text
# ====================== 第二步:文档分段(按天然语义结构拆分) ======================
def split_text_to_paragraphs(full_text):
"""
把长文本按天然语义结构拆分成独立段落
拆分规则:1. 先按双换行拆分;2. 过滤空段落;3. 按单换行拆分超长行
:param full_text: 输入的完整长文本
:return: 拆分后的段落列表
"""
# 1. 最高优先级:按双换行符拆分段落
raw_paragraphs = full_text.split("\n\n")
# 2. 过滤空段落、无有效内容的段落
valid_paragraphs = []
for param in raw_paragraphs:
# 去除前后所有空格、换行、制表符,判断是否有有效内容
cleaned_pare = param.strip()
if len(cleaned_pare) > 50: # 过滤掉长度小于50的无效短段落
valid_paragraphs.append(cleaned_pare)
# 3. 兜底:对超长段落按单换行二次拆分,避免单个段落过长
final_paragraphs = []
for paragraph in valid_paragraphs:
if len(paragraph) > 2000: # 超过2000字符的段落,按单换行二次拆分
sub_paras = paragraph.split("\n")
for sub_para in sub_paras:
cleaned_sub_para = sub_para.strip()
if len(cleaned_sub_para) > 50:
final_paragraphs.append(cleaned_sub_para)
else:
final_paragraphs.append(paragraph)
print(f"文档分段完成,共拆分出 {len(final_paragraphs)} 个有效段落")
print(f"段落长度统计:最长 {max([len(p) for p in final_paragraphs])} 字符,最短 {min([len(p) for p in final_paragraphs])} 字符")
return final_paragraphs
# ====================== 第三步:文档切块(带重叠滑动窗口,RAG推荐方案) ======================
def sliding_window_chunk_paragraph(paragraph, chunk_size=500, overlap_ratio =0.2):
"""
对单个段落进行带重叠的滑动窗口切块
:param paragraph: 输入的单个长段落
:param chunk_size: 每个文本块的最大字符数,默认500
:param overlap_ratio: 相邻块的重叠比例,默认20%
:return: 该段落拆分后的文本块列表
"""
# 计算重叠字符数和滑动步长
overlap_length = int(chunk_size * overlap_ratio)
step_length = chunk_size - overlap_length
chunk_list = []
para_length = len(paragraph)
# 滑动窗口遍历段落
for start_index in range(0, para_length, step_length):
end_index = min(start_index + chunk_size, para_length)
current_chunk = paragraph[start_index:end_index]
# 过滤掉空块
if current_chunk.strip():
chunk_list.append(current_chunk)
return chunk_list
def chunk_all_paragraphs(paragraph_list, chunk_size=500, overlap_ratio=0.2):
"""
对所有段落进行批量切块,汇总所有文本块
:param paragraph_list: 分段后的段落列表
:param chunk_size: 每个文本块的最大字符数
:param overlap_ratio: 相邻块的重叠比例
:return: 最终的所有文本块列表
"""
all_chunks = []
for para_index, para in enumerate(paragraph_list):
# 对单个段落切块
para_chunks = sliding_window_chunk_paragraph(para, chunk_size, overlap_ratio)
# 汇总所有块,添加段落来源信息
for chunk_idx, chunk in enumerate(para_chunks):
all_chunks.append({
"paragraph_id": para_index + 1, # 所属段落ID
"chunk_id": chunk_idx + 1, # 块ID
"chunk_content": chunk, # 块内容
"chunk_length": len(chunk) # 块长度
})
print(f" 文档切块完成,共生成 {len(all_chunks)} 个有效文本块")
print(f" 块长度统计:最长 {max([c['chunk_length'] for c in all_chunks])} 字符,最短 {min([c['chunk_length'] for c in all_chunks])} 字符")
return all_chunks
# ====================== 第四步:分块结果保存(巩固你之前问的json.dump知识点) ======================
def save_chunk_result_to_json(chunk_result, save_path):
with open(save_path, 'w', encoding='utf-8') as f:
json.dump(chunk_result, f, ensure_ascii=False, indent=2)
print(f" 分块结果已保存至:{save_path}")
print(f" 共保存 {len(chunk_result)} 个文本块,可直接用于后续向量化环节")
if __name__ == "__main__":
full_text = extract_full_text_from_pdf(pdf_file_path)
paragraph_list = split_text_to_paragraphs(full_text)
# 执行文档切块(使用RAG推荐参数:块大小500,重叠20%)
final_chunk_result = chunk_all_paragraphs(paragraph_list, chunk_size=500, overlap_ratio=0.2)
save_chunk_result_to_json(final_chunk_result,output_chunk_path)
# ====================== 第五步:分块结果预览与校验(验收标准) ======================
print("\n" + "="*50)
print(" 分块结果预览(前5个块):")
print("="*50)
for chunk in final_chunk_result[:5]:
print(f"\n--- 段落{chunk['paragraph_id']} | 块{chunk['chunk_id']} | 长度{chunk['chunk_length']}字符 ---")
print(f"内容:{chunk['chunk_content'][:200]}...") # 只打印前200个字符预览
print("\n" + "="*50)
print(" 验收标准校验结果:")
print("="*50)
# 校验1:所有块长度是否符合要求
max_length = max([c['chunk_length'] for c in final_chunk_result])
print(f"1. 块长度合规性:最大块长度{max_length}字符,符合预设500字符要求 ")
# 校验2:是否有无效空块
empty_chunk_count = len([c for c in final_chunk_result if len(c['chunk_content'].strip()) == 0])
print(f"2. 无效空块数量:{empty_chunk_count}个,无无效空块 ")
# 校验3:是否有完整的结构化结果
print(f"3. 结构化结果:共生成{len(final_chunk_result)}个带ID、长度、内容的结构化文本块 ")

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)