本系列共 3 部分,从数据模型到加载器到分块策略,完整拆解 LangChain RAG 管线的数据基础。

  • 第 1 部分:Document 与 Blob——RAG 管线的数据基石(本文)
  • 第 2 部分:DocumentLoader 与 BlobParser——数据的入口
  • 第 3 部分:TextSplitter——文本分块的艺术

LangChain Documents 深度解析(一):Document 与 Blob——RAG 管线的数据基石

一个能跑的例子

from langchain_core.documents import Document
from langchain_core.documents.base import Blob
import tempfile, os

# ---- Document:文本 + 元数据 ----
doc = Document(
    page_content="LangChain 是一个 LLM 应用框架。",
    metadata={"source": "readme.md", "page": 1},
    id="doc-001",
)
print(doc.page_content)  # LangChain 是一个 LLM 应用框架。
print(doc.metadata)      # {'source': 'readme.md', 'page': 1}
print(doc.id)            # doc-001
print(str(doc))          # page_content='LangChain 是一个 LLM 应用框架。' metadata={'source': 'readme.md', 'page': 1}

# ---- Blob:原始数据的延迟加载 ----
# 方式 1:从内存创建
blob_mem = Blob.from_data("Hello, World!", mime_type="text/plain")
print(blob_mem.as_string())  # Hello, World!
print(blob_mem.as_bytes())   # b'Hello, World!'

# 方式 2:从文件创建(延迟加载,不立即读取)
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
    f.write("文件内容在这里")
    tmp_path = f.name

blob_file = Blob.from_path(tmp_path)
print(blob_file.data)        # None ← 数据还没读
print(blob_file.as_string())  # 文件内容在这里 ← 访问时才读
print(blob_file.source)       # /tmp/xxx.txt

os.unlink(tmp_path)

Document 是 LangChain RAG 管线中流通的核心数据单元;Blob 是原始文件的抽象封装。两者都继承自 BaseMedia


BaseMedia——所有内容的基类

打开 documents/base.py:34

# documents/base.py:34-56
class BaseMedia(Serializable):
    """Base class for content used in retrieval and data processing workflows."""

    id: str | None = Field(default=None, coerce_numbers_to_str=True)
    """An optional identifier for the document."""

    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata associated with the content."""

两个字段,极其简洁:

  • id:可选标识符,coerce_numbers_to_str=True 允许传入数字自动转字符串。注释明确说将来会变为必填。
  • metadata:任意字典,存放来源、页码、时间戳等信息。

继承自 Serializable(来自 langchain_core.load.serializable),获得 JSON 序列化/反序列化能力。

BaseMediaDocumentBlob 的公共父类:

Serializable
  └── BaseMedia                  # id + metadata
        ├── Document             # + page_content(文本)
        └── Blob                 # + data/path(原始字节)

Document——文本内容的载体

Document 是你在 LangChain 中最常打交道的类——向量存储里存的是它,检索器返回的是它,TextSplitter 切出来的也是它。

# documents/base.py:288-348
class Document(BaseMedia):
    """Class for storing a piece of text and associated metadata."""

    page_content: str
    """String text."""

    type: Literal["Document"] = "Document"

核心就一个字段 page_content: str,加上从 BaseMedia 继承的 idmetadatatype 是类型判别字段,固定为 "Document"

自定义 __init__——支持位置参数

# documents/base.py:311-315
def __init__(self, page_content: str, **kwargs: Any) -> None:
    """Pass page_content in as positional or named arg."""
    super().__init__(page_content=page_content, **kwargs)

Pydantic v2 的模型默认只接受关键字参数,但 LangChain 通过自定义 __init__ 让你可以写 Document("hello") 而不是 Document(page_content="hello")——这个小改动让 API 更自然。

__str__() 的向后兼容

# documents/base.py:331-347
def __str__(self) -> str:
    if self.metadata:
        return f"page_content='{self.page_content}' metadata={self.metadata}"
    return f"page_content='{self.page_content}'"

注意:__str__() 故意不包含 id 字段。注释里解释了原因——很多用户代码会直接把 Document 对象塞进 prompt 模板(f"{doc}"),如果突然输出多了 id 字段,会破坏已有的 prompt 格式。这是典型的向后兼容性保护

序列化支持

# documents/base.py:317-329
@classmethod
def is_lc_serializable(cls) -> bool:
    return True

@classmethod
def get_lc_namespace(cls) -> list[str]:
    return ["langchain", "schema", "document"]

is_lc_serializable 返回 True,意味着 Document 可以通过 dumpd(doc) / load(data) 进行 JSON 序列化/反序列化。命名空间 ["langchain", "schema", "document"] 是历史原因——早期 Document 在 langchain.schema 模块下。

Document 与 Message 的区别

初学者常混淆这两个概念。一句话区分:

维度 Document Message
用途 检索/存储/RAG LLM 对话
核心字段 page_content content
典型来源 文件、数据库、网页 用户输入、模型回复
流向 文档 → 向量存储 → 检索器 → prompt 用户 → LLM → 用户
父类 BaseMedia BaseMessage

documents/__init__.py 的模块文档说得很清楚:

Document is for data retrieval and processing workflows, not chat I/O.


Blob——原始数据的延迟加载抽象

Blob 的设计灵感来自浏览器的 Blob API——它代表一块原始数据(文本或二进制),可以来自内存,也可以来自文件系统。

# documents/base.py:59-134
class Blob(BaseMedia):
    """Raw data abstraction for document loading and file processing."""

    data: bytes | str | None = None        # 内存数据
    mimetype: str | None = None            # MIME 类型
    encoding: str = "utf-8"                # 编码
    path: PathLike | None = None           # 文件路径

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        frozen=True,                       # ← 不可变!
    )

注意 frozen=True——Blob 是不可变对象。一旦创建,不能修改任何字段。这保证了在多线程环境下的安全性。

校验器:data 和 path 至少有一个

# documents/base.py:149-156
@model_validator(mode="before")
@classmethod
def check_blob_is_valid(cls, values: dict[str, Any]) -> Any:
    """Verify that either data or path is provided."""
    if "data" not in values and "path" not in values:
        msg = "Either data or path must be provided"
        raise ValueError(msg)
    return values

不能创建一个既没有 data 又没有 path 的 Blob——那就是一个空壳了。

三种读取方式

# documents/base.py:158-211
def as_string(self) -> str:     # → str
def as_bytes(self) -> bytes:    # → bytes
def as_bytes_io(self):          # → context manager, yields BytesIO | BufferedReader

每种方法内部都遵循同一优先级:

                as_string() 的决策树
                ┌──────────────┐
                │ data is None │
                │ and path?    │
                └──────┬───────┘
                  yes  │  no
                  ▼    │
    Path(path).read_text()  │
                       ▼
              ┌────────────────┐
              │ data is bytes? │
              └───────┬────────┘
                yes   │  no
                ▼     │
    data.decode(encoding)   │
                      ▼
              ┌────────────────┐
              │ data is str?   │
              └───────┬────────┘
                yes   │  no
                ▼     │
            return data  raise ValueError

as_bytes_io() 特别巧妙——如果 Blob 持有文件路径,它直接 open("rb") 返回文件句柄;如果持有内存数据,它包装成 BytesIO。调用者无需关心数据来源,统一用 with blob.as_bytes_io() as f: 即可。

两个工厂方法

# documents/base.py:213-278
@classmethod
def from_path(cls, path, *, encoding="utf-8", mime_type=None,
              guess_type=True, metadata=None) -> Blob:
    # 注意:data=None,不立即读取文件内容
    return cls(data=None, mimetype=mimetype, encoding=encoding,
               path=path, metadata=metadata or {})

@classmethod
def from_data(cls, data, *, encoding="utf-8", mime_type=None,
              path=None, metadata=None) -> Blob:
    return cls(data=data, mimetype=mime_type, encoding=encoding,
               path=path, metadata=metadata or {})

from_path 的关键设计:延迟加载——它不读取文件内容,只保存路径引用。数据在 as_string() / as_bytes() 被调用时才真正读取。

    from_path("big_file.pdf")           from_data(b"small content")
    ┌─────────────────────┐             ┌─────────────────────┐
    │ data = None         │             │ data = b"small..."  │
    │ path = "big_file"   │             │ path = None         │
    │ 内存占用:≈ 0       │             │ 内存占用:数据大小  │
    └─────────────────────┘             └─────────────────────┘
           │                                    │
           │ .as_bytes()                        │ .as_bytes()
           ▼                                    ▼
    Path(path).read_bytes()              直接返回 data
    (此时才读取文件)                  (已在内存中)

source 属性

# documents/base.py:136-147
@property
def source(self) -> str | None:
    if self.metadata and "source" in self.metadata:
        return cast("str | None", self.metadata["source"])
    return str(self.path) if self.path else None

优先级:metadata["source"] > self.path > None。这允许用户通过 metadata 覆盖默认的来源信息(比如给文件路径加上 URL 前缀)。


BaseDocumentTransformer——文档变换的抽象接口

BaseDocumentTransformer 定义了"输入一批文档,输出一批文档"的抽象。TextSplitter(第 3 部分)就是它的子类。

# documents/transformers.py:16-79
class BaseDocumentTransformer(ABC):
    """Abstract base class for document transformation."""

    @abstractmethod
    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        """Transform a list of documents."""

    async def atransform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        return await run_in_executor(
            None, self.transform_documents, documents, **kwargs
        )

两个要点:

  1. 只有一个抽象方法 transform_documents(),子类必须实现
  2. 异步版本 atransform_documents()run_in_executor 提供默认实现——和 Message/Runnable 系列的异步策略一脉相承

注意它不继承 BaseModel——这意味着它不是 Pydantic 模型。子类可以自由选择是否使用 Pydantic。


BaseDocumentCompressor——检索后的压缩/重排

# documents/compressor.py:19-74
class BaseDocumentCompressor(BaseModel, ABC):
    """Base class for document compressors."""

    @abstractmethod
    def compress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks: Callbacks | None = None,
    ) -> Sequence[Document]:
        """Compress retrieved documents given the query context."""

    async def acompress_documents(
        self, documents: Sequence[Document], query: str,
        callbacks: Callbacks | None = None,
    ) -> Sequence[Document]:
        return await run_in_executor(
            None, self.compress_documents, documents, query, callbacks
        )

BaseDocumentTransformer 对比:

维度 BaseDocumentTransformer BaseDocumentCompressor
继承 ABC BaseModel, ABC
核心方法 transform_documents(docs) compress_documents(docs, query)
接受 query
接受 callbacks
典型用途 分块、格式转换 重排序、过滤、摘要

关键区别是 compress_documents 多了 query 参数——因为压缩/重排需要根据查询来判断哪些文档更相关。

不过,官方文档建议直接用 RunnableLambda 替代这个接口:

Users should favor using a RunnableLambda instead of sub-classing from this interface.


Document 在 LangChain 生态中的位置

    ┌─────────────────── 数据源 ───────────────────┐
    │  文件  │  网页  │  数据库  │  API  │  LangSmith │
    └───┬────┴───┬────┴────┬─────┴───┬───┴─────┬─────┘
        │        │         │         │         │
        ▼        ▼         ▼         ▼         ▼
    ┌─────────────────────────────────────────────────┐
    │            BaseLoader / BlobLoader               │  ← 第 2 部分
    │         lazy_load() → Iterator[Document]         │
    └──────────────────────┬──────────────────────────┘
                           │
                           ▼
    ┌─────────────────────────────────────────────────┐
    │              Document                            │
    │   page_content: str  +  metadata: dict           │  ← 本篇
    │   id: str | None                                 │
    └──────────────────────┬──────────────────────────┘
                           │
                           ▼
    ┌─────────────────────────────────────────────────┐
    │        TextSplitter (BaseDocumentTransformer)     │  ← 第 3 部分
    │      split_documents() → list[Document]          │
    └──────────────────────┬──────────────────────────┘
                           │
              ┌────────────┴────────────┐
              ▼                         ▼
    ┌──────────────────┐     ┌──────────────────┐
    │   Embeddings     │     │  VectorStore     │
    │  embed_documents │     │  add_documents   │
    └────────┬─────────┘     │  similarity_     │
             │               │    search        │
             └──────────────►│                  │
                             └────────┬─────────┘
                                      │
                                      ▼
                             ┌──────────────────┐
                             │  Retriever       │
                             │  invoke(query)   │
                             │  → list[Document]│
                             └────────┬─────────┘
                                      │
                                      ▼
                             ┌──────────────────┐
                             │  Prompt → LLM    │
                             │  (RAG pipeline)  │
                             └──────────────────┘

Document 是 RAG 管线中从头到尾流通的"货币"——加载器产出它,分块器拆分它,向量存储索引它,检索器返回它。


小结

  • BaseMedia 提供 id + metadata,是 Document 和 Blob 的公共基类
  • Document 只加了一个 page_content: str——简单到极致,但它是整个 RAG 管线的核心数据结构
  • Blob 是原始文件数据的抽象,支持延迟加载(from_path 不读文件),frozen 不可变
  • BaseDocumentTransformer 定义"文档进 → 文档出"的变换接口,TextSplitter 继承它
  • BaseDocumentCompressor 多了 query 参数,用于检索后的重排序/过滤

数据模型搞清楚了,那 Document 是怎么从原始数据源生产出来的?下一部分看 DocumentLoader。



LangChain Documents 深度解析(二):DocumentLoader 与 BlobParser——数据的入口

一个能跑的例子

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from typing import Iterator

# ---- 自定义 Loader:从字典列表生成 Document ----
class DictLoader(BaseLoader):
    def __init__(self, data: list[dict]):
        self.data = data

    def lazy_load(self) -> Iterator[Document]:
        for item in self.data:
            yield Document(
                page_content=item["text"],
                metadata={"source": item.get("source", "unknown")},
            )

# 使用
loader = DictLoader([
    {"text": "LangChain 是框架", "source": "doc1"},
    {"text": "Document 是核心", "source": "doc2"},
    {"text": "Loader 负责加载", "source": "doc3"},
])

# 方式 1:lazy_load — 生成器,逐条产出
for doc in loader.lazy_load():
    print(f"[{doc.metadata['source']}] {doc.page_content}")

# 方式 2:load — 一次性全部加载到列表
all_docs = loader.load()
print(f"共加载 {len(all_docs)} 个文档")

自定义 Loader 只需继承 BaseLoader 并实现 lazy_load() 生成器——就这一个方法。


BaseLoader——文档加载的核心接口

# document_loaders/base.py:26-33
class BaseLoader(ABC):
    """Interface for document loader.

    Implementations should implement the lazy-loading method using generators
    to avoid loading all documents into memory at once.

    `load` is provided just for user convenience and should not be overridden.
    """

lazy_load()load() 的关系

# document_loaders/base.py:91-100
def lazy_load(self) -> Iterator[Document]:
    if type(self).load != BaseLoader.load:
        return iter(self.load())                    # ← 向后兼容 fallback
    msg = f"{self.__class__.__name__} does not implement lazy_load()"
    raise NotImplementedError(msg)

# document_loaders/base.py:37-43
def load(self) -> list[Document]:
    return list(self.lazy_load())                   # ← 便捷方法

这里有一个精巧的向后兼容设计:

    推荐路径(新代码):
    ┌────────────────────┐      ┌──────────────────┐
    │ 子类实现 lazy_load │ ───► │ load() 调用      │
    │ (生成器)         │      │ list(lazy_load()) │
    └────────────────────┘      └──────────────────┘

    兼容路径(旧代码):
    ┌────────────────────┐      ┌──────────────────────────┐
    │ 子类只实现了 load  │ ◄─── │ lazy_load() 检测到       │
    │ (返回列表)       │      │ load 被覆盖,调用        │
    └────────────────────┘      │ iter(self.load())        │
                                └──────────────────────────┘

lazy_load 内部的 type(self).load != BaseLoader.load 检查:如果子类覆盖了 load() 但没覆盖 lazy_load(),就回退到调用子类的 load() 并包装成迭代器。这保证了旧的 Loader 实现不会报错。

异步版本 alazy_load()

# document_loaders/base.py:102-114
async def alazy_load(self) -> AsyncIterator[Document]:
    iterator = await run_in_executor(None, self.lazy_load)
    done = object()
    while True:
        doc = await run_in_executor(None, next, iterator, done)
        if doc is done:
            break
        yield doc

这段代码值得仔细看——它把同步生成器转成了异步生成器:

  1. 在线程池中调用 self.lazy_load() 获取迭代器
  2. 循环在线程池中调用 next(iterator, done) 逐个取值
  3. done 哨兵对象检测迭代结束

这意味着子类只需要实现同步的 lazy_load(),异步版本自动可用。当然,如果子类有真正的异步 I/O,应该覆盖 alazy_load()

load_and_split()——已废弃

# document_loaders/base.py:53-87
def load_and_split(self, text_splitter: TextSplitter | None = None) -> list[Document]:
    """Load Document and split into chunks."""
    if text_splitter is None:
        if not _HAS_TEXT_SPLITTERS:
            raise ImportError(msg)
        text_splitter_ = RecursiveCharacterTextSplitter()
    else:
        text_splitter_ = text_splitter
    docs = self.load()
    return text_splitter_.split_documents(docs)

注释写得很明确:Do not override this method. It should be considered to be deprecated!

这个方法把加载和分块耦合在一起了——更好的做法是分开调用:

# 推荐写法
docs = loader.load()
chunks = text_splitter.split_documents(docs)

# 不推荐
chunks = loader.load_and_split(text_splitter)

BaseBlobParser——从 Blob 到 Document

# document_loaders/base.py:117-155
class BaseBlobParser(ABC):
    """Abstract interface for blob parsers.

    A blob parser provides a way to parse raw data stored in a blob
    into one or more Document objects.
    """

    @abstractmethod
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazy parsing interface."""

    def parse(self, blob: Blob) -> list[Document]:
        """Eagerly parse the blob into Document objects."""
        return list(self.lazy_parse(blob))

BaseLoader 的模式如出一辙——lazy_parse() 是抽象方法,parse() 是便捷包装。

BaseBlobParser 的作用是把解析逻辑加载逻辑中解耦出来:

    BaseLoader                         BaseBlobParser
    ┌──────────────────┐              ┌──────────────────┐
    │ 知道从哪里读数据 │              │ 知道怎么解析数据 │
    │ lazy_load()      │              │ lazy_parse(blob)  │
    │ → Iterator[Doc]  │              │ → Iterator[Doc]  │
    └──────────────────┘              └──────────────────┘
           │                                  │
           │ 直接产出 Document                │ 需要先拿到 Blob
           ▼                                  ▼
    一步到位                          BlobLoader → Blob → Parser → Document

为什么要多这一层?因为同一种文件格式(比如 PDF)可能来自不同的地方(本地文件、S3、HTTP),但解析逻辑是一样的。分离后,BlobLoader 负责"从哪里读",BaseBlobParser 负责"怎么解析"。


BlobLoader——原始数据的来源

# document_loaders/blob_loaders.py:19-34
class BlobLoader(ABC):
    """Abstract interface for blob loaders implementation.

    Implementer should be able to load raw content from a storage system
    according to some criteria and return the raw content lazily as
    a stream of blobs.
    """

    @abstractmethod
    def yield_blobs(self) -> Iterator[Blob]:
        """A lazy loader for raw data represented by LangChain's Blob object."""

纯粹的生成器接口——只负责扫描数据源、逐个 yield Blob 对象。典型实现会扫描文件系统目录,为每个匹配的文件创建一个 Blob.from_path()

三层加载架构

    ┌──────────────── 三层架构 ────────────────┐
    │                                          │
    │  ┌───────────┐                           │
    │  │ BlobLoader│  yield_blobs()            │
    │  │           │  → Iterator[Blob]         │
    │  └─────┬─────┘                           │
    │        │                                 │
    │        ▼                                 │
    │  ┌───────────┐                           │
    │  │   Blob    │  原始数据封装              │
    │  │           │  data / path / mimetype    │
    │  └─────┬─────┘                           │
    │        │                                 │
    │        ▼                                 │
    │  ┌───────────────┐                       │
    │  │ BaseBlobParser│  lazy_parse(blob)      │
    │  │               │  → Iterator[Document] │
    │  └───────────────┘                       │
    │                                          │
    └──────────────────────────────────────────┘

    vs. 简化架构:

    ┌──────────────── 直接加载 ────────────────┐
    │                                          │
    │  ┌───────────┐                           │
    │  │ BaseLoader│  lazy_load()              │
    │  │           │  → Iterator[Document]     │
    │  └───────────┘                           │
    │                                          │
    └──────────────────────────────────────────┘

大多数第三方 Loader(如 PyPDFLoaderCSVLoader)直接继承 BaseLoader,不走三层架构。三层架构适合需要复用解析逻辑的场景。


LangSmithLoader——唯一的内置具体实现

langchain-coredocument_loaders 模块中只有一个具体的 Loader 实现:

# document_loaders/langsmith.py:17-38
class LangSmithLoader(BaseLoader):
    """Load LangSmith Dataset examples as Document objects."""

它从 LangSmith 数据集加载数据为 Document。

content_key 支持嵌套访问

# document_loaders/langsmith.py:99
self.content_key = list(content_key.split(".")) if content_key else []

设置 content_key="input.text" 会按 . 分割为 ["input", "text"],然后在 lazy_load 中逐层访问:

# document_loaders/langsmith.py:126-129
content: Any = example.inputs
for key in self.content_key:
    content = content[key]          # ← 逐层深入
content_str = self.format_content(content)

metadata 转换

# document_loaders/langsmith.py:130-134
metadata = pydantic_to_dict(example)
for k in ("dataset_id", "created_at", "modified_at", "source_run_id", "id"):
    metadata[k] = str(metadata[k]) if metadata[k] else metadata[k]
yield Document(content_str, metadata=metadata)

把 Example 的所有字段都放进 metadata,然后把 UUID/datetime 类型转成字符串——因为 metadata 通常需要 JSON 可序列化。


Lazy Loading 设计模式

整个 Document Loader 体系围绕生成器(lazy loading)设计。对比两种模式:

维度 Eager Loading Lazy Loading
实现 load() → list[Document] lazy_load() → Iterator[Document]
内存 所有文档同时在内存 逐条产出,内存占用恒定
首个结果 全部加载完才能拿到 立即拿到第一个
适合场景 小数据集、需要随机访问 大数据集、流式处理
可组合性 需要完整列表 可以管道式处理

LangChain 的建议很明确:子类应该实现 lazy_load()load() 只是便捷包装。


document_loaders/__init__.py 的导出

# document_loaders/__init__.py:12-19
__all__ = (
    "BaseBlobParser",
    "BaseLoader",
    "Blob",
    "BlobLoader",
    "LangSmithLoader",
    "PathLike",
)

注意 BlobPathLike 是从 blob_loaders.py 重新导出的(blob_loaders.py:13documents.base 导入了 BlobPathLike),提供向后兼容的导入路径。


小结

  • BaseLoader 是所有 Loader 的基类,核心方法是 lazy_load() 生成器
  • load() 只是 list(lazy_load()) 的便捷包装,不应该被子类覆盖
  • 向后兼容lazy_load() 会检测子类是否只实现了 load(),自动 fallback
  • BaseBlobParser 负责"怎么解析",BlobLoader 负责"从哪里读"——三层架构解耦加载和解析
  • LangSmithLoaderlangchain-core 中唯一的具体 Loader 实现
  • 生成器模式(lazy loading)是整个体系的设计核心

文档加载出来了,但一个文档可能很长——需要拆分成合适大小的块才能送入向量存储。下一部分看 TextSplitter。



LangChain Documents 深度解析(三):TextSplitter——文本分块的艺术

一个能跑的例子

from langchain_text_splitters import RecursiveCharacterTextSplitter

text = """# LangChain 简介

LangChain 是一个用于构建 LLM 应用的框架。它提供了丰富的工具和抽象。

## 核心概念

Document 是数据的基本单位。每个 Document 包含文本内容和元数据。

TextSplitter 用于将长文本拆分为小块。这对于 RAG 管线至关重要,
因为向量存储通常对文本长度有限制。

## 设计哲学

简单、可组合、可扩展。LangChain 的每个组件都是 Runnable,
可以通过管道操作符连接。"""

splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=20,
    separators=["\n\n", "\n", " ", ""],
)

chunks = splitter.split_text(text)
for i, chunk in enumerate(chunks):
    print(f"--- Chunk {i} ({len(chunk)} chars) ---")
    print(chunk)
    print()

RecursiveCharacterTextSplitter 会先尝试按 \n\n 分割,如果某块还是太长,就用 \n 分割,再不行用空格,最后逐字符。这是最常用的分块器。


TextSplitter 基类——模板方法模式

# text-splitters/base.py:44-45
class TextSplitter(BaseDocumentTransformer, ABC):
    """Interface for splitting text into chunks."""

继承自 BaseDocumentTransformer(第 1 部分讲过),所以 TextSplitter 也是"文档进 → 文档出"的变换器。

构造参数

# text-splitters/base.py:47-90
def __init__(
    self,
    chunk_size: int = 4000,           # 最大块大小
    chunk_overlap: int = 200,          # 块间重叠
    length_function: Callable[[str], int] = len,  # 长度计算函数
    keep_separator: bool | Literal["start", "end"] = False,  # 保留分隔符
    add_start_index: bool = False,     # 在 metadata 中记录起始位置
    strip_whitespace: bool = True,     # 去除首尾空白
) -> None:

每个参数都有校验:

# text-splitters/base.py:73-84
if chunk_size <= 0:
    raise ValueError(f"chunk_size must be > 0, got {chunk_size}")
if chunk_overlap < 0:
    raise ValueError(f"chunk_overlap must be >= 0, got {chunk_overlap}")
if chunk_overlap > chunk_size:
    raise ValueError(f"Got a larger chunk overlap ({chunk_overlap}) "
                     f"than chunk size ({chunk_size})")

模板方法模式

TextSplitter 的方法调用链非常清晰:

    split_text(text)              ← 抽象方法,子类必须实现
        ↑
    create_documents(texts)       ← 对每个 text 调用 split_text,创建 Document
        ↑
    split_documents(documents)    ← 提取 page_content,调用 create_documents
        ↑
    transform_documents(documents) ← BaseDocumentTransformer 接口,调用 split_documents

这是经典的模板方法模式——子类只需要实现 split_text(),上层的 create_documents → split_documents → transform_documents 全部自动工作。

create_documents()——分块 + 组装

# text-splitters/base.py:103-129
def create_documents(
    self, texts: list[str], metadatas: list[dict] | None = None
) -> list[Document]:
    metadatas_ = metadatas or [{}] * len(texts)
    documents = []
    for i, text in enumerate(texts):
        index = 0
        previous_chunk_len = 0
        for chunk in self.split_text(text):
            metadata = copy.deepcopy(metadatas_[i])
            if self._add_start_index:
                offset = index + previous_chunk_len - self._chunk_overlap
                index = text.find(chunk, max(0, offset))
                metadata["start_index"] = index
                previous_chunk_len = len(chunk)
            new_doc = Document(page_content=chunk, metadata=metadata)
            documents.append(new_doc)
    return documents

注意 add_start_index 的实现——它用 text.find(chunk, offset) 从上次结束位置附近搜索,避免从头扫描。offset = index + previous_chunk_len - chunk_overlap 考虑了重叠区域。

_merge_splits()——核心合并算法

这是 TextSplitter 最复杂的方法,负责把切分出的小片段合并成合适大小的块:

# text-splitters/base.py:152-194
def _merge_splits(self, splits: Iterable[str], separator: str) -> list[str]:
    separator_len = self._length_function(separator)
    docs = []
    current_doc: list[str] = []
    total = 0
    for d in splits:
        len_ = self._length_function(d)
        if (
            total + len_ + (separator_len if len(current_doc) > 0 else 0)
            > self._chunk_size
        ):
            if len(current_doc) > 0:
                doc = self._join_docs(current_doc, separator)
                if doc is not None:
                    docs.append(doc)
                # 弹出旧片段直到满足重叠约束
                while total > self._chunk_overlap or (
                    total + len_ + (separator_len if len(current_doc) > 0 else 0)
                    > self._chunk_size
                    and total > 0
                ):
                    total -= self._length_function(current_doc[0]) + (
                        separator_len if len(current_doc) > 1 else 0
                    )
                    current_doc = current_doc[1:]
        current_doc.append(d)
        total += len_ + (separator_len if len(current_doc) > 1 else 0)
    doc = self._join_docs(current_doc, separator)
    if doc is not None:
        docs.append(doc)
    return docs

用 ASCII 图解释合并过程(chunk_size=10, chunk_overlap=3):

    输入 splits: ["AA", "BB", "CC", "DD", "EE", "FF"]
    separator: " "

    步骤 1: current_doc=["AA"], total=2
    步骤 2: current_doc=["AA","BB"], total=5  (2+1+2)
    步骤 3: current_doc=["AA","BB","CC"], total=8  (5+1+2)
    步骤 4: "DD" 加入后 total=8+1+2=11 > 10 → 超限!
            → 输出 chunk: "AA BB CC"
            → 弹出 "AA": total=8-2-1=5, current_doc=["BB","CC"]
            → 弹出 "BB": total=5-2-1=2, current_doc=["CC"]
            → total=2 ≤ 3(overlap), 停止弹出
            → current_doc=["CC","DD"], total=2+1+2=5

    结果: ["AA BB CC", "CC DD EE", "EE FF"]
                  ^^          ^^
                  重叠区域     重叠区域

工厂方法

# text-splitters/base.py:225-281
@classmethod
def from_tiktoken_encoder(cls, encoding_name="gpt2", model_name=None,
                          allowed_special=set(), disallowed_special="all",
                          **kwargs) -> Self:
    """Text splitter that uses tiktoken encoder to count length."""
    # 用 tiktoken 的 token 数替代字符数作为 length_function
    def _tiktoken_encoder(text: str) -> int:
        return len(enc.encode(text, ...))
    return cls(length_function=_tiktoken_encoder, **kwargs)

# text-splitters/base.py:196-223
@classmethod
def from_huggingface_tokenizer(cls, tokenizer, **kwargs) -> TextSplitter:
    """Text splitter that uses HuggingFace tokenizer to count length."""
    def _huggingface_tokenizer_length(text: str) -> int:
        return len(tokenizer.tokenize(text))
    return cls(length_function=_huggingface_tokenizer_length, **kwargs)

关键洞察:这两个工厂方法只是替换了 length_function。切分逻辑不变,但长度计算从字符数变成了 token 数——这对于控制 LLM 输入长度非常重要。


CharacterTextSplitter——单分隔符切分

# character.py:11-58
class CharacterTextSplitter(TextSplitter):
    def __init__(self, separator="\n\n", is_separator_regex=False, **kwargs):
        super().__init__(**kwargs)
        self._separator = separator
        self._is_separator_regex = is_separator_regex

    def split_text(self, text: str) -> list[str]:
        sep_pattern = (
            self._separator if self._is_separator_regex
            else re.escape(self._separator)
        )
        splits = _split_text_with_regex(
            text, sep_pattern, keep_separator=self._keep_separator
        )
        merge_sep = ""
        if not (self._keep_separator or is_lookaround):
            merge_sep = self._separator
        return self._merge_splits(splits, merge_sep)

最简单的分块器——用单个分隔符切分,然后合并。

_split_text_with_regex() 的三种模式

# character.py:61-85
def _split_text_with_regex(text, separator, *, keep_separator):
    if separator:
        if keep_separator:
            splits_ = re.split(f"({separator})", text)     # ← 捕获组保留分隔符
            if keep_separator == "end":
                # ["text1", "\n", "text2"] → ["text1\n", "text2"]
                splits = [splits_[i] + splits_[i+1] for i in range(0, len(splits_)-1, 2)]
            else:  # "start" 或 True
                # ["text1", "\n", "text2"] → ["text1", "\ntext2"]
                splits = [splits_[i] + splits_[i+1] for i in range(1, len(splits_), 2)]
        else:
            splits = re.split(separator, text)              # ← 丢弃分隔符
    else:
        splits = list(text)                                 # ← 逐字符
    return [s for s in splits if s]

keep_separator 的三种取值:

  • False:丢弃分隔符
  • "start"True:分隔符附加到下一段的开头
  • "end":分隔符附加到上一段的结尾

RecursiveCharacterTextSplitter——最核心的分块器

# character.py:88-158
class RecursiveCharacterTextSplitter(TextSplitter):
    def __init__(self, separators=None, keep_separator=True,
                 is_separator_regex=False, **kwargs):
        super().__init__(keep_separator=keep_separator, **kwargs)
        self._separators = separators or ["\n\n", "\n", " ", ""]
        self._is_separator_regex = is_separator_regex

默认分隔符 ["\n\n", "\n", " ", ""]——从粗到细,段落 → 行 → 词 → 字符。

_split_text() 递归算法

# character.py:107-147
def _split_text(self, text: str, separators: list[str]) -> list[str]:
    final_chunks = []
    # 1. 找到第一个在文本中存在的分隔符
    separator = separators[-1]
    new_separators = []
    for i, s_ in enumerate(separators):
        if not s_:
            separator = s_
            break
        if re.search(separator_, text):
            separator = s_
            new_separators = separators[i + 1:]    # ← 剩余的更细分隔符
            break

    # 2. 用该分隔符切分
    splits = _split_text_with_regex(text, separator_, keep_separator=...)

    # 3. 对每个切分结果:
    good_splits = []
    for s in splits:
        if self._length_function(s) < self._chunk_size:
            good_splits.append(s)                  # ← 足够小,收集起来
        else:
            if good_splits:
                merged = self._merge_splits(good_splits, separator_)
                final_chunks.extend(merged)
                good_splits = []
            if not new_separators:
                final_chunks.append(s)             # ← 没有更细的分隔符了
            else:
                other = self._split_text(s, new_separators)  # ← 递归!
                final_chunks.extend(other)

    if good_splits:
        merged = self._merge_splits(good_splits, separator_)
        final_chunks.extend(merged)
    return final_chunks

用递归树来可视化(chunk_size=50):

    输入:一段 200 字的文本
    ┌────────────────────────────────────────────┐
    │ _split_text(text, ["\n\n", "\n", " ", ""]) │
    └────────────────────┬───────────────────────┘
                         │ 用 "\n\n" 切分
              ┌──────────┼──────────┐
              ▼          ▼          ▼
          段落A(30)   段落B(120)  段落C(40)
          ✓ < 50     ✗ > 50      ✓ < 50
              │          │          │
              │          ▼          │
              │   递归: _split_text(段落B, ["\n", " ", ""])
              │          │
              │   用 "\n" 切分
              │   ┌──────┼──────┐
              │   ▼      ▼      ▼
              │ 行1(45) 行2(35) 行3(40)
              │ ✓       ✓      ✓
              │   │      │      │
              ▼   ▼      ▼      ▼
           合并后输出:["段落A", "行1", "行2 行3", "段落C"]

核心思想:先用粗粒度分隔符切分,如果某块还是太大,就用更细粒度的分隔符递归切分。这保证了切分点尽量落在语义边界上。

from_language() 工厂方法

# character.py:160-176
@classmethod
def from_language(cls, language: Language, **kwargs):
    separators = cls.get_separators_for_language(language)
    return cls(separators=separators, is_separator_regex=True, **kwargs)

Language 枚举支持 27 种编程语言(base.py:372-403)。每种语言有定制的分隔符列表:

语言 分隔符示例(前几个)
Python "\nclass ", "\ndef ", "\n\tdef ", "\n\n"
JavaScript "\nfunction ", "\nconst ", "\nlet ", "\nclass "
Markdown "\n#{1,6} ", "```\n", "\n\n"
Go "\nfunc ", "\nvar ", "\nconst ", "\ntype "
Rust "\nfn ", "\nconst ", "\nlet ", "\nif "
Java "\nclass ", "\npublic ", "\nprotected ", "\nprivate "

Python 的分隔符(character.py:363-374):

if language == Language.PYTHON:
    return [
        "\nclass ",         # 先按类定义切分
        "\ndef ",           # 再按顶层函数切分
        "\n\tdef ",         # 再按缩进方法切分
        "\n\n",             # 再按空行
        "\n",               # 再按行
        " ",                # 再按空格
        "",                 # 最后逐字符
    ]

TokenTextSplitter——基于 token 的切分

# base.py:298-369
class TokenTextSplitter(TextSplitter):
    def __init__(self, encoding_name="gpt2", model_name=None, ...):
        super().__init__(**kwargs)
        if model_name is not None:
            enc = tiktoken.encoding_for_model(model_name)
        else:
            enc = tiktoken.get_encoding(encoding_name)
        self._tokenizer = enc

    def split_text(self, text: str) -> list[str]:
        tokenizer = Tokenizer(
            chunk_overlap=self._chunk_overlap,
            tokens_per_chunk=self._chunk_size,
            decode=self._tokenizer.decode,
            encode=_encode,
        )
        return split_text_on_tokens(text=text, tokenizer=tokenizer)

核心逻辑在 split_text_on_tokens()base.py:422-450):

# base.py:422-450
def split_text_on_tokens(*, text: str, tokenizer: Tokenizer) -> list[str]:
    splits = []
    input_ids = tokenizer.encode(text)          # 全部编码为 token ID
    start_idx = 0
    while start_idx < len(input_ids):
        cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
        chunk_ids = input_ids[start_idx:cur_idx]
        decoded = tokenizer.decode(chunk_ids)    # 解码回文本
        if decoded:
            splits.append(decoded)
        if cur_idx == len(input_ids):
            break
        start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    return splits

RecursiveCharacterTextSplitter.from_tiktoken_encoder() 的区别:

维度 TokenTextSplitter from_tiktoken_encoder()
切分单位 token 字符(但用 token 计数)
切分方式 直接按 token 边界切 按分隔符切,合并时用 token 计数
精度 token 级精确 可能偏差几个 token
语义保留 可能在词中间切断 尽量在分隔符处切分

Header-Based Splitters——按标题切分

这两个 Splitter 不继承 TextSplitter,因为它们返回的是 Document(带 metadata)而不是 str

MarkdownHeaderTextSplitter

# markdown.py:23-280
class MarkdownHeaderTextSplitter:
    def __init__(self, headers_to_split_on: list[tuple[str, str]],
                 return_each_line=False, strip_headers=True):
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda split: len(split[0]), reverse=True
        )
from langchain_text_splitters import MarkdownHeaderTextSplitter

md_text = """# 第一章
第一章的内容。

## 第一节
第一节的内容。

## 第二节
第二节的内容。

# 第二章
第二章的内容。"""

splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[("#", "H1"), ("##", "H2")]
)
docs = splitter.split_text(md_text)
for doc in docs:
    print(f"metadata={doc.metadata}, content={doc.page_content[:30]}")

输出:

metadata={'H1': '第一章'}, content=第一章的内容。
metadata={'H1': '第一章', 'H2': '第一节'}, content=第一节的内容。
metadata={'H1': '第一章', 'H2': '第二节'}, content=第二节的内容。
metadata={'H1': '第二章'}, content=第二章的内容。

关键特性:metadata 中自动记录标题层级。当遇到新标题时,弹出栈中同级或更低级的标题(markdown.py:204-214),保证 metadata 反映当前的层级上下文。

HTMLHeaderTextSplitter

# html.py:83-345
class HTMLHeaderTextSplitter:
    def __init__(self, headers_to_split_on, return_each_element=False):
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda x: int(x[0][1:])
        )

逻辑类似 MarkdownHeaderTextSplitter,但基于 BeautifulSoup 解析 HTML DOM 树。它使用 DFS 遍历(html.py:276),对每个节点判断是否是目标标题标签。


RecursiveJsonSplitter——保持 JSON 结构

# json.py:12-191
class RecursiveJsonSplitter:
    """Splits JSON data into smaller, structured chunks
    while preserving hierarchy."""

    def __init__(self, max_chunk_size=2000, min_chunk_size=None):
        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = (
            min_chunk_size if min_chunk_size is not None
            else max(max_chunk_size - 200, 50)
        )

不继承 TextSplitter——它操作的是 dict 而非 str。

from langchain_text_splitters import RecursiveJsonSplitter

data = {
    "name": "LangChain",
    "version": "0.3",
    "components": {
        "core": "基础抽象",
        "loaders": "数据加载",
        "splitters": "文本分块",
        "stores": "向量存储"
    },
    "features": ["RAG", "Agent", "Streaming"]
}

splitter = RecursiveJsonSplitter(max_chunk_size=100)
chunks = splitter.split_json(data)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i}: {chunk}")

核心算法 _json_split()json.py:85-114)递归遍历 JSON 树,当当前块超过 max_chunk_size 且已达到 min_chunk_size 时,开启新块。_set_nested_dict() 保证了切分后每个块仍然保持原有的嵌套结构。


14 个内置 Splitter 对比表

Splitter 继承 切分方式 返回类型 典型场景
CharacterTextSplitter TextSplitter 单分隔符 list[str] 简单文本
RecursiveCharacterTextSplitter TextSplitter 多分隔符递归 list[str] 通用首选
TokenTextSplitter TextSplitter tiktoken token list[str] 精确控制 token 数
MarkdownTextSplitter Recursive... Markdown 分隔符 list[str] Markdown 文本
MarkdownHeaderTextSplitter Markdown 标题 list[Document] 按标题分章
ExperimentalMarkdownSyntaxTextSplitter Markdown 语法 list[Document] 保留原始格式
HTMLHeaderTextSplitter HTML 标题标签 list[Document] HTML 按标题分段
HTMLSectionSplitter HTML 标题 + XSLT list[Document] HTML 段落
HTMLSemanticPreservingSplitter BaseDoc... HTML 语义 list[Document] 保留 HTML 结构
RecursiveJsonSplitter JSON 结构递归 list[dict] JSON 数据
LatexTextSplitter Recursive... LaTeX 分隔符 list[str] LaTeX 文档
PythonCodeTextSplitter Recursive... Python 分隔符 list[str] Python 代码
NLTKTextSplitter TextSplitter NLTK 句子检测 list[str] 按句子切分
SpacyTextSplitter TextSplitter spaCy 句子检测 list[str] 按句子切分

选择建议:

  • 通用文本RecursiveCharacterTextSplitter(默认选择)
  • 代码RecursiveCharacterTextSplitter.from_language(Language.PYTHON)
  • MarkdownMarkdownHeaderTextSplitter(需要标题元数据时)或 MarkdownTextSplitter
  • HTMLHTMLHeaderTextSplitterHTMLSemanticPreservingSplitter
  • JSONRecursiveJsonSplitter
  • 精确 token 控制TokenTextSplitter

小结

  • TextSplitter 是分块器基类,用模板方法模式:子类实现 split_text(),上层自动工作
  • _merge_splits() 是核心合并算法,处理 chunk_size 和 chunk_overlap
  • RecursiveCharacterTextSplitter 是最常用的——多分隔符从粗到细递归切分
  • from_language() 支持 27 种编程语言的定制分隔符
  • Header-Based Splitters 不继承 TextSplitter,按标题切分并在 metadata 中记录层级
  • RecursiveJsonSplitter 保持 JSON 结构的递归切分


八个系列总览

系列 核心模块 关键类 一句话
Messages messages/ BaseMessage, AIMessage, HumanMessage LLM 对话的数据结构
Runnables runnables/ Runnable, RunnableSequence, RunnableLambda LCEL 管道的基础抽象
ChatModel language_models/ BaseChatModel, generate, stream LLM 调用的统一接口
Outputs outputs/, output_parsers/ ChatResult, BaseOutputParser 模型输出的结构化处理
Prompts prompts/ ChatPromptTemplate, MessagesPlaceholder 提示词模板与变量注入
Tools tools/ BaseTool, @tool, StructuredTool 函数调用与工具系统
Tracers tracers/ BaseTracer, LangChainTracer 可观测性与调试追踪
Documents documents/, document_loaders/, text-splitters/ Document, BaseLoader, TextSplitter RAG 管线的数据基础
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐