一、文档分块核心认知

先搞懂:「分段」和「切块」到底是什么?有什么区别?
概念 大白话解释 核心作用
文档分段 把 PDF 提取出来的超长文本,按天然的语义结构(换行、段落、标题、空行)拆分成一个个独立的「段落」 先把文本按人的阅读习惯拆成逻辑单元,避免把不相关的内容混在一起,为后续切块做准备
文档切块(Chunking) 把拆分好的长段落,再按固定的长度规则拆分成更短的「文本块」,适配向量模型的输入长度限制 解决长文本无法输入向量模型的问题,同时保证每个文本块的语义完整性,是 RAG 系统的核心基础环节
为什么必须先「分段」再「切块」?

直接对长文本切块,会有一个核心问题:把两个完全不相关的段落切到同一个文本块里,导致向量模型生成的向量语义混乱,检索效果极差

# -*- coding: utf-8 -*-
"""
@Created on : 2026/6/4 16:05
@creator : er_nao
@File :day_87.py
@Description :文档分段、切块策略
"""
import pdfplumber
import json

# -------------------------- 请替换为您的PDF文件路径 --------------------------
pdf_file_path = "C:\\Users\\hp\\Desktop\\NLP学习数据\\arctle2.pdf"  # 替换为您的PDF文件路径
output_chunk_path = "C:\\Users\\hp\\Desktop\\NLP学习数据\\文档分块结果.json"  # 分块结果保存路径
# -----------------------------------------------------------------------------

# ====================== 第一步:PDF文本提取(复用Day85代码,巩固知识点) ======================
def extract_full_text_from_pdf(pdf_path):
    """
      提取PDF全量文本内容,返回拼接后的完整长文本
      :param pdf_path: PDF文件路径
      :return: 完整长文本字符串
    """
    with pdfplumber.open(pdf_path) as pdf:
        full_text_list = []
        for page in pdf.pages:
            page_text = page.extract_text(
                x_tolerance=0.5, y_tolerance=0.5,keep_blank_chars=False
            )
            if page_text.strip():
                full_text_list.append(page_text)
        # 拼接所有页面的文本,用换行符分隔
        full_text = "\n".join(full_text_list)
        print(f"文本提取完成,总文本上数:{len(full_text)}")

        return full_text

# ====================== 第二步:文档分段(按天然语义结构拆分) ======================
def split_text_to_paragraphs(full_text):
    """
        把长文本按天然语义结构拆分成独立段落
        拆分规则:1. 先按双换行拆分;2. 过滤空段落;3. 按单换行拆分超长行
        :param full_text: 输入的完整长文本
        :return: 拆分后的段落列表
    """
    # 1. 最高优先级:按双换行符拆分段落
    raw_paragraphs = full_text.split("\n\n")

    # 2. 过滤空段落、无有效内容的段落
    valid_paragraphs = []
    for param in raw_paragraphs:
        # 去除前后所有空格、换行、制表符,判断是否有有效内容
        cleaned_pare = param.strip()
        if len(cleaned_pare) > 50: # 过滤掉长度小于50的无效短段落
            valid_paragraphs.append(cleaned_pare)

    # 3. 兜底:对超长段落按单换行二次拆分,避免单个段落过长
    final_paragraphs = []
    for paragraph in valid_paragraphs:
        if len(paragraph) > 2000: # 超过2000字符的段落,按单换行二次拆分
            sub_paras = paragraph.split("\n")
            for sub_para in sub_paras:
                cleaned_sub_para = sub_para.strip()
                if len(cleaned_sub_para) > 50:
                    final_paragraphs.append(cleaned_sub_para)
        else:
            final_paragraphs.append(paragraph)

    print(f"文档分段完成,共拆分出 {len(final_paragraphs)} 个有效段落")
    print(f"段落长度统计:最长 {max([len(p) for p in final_paragraphs])} 字符,最短 {min([len(p) for p in final_paragraphs])} 字符")
    return final_paragraphs


# ====================== 第三步:文档切块(带重叠滑动窗口,RAG推荐方案) ======================
def sliding_window_chunk_paragraph(paragraph, chunk_size=500, overlap_ratio =0.2):
    """
        对单个段落进行带重叠的滑动窗口切块
        :param paragraph: 输入的单个长段落
        :param chunk_size: 每个文本块的最大字符数,默认500
        :param overlap_ratio: 相邻块的重叠比例,默认20%
        :return: 该段落拆分后的文本块列表
    """
    # 计算重叠字符数和滑动步长
    overlap_length = int(chunk_size * overlap_ratio)
    step_length = chunk_size - overlap_length
    chunk_list = []
    para_length = len(paragraph)

    # 滑动窗口遍历段落
    for start_index in range(0, para_length, step_length):
        end_index = min(start_index + chunk_size, para_length)
        current_chunk = paragraph[start_index:end_index]
        # 过滤掉空块
        if current_chunk.strip():
            chunk_list.append(current_chunk)

    return chunk_list


def chunk_all_paragraphs(paragraph_list, chunk_size=500, overlap_ratio=0.2):
    """
        对所有段落进行批量切块,汇总所有文本块
        :param paragraph_list: 分段后的段落列表
        :param chunk_size: 每个文本块的最大字符数
        :param overlap_ratio: 相邻块的重叠比例
        :return: 最终的所有文本块列表
    """
    all_chunks = []
    for para_index, para in enumerate(paragraph_list):
        # 对单个段落切块
        para_chunks = sliding_window_chunk_paragraph(para, chunk_size, overlap_ratio)
        # 汇总所有块,添加段落来源信息
        for chunk_idx, chunk in enumerate(para_chunks):
            all_chunks.append({
                "paragraph_id": para_index + 1,  # 所属段落ID
                "chunk_id": chunk_idx + 1,  # 块ID
                "chunk_content": chunk,  # 块内容
                "chunk_length": len(chunk)  # 块长度
            })

    print(f" 文档切块完成,共生成 {len(all_chunks)} 个有效文本块")
    print(f" 块长度统计:最长 {max([c['chunk_length'] for c in all_chunks])} 字符,最短 {min([c['chunk_length'] for c in all_chunks])} 字符")
    return all_chunks

# ====================== 第四步:分块结果保存(巩固你之前问的json.dump知识点) ======================
def save_chunk_result_to_json(chunk_result, save_path):
    with open(save_path, 'w', encoding='utf-8') as f:
        json.dump(chunk_result, f, ensure_ascii=False, indent=2)

    print(f" 分块结果已保存至:{save_path}")
    print(f" 共保存 {len(chunk_result)} 个文本块,可直接用于后续向量化环节")


if __name__ == "__main__":
    full_text = extract_full_text_from_pdf(pdf_file_path)

    paragraph_list = split_text_to_paragraphs(full_text)

    # 执行文档切块(使用RAG推荐参数:块大小500,重叠20%)
    final_chunk_result = chunk_all_paragraphs(paragraph_list, chunk_size=500, overlap_ratio=0.2)

    save_chunk_result_to_json(final_chunk_result,output_chunk_path)



# ====================== 第五步:分块结果预览与校验(验收标准) ======================
print("\n" + "="*50)
print(" 分块结果预览(前5个块):")
print("="*50)
for chunk in final_chunk_result[:5]:
    print(f"\n--- 段落{chunk['paragraph_id']} | 块{chunk['chunk_id']} | 长度{chunk['chunk_length']}字符 ---")
    print(f"内容:{chunk['chunk_content'][:200]}...")  # 只打印前200个字符预览

print("\n" + "="*50)
print(" 验收标准校验结果:")
print("="*50)
# 校验1:所有块长度是否符合要求
max_length = max([c['chunk_length'] for c in final_chunk_result])
print(f"1. 块长度合规性:最大块长度{max_length}字符,符合预设500字符要求 ")
# 校验2:是否有无效空块
empty_chunk_count = len([c for c in final_chunk_result if len(c['chunk_content'].strip()) == 0])
print(f"2. 无效空块数量:{empty_chunk_count}个,无无效空块 ")
# 校验3:是否有完整的结构化结果
print(f"3. 结构化结果:共生成{len(final_chunk_result)}个带ID、长度、内容的结构化文本块 ")


在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐