【LangChain 源码解析七:Document】
本系列共 3 部分,从数据模型到加载器到分块策略,完整拆解 LangChain RAG 管线的数据基础。
- 第 1 部分:Document 与 Blob——RAG 管线的数据基石(本文)
- 第 2 部分:DocumentLoader 与 BlobParser——数据的入口
- 第 3 部分:TextSplitter——文本分块的艺术
LangChain Documents 深度解析(一):Document 与 Blob——RAG 管线的数据基石
一个能跑的例子
from langchain_core.documents import Document
from langchain_core.documents.base import Blob
import tempfile, os
# ---- Document:文本 + 元数据 ----
doc = Document(
page_content="LangChain 是一个 LLM 应用框架。",
metadata={"source": "readme.md", "page": 1},
id="doc-001",
)
print(doc.page_content) # LangChain 是一个 LLM 应用框架。
print(doc.metadata) # {'source': 'readme.md', 'page': 1}
print(doc.id) # doc-001
print(str(doc)) # page_content='LangChain 是一个 LLM 应用框架。' metadata={'source': 'readme.md', 'page': 1}
# ---- Blob:原始数据的延迟加载 ----
# 方式 1:从内存创建
blob_mem = Blob.from_data("Hello, World!", mime_type="text/plain")
print(blob_mem.as_string()) # Hello, World!
print(blob_mem.as_bytes()) # b'Hello, World!'
# 方式 2:从文件创建(延迟加载,不立即读取)
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
f.write("文件内容在这里")
tmp_path = f.name
blob_file = Blob.from_path(tmp_path)
print(blob_file.data) # None ← 数据还没读
print(blob_file.as_string()) # 文件内容在这里 ← 访问时才读
print(blob_file.source) # /tmp/xxx.txt
os.unlink(tmp_path)
Document 是 LangChain RAG 管线中流通的核心数据单元;Blob 是原始文件的抽象封装。两者都继承自 BaseMedia。
BaseMedia——所有内容的基类
打开 documents/base.py:34:
# documents/base.py:34-56
class BaseMedia(Serializable):
"""Base class for content used in retrieval and data processing workflows."""
id: str | None = Field(default=None, coerce_numbers_to_str=True)
"""An optional identifier for the document."""
metadata: dict = Field(default_factory=dict)
"""Arbitrary metadata associated with the content."""
两个字段,极其简洁:
id:可选标识符,coerce_numbers_to_str=True允许传入数字自动转字符串。注释明确说将来会变为必填。metadata:任意字典,存放来源、页码、时间戳等信息。
继承自 Serializable(来自 langchain_core.load.serializable),获得 JSON 序列化/反序列化能力。
BaseMedia 是 Document 和 Blob 的公共父类:
Serializable
└── BaseMedia # id + metadata
├── Document # + page_content(文本)
└── Blob # + data/path(原始字节)
Document——文本内容的载体
Document 是你在 LangChain 中最常打交道的类——向量存储里存的是它,检索器返回的是它,TextSplitter 切出来的也是它。
# documents/base.py:288-348
class Document(BaseMedia):
"""Class for storing a piece of text and associated metadata."""
page_content: str
"""String text."""
type: Literal["Document"] = "Document"
核心就一个字段 page_content: str,加上从 BaseMedia 继承的 id 和 metadata。type 是类型判别字段,固定为 "Document"。
自定义 __init__——支持位置参数
# documents/base.py:311-315
def __init__(self, page_content: str, **kwargs: Any) -> None:
"""Pass page_content in as positional or named arg."""
super().__init__(page_content=page_content, **kwargs)
Pydantic v2 的模型默认只接受关键字参数,但 LangChain 通过自定义 __init__ 让你可以写 Document("hello") 而不是 Document(page_content="hello")——这个小改动让 API 更自然。
__str__() 的向后兼容
# documents/base.py:331-347
def __str__(self) -> str:
if self.metadata:
return f"page_content='{self.page_content}' metadata={self.metadata}"
return f"page_content='{self.page_content}'"
注意:__str__() 故意不包含 id 字段。注释里解释了原因——很多用户代码会直接把 Document 对象塞进 prompt 模板(f"{doc}"),如果突然输出多了 id 字段,会破坏已有的 prompt 格式。这是典型的向后兼容性保护。
序列化支持
# documents/base.py:317-329
@classmethod
def is_lc_serializable(cls) -> bool:
return True
@classmethod
def get_lc_namespace(cls) -> list[str]:
return ["langchain", "schema", "document"]
is_lc_serializable 返回 True,意味着 Document 可以通过 dumpd(doc) / load(data) 进行 JSON 序列化/反序列化。命名空间 ["langchain", "schema", "document"] 是历史原因——早期 Document 在 langchain.schema 模块下。
Document 与 Message 的区别
初学者常混淆这两个概念。一句话区分:
| 维度 | Document | Message |
|---|---|---|
| 用途 | 检索/存储/RAG | LLM 对话 |
| 核心字段 | page_content |
content |
| 典型来源 | 文件、数据库、网页 | 用户输入、模型回复 |
| 流向 | 文档 → 向量存储 → 检索器 → prompt | 用户 → LLM → 用户 |
| 父类 | BaseMedia |
BaseMessage |
documents/__init__.py 的模块文档说得很清楚:
Documentis for data retrieval and processing workflows, not chat I/O.
Blob——原始数据的延迟加载抽象
Blob 的设计灵感来自浏览器的 Blob API——它代表一块原始数据(文本或二进制),可以来自内存,也可以来自文件系统。
# documents/base.py:59-134
class Blob(BaseMedia):
"""Raw data abstraction for document loading and file processing."""
data: bytes | str | None = None # 内存数据
mimetype: str | None = None # MIME 类型
encoding: str = "utf-8" # 编码
path: PathLike | None = None # 文件路径
model_config = ConfigDict(
arbitrary_types_allowed=True,
frozen=True, # ← 不可变!
)
注意 frozen=True——Blob 是不可变对象。一旦创建,不能修改任何字段。这保证了在多线程环境下的安全性。
校验器:data 和 path 至少有一个
# documents/base.py:149-156
@model_validator(mode="before")
@classmethod
def check_blob_is_valid(cls, values: dict[str, Any]) -> Any:
"""Verify that either data or path is provided."""
if "data" not in values and "path" not in values:
msg = "Either data or path must be provided"
raise ValueError(msg)
return values
不能创建一个既没有 data 又没有 path 的 Blob——那就是一个空壳了。
三种读取方式
# documents/base.py:158-211
def as_string(self) -> str: # → str
def as_bytes(self) -> bytes: # → bytes
def as_bytes_io(self): # → context manager, yields BytesIO | BufferedReader
每种方法内部都遵循同一优先级:
as_string() 的决策树
┌──────────────┐
│ data is None │
│ and path? │
└──────┬───────┘
yes │ no
▼ │
Path(path).read_text() │
▼
┌────────────────┐
│ data is bytes? │
└───────┬────────┘
yes │ no
▼ │
data.decode(encoding) │
▼
┌────────────────┐
│ data is str? │
└───────┬────────┘
yes │ no
▼ │
return data raise ValueError
as_bytes_io() 特别巧妙——如果 Blob 持有文件路径,它直接 open("rb") 返回文件句柄;如果持有内存数据,它包装成 BytesIO。调用者无需关心数据来源,统一用 with blob.as_bytes_io() as f: 即可。
两个工厂方法
# documents/base.py:213-278
@classmethod
def from_path(cls, path, *, encoding="utf-8", mime_type=None,
guess_type=True, metadata=None) -> Blob:
# 注意:data=None,不立即读取文件内容
return cls(data=None, mimetype=mimetype, encoding=encoding,
path=path, metadata=metadata or {})
@classmethod
def from_data(cls, data, *, encoding="utf-8", mime_type=None,
path=None, metadata=None) -> Blob:
return cls(data=data, mimetype=mime_type, encoding=encoding,
path=path, metadata=metadata or {})
from_path 的关键设计:延迟加载——它不读取文件内容,只保存路径引用。数据在 as_string() / as_bytes() 被调用时才真正读取。
from_path("big_file.pdf") from_data(b"small content")
┌─────────────────────┐ ┌─────────────────────┐
│ data = None │ │ data = b"small..." │
│ path = "big_file" │ │ path = None │
│ 内存占用:≈ 0 │ │ 内存占用:数据大小 │
└─────────────────────┘ └─────────────────────┘
│ │
│ .as_bytes() │ .as_bytes()
▼ ▼
Path(path).read_bytes() 直接返回 data
(此时才读取文件) (已在内存中)
source 属性
# documents/base.py:136-147
@property
def source(self) -> str | None:
if self.metadata and "source" in self.metadata:
return cast("str | None", self.metadata["source"])
return str(self.path) if self.path else None
优先级:metadata["source"] > self.path > None。这允许用户通过 metadata 覆盖默认的来源信息(比如给文件路径加上 URL 前缀)。
BaseDocumentTransformer——文档变换的抽象接口
BaseDocumentTransformer 定义了"输入一批文档,输出一批文档"的抽象。TextSplitter(第 3 部分)就是它的子类。
# documents/transformers.py:16-79
class BaseDocumentTransformer(ABC):
"""Abstract base class for document transformation."""
@abstractmethod
def transform_documents(
self, documents: Sequence[Document], **kwargs: Any
) -> Sequence[Document]:
"""Transform a list of documents."""
async def atransform_documents(
self, documents: Sequence[Document], **kwargs: Any
) -> Sequence[Document]:
return await run_in_executor(
None, self.transform_documents, documents, **kwargs
)
两个要点:
- 只有一个抽象方法
transform_documents(),子类必须实现 - 异步版本
atransform_documents()用run_in_executor提供默认实现——和 Message/Runnable 系列的异步策略一脉相承
注意它不继承 BaseModel——这意味着它不是 Pydantic 模型。子类可以自由选择是否使用 Pydantic。
BaseDocumentCompressor——检索后的压缩/重排
# documents/compressor.py:19-74
class BaseDocumentCompressor(BaseModel, ABC):
"""Base class for document compressors."""
@abstractmethod
def compress_documents(
self,
documents: Sequence[Document],
query: str,
callbacks: Callbacks | None = None,
) -> Sequence[Document]:
"""Compress retrieved documents given the query context."""
async def acompress_documents(
self, documents: Sequence[Document], query: str,
callbacks: Callbacks | None = None,
) -> Sequence[Document]:
return await run_in_executor(
None, self.compress_documents, documents, query, callbacks
)
和 BaseDocumentTransformer 对比:
| 维度 | BaseDocumentTransformer | BaseDocumentCompressor |
|---|---|---|
| 继承 | ABC |
BaseModel, ABC |
| 核心方法 | transform_documents(docs) |
compress_documents(docs, query) |
| 接受 query | 否 | 是 |
| 接受 callbacks | 否 | 是 |
| 典型用途 | 分块、格式转换 | 重排序、过滤、摘要 |
关键区别是 compress_documents 多了 query 参数——因为压缩/重排需要根据查询来判断哪些文档更相关。
不过,官方文档建议直接用 RunnableLambda 替代这个接口:
Users should favor using a
RunnableLambdainstead of sub-classing from this interface.
Document 在 LangChain 生态中的位置
┌─────────────────── 数据源 ───────────────────┐
│ 文件 │ 网页 │ 数据库 │ API │ LangSmith │
└───┬────┴───┬────┴────┬─────┴───┬───┴─────┬─────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────┐
│ BaseLoader / BlobLoader │ ← 第 2 部分
│ lazy_load() → Iterator[Document] │
└──────────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Document │
│ page_content: str + metadata: dict │ ← 本篇
│ id: str | None │
└──────────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ TextSplitter (BaseDocumentTransformer) │ ← 第 3 部分
│ split_documents() → list[Document] │
└──────────────────────┬──────────────────────────┘
│
┌────────────┴────────────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Embeddings │ │ VectorStore │
│ embed_documents │ │ add_documents │
└────────┬─────────┘ │ similarity_ │
│ │ search │
└──────────────►│ │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Retriever │
│ invoke(query) │
│ → list[Document]│
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Prompt → LLM │
│ (RAG pipeline) │
└──────────────────┘
Document 是 RAG 管线中从头到尾流通的"货币"——加载器产出它,分块器拆分它,向量存储索引它,检索器返回它。
小结
- BaseMedia 提供
id+metadata,是 Document 和 Blob 的公共基类 - Document 只加了一个
page_content: str——简单到极致,但它是整个 RAG 管线的核心数据结构 - Blob 是原始文件数据的抽象,支持延迟加载(
from_path不读文件),frozen 不可变 - BaseDocumentTransformer 定义"文档进 → 文档出"的变换接口,TextSplitter 继承它
- BaseDocumentCompressor 多了
query参数,用于检索后的重排序/过滤
数据模型搞清楚了,那 Document 是怎么从原始数据源生产出来的?下一部分看 DocumentLoader。
LangChain Documents 深度解析(二):DocumentLoader 与 BlobParser——数据的入口
一个能跑的例子
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from typing import Iterator
# ---- 自定义 Loader:从字典列表生成 Document ----
class DictLoader(BaseLoader):
def __init__(self, data: list[dict]):
self.data = data
def lazy_load(self) -> Iterator[Document]:
for item in self.data:
yield Document(
page_content=item["text"],
metadata={"source": item.get("source", "unknown")},
)
# 使用
loader = DictLoader([
{"text": "LangChain 是框架", "source": "doc1"},
{"text": "Document 是核心", "source": "doc2"},
{"text": "Loader 负责加载", "source": "doc3"},
])
# 方式 1:lazy_load — 生成器,逐条产出
for doc in loader.lazy_load():
print(f"[{doc.metadata['source']}] {doc.page_content}")
# 方式 2:load — 一次性全部加载到列表
all_docs = loader.load()
print(f"共加载 {len(all_docs)} 个文档")
自定义 Loader 只需继承 BaseLoader 并实现 lazy_load() 生成器——就这一个方法。
BaseLoader——文档加载的核心接口
# document_loaders/base.py:26-33
class BaseLoader(ABC):
"""Interface for document loader.
Implementations should implement the lazy-loading method using generators
to avoid loading all documents into memory at once.
`load` is provided just for user convenience and should not be overridden.
"""
lazy_load() 和 load() 的关系
# document_loaders/base.py:91-100
def lazy_load(self) -> Iterator[Document]:
if type(self).load != BaseLoader.load:
return iter(self.load()) # ← 向后兼容 fallback
msg = f"{self.__class__.__name__} does not implement lazy_load()"
raise NotImplementedError(msg)
# document_loaders/base.py:37-43
def load(self) -> list[Document]:
return list(self.lazy_load()) # ← 便捷方法
这里有一个精巧的向后兼容设计:
推荐路径(新代码):
┌────────────────────┐ ┌──────────────────┐
│ 子类实现 lazy_load │ ───► │ load() 调用 │
│ (生成器) │ │ list(lazy_load()) │
└────────────────────┘ └──────────────────┘
兼容路径(旧代码):
┌────────────────────┐ ┌──────────────────────────┐
│ 子类只实现了 load │ ◄─── │ lazy_load() 检测到 │
│ (返回列表) │ │ load 被覆盖,调用 │
└────────────────────┘ │ iter(self.load()) │
└──────────────────────────┘
lazy_load 内部的 type(self).load != BaseLoader.load 检查:如果子类覆盖了 load() 但没覆盖 lazy_load(),就回退到调用子类的 load() 并包装成迭代器。这保证了旧的 Loader 实现不会报错。
异步版本 alazy_load()
# document_loaders/base.py:102-114
async def alazy_load(self) -> AsyncIterator[Document]:
iterator = await run_in_executor(None, self.lazy_load)
done = object()
while True:
doc = await run_in_executor(None, next, iterator, done)
if doc is done:
break
yield doc
这段代码值得仔细看——它把同步生成器转成了异步生成器:
- 在线程池中调用
self.lazy_load()获取迭代器 - 循环在线程池中调用
next(iterator, done)逐个取值 - 用
done哨兵对象检测迭代结束
这意味着子类只需要实现同步的 lazy_load(),异步版本自动可用。当然,如果子类有真正的异步 I/O,应该覆盖 alazy_load()。
load_and_split()——已废弃
# document_loaders/base.py:53-87
def load_and_split(self, text_splitter: TextSplitter | None = None) -> list[Document]:
"""Load Document and split into chunks."""
if text_splitter is None:
if not _HAS_TEXT_SPLITTERS:
raise ImportError(msg)
text_splitter_ = RecursiveCharacterTextSplitter()
else:
text_splitter_ = text_splitter
docs = self.load()
return text_splitter_.split_documents(docs)
注释写得很明确:Do not override this method. It should be considered to be deprecated!
这个方法把加载和分块耦合在一起了——更好的做法是分开调用:
# 推荐写法
docs = loader.load()
chunks = text_splitter.split_documents(docs)
# 不推荐
chunks = loader.load_and_split(text_splitter)
BaseBlobParser——从 Blob 到 Document
# document_loaders/base.py:117-155
class BaseBlobParser(ABC):
"""Abstract interface for blob parsers.
A blob parser provides a way to parse raw data stored in a blob
into one or more Document objects.
"""
@abstractmethod
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""Lazy parsing interface."""
def parse(self, blob: Blob) -> list[Document]:
"""Eagerly parse the blob into Document objects."""
return list(self.lazy_parse(blob))
和 BaseLoader 的模式如出一辙——lazy_parse() 是抽象方法,parse() 是便捷包装。
BaseBlobParser 的作用是把解析逻辑从加载逻辑中解耦出来:
BaseLoader BaseBlobParser
┌──────────────────┐ ┌──────────────────┐
│ 知道从哪里读数据 │ │ 知道怎么解析数据 │
│ lazy_load() │ │ lazy_parse(blob) │
│ → Iterator[Doc] │ │ → Iterator[Doc] │
└──────────────────┘ └──────────────────┘
│ │
│ 直接产出 Document │ 需要先拿到 Blob
▼ ▼
一步到位 BlobLoader → Blob → Parser → Document
为什么要多这一层?因为同一种文件格式(比如 PDF)可能来自不同的地方(本地文件、S3、HTTP),但解析逻辑是一样的。分离后,BlobLoader 负责"从哪里读",BaseBlobParser 负责"怎么解析"。
BlobLoader——原始数据的来源
# document_loaders/blob_loaders.py:19-34
class BlobLoader(ABC):
"""Abstract interface for blob loaders implementation.
Implementer should be able to load raw content from a storage system
according to some criteria and return the raw content lazily as
a stream of blobs.
"""
@abstractmethod
def yield_blobs(self) -> Iterator[Blob]:
"""A lazy loader for raw data represented by LangChain's Blob object."""
纯粹的生成器接口——只负责扫描数据源、逐个 yield Blob 对象。典型实现会扫描文件系统目录,为每个匹配的文件创建一个 Blob.from_path()。
三层加载架构
┌──────────────── 三层架构 ────────────────┐
│ │
│ ┌───────────┐ │
│ │ BlobLoader│ yield_blobs() │
│ │ │ → Iterator[Blob] │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ Blob │ 原始数据封装 │
│ │ │ data / path / mimetype │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ BaseBlobParser│ lazy_parse(blob) │
│ │ │ → Iterator[Document] │
│ └───────────────┘ │
│ │
└──────────────────────────────────────────┘
vs. 简化架构:
┌──────────────── 直接加载 ────────────────┐
│ │
│ ┌───────────┐ │
│ │ BaseLoader│ lazy_load() │
│ │ │ → Iterator[Document] │
│ └───────────┘ │
│ │
└──────────────────────────────────────────┘
大多数第三方 Loader(如 PyPDFLoader、CSVLoader)直接继承 BaseLoader,不走三层架构。三层架构适合需要复用解析逻辑的场景。
LangSmithLoader——唯一的内置具体实现
langchain-core 的 document_loaders 模块中只有一个具体的 Loader 实现:
# document_loaders/langsmith.py:17-38
class LangSmithLoader(BaseLoader):
"""Load LangSmith Dataset examples as Document objects."""
它从 LangSmith 数据集加载数据为 Document。
content_key 支持嵌套访问
# document_loaders/langsmith.py:99
self.content_key = list(content_key.split(".")) if content_key else []
设置 content_key="input.text" 会按 . 分割为 ["input", "text"],然后在 lazy_load 中逐层访问:
# document_loaders/langsmith.py:126-129
content: Any = example.inputs
for key in self.content_key:
content = content[key] # ← 逐层深入
content_str = self.format_content(content)
metadata 转换
# document_loaders/langsmith.py:130-134
metadata = pydantic_to_dict(example)
for k in ("dataset_id", "created_at", "modified_at", "source_run_id", "id"):
metadata[k] = str(metadata[k]) if metadata[k] else metadata[k]
yield Document(content_str, metadata=metadata)
把 Example 的所有字段都放进 metadata,然后把 UUID/datetime 类型转成字符串——因为 metadata 通常需要 JSON 可序列化。
Lazy Loading 设计模式
整个 Document Loader 体系围绕生成器(lazy loading)设计。对比两种模式:
| 维度 | Eager Loading | Lazy Loading |
|---|---|---|
| 实现 | load() → list[Document] |
lazy_load() → Iterator[Document] |
| 内存 | 所有文档同时在内存 | 逐条产出,内存占用恒定 |
| 首个结果 | 全部加载完才能拿到 | 立即拿到第一个 |
| 适合场景 | 小数据集、需要随机访问 | 大数据集、流式处理 |
| 可组合性 | 需要完整列表 | 可以管道式处理 |
LangChain 的建议很明确:子类应该实现 lazy_load()。load() 只是便捷包装。
document_loaders/__init__.py 的导出
# document_loaders/__init__.py:12-19
__all__ = (
"BaseBlobParser",
"BaseLoader",
"Blob",
"BlobLoader",
"LangSmithLoader",
"PathLike",
)
注意 Blob 和 PathLike 是从 blob_loaders.py 重新导出的(blob_loaders.py:13 从 documents.base 导入了 Blob 和 PathLike),提供向后兼容的导入路径。
小结
- BaseLoader 是所有 Loader 的基类,核心方法是
lazy_load()生成器 load()只是list(lazy_load())的便捷包装,不应该被子类覆盖- 向后兼容:
lazy_load()会检测子类是否只实现了load(),自动 fallback - BaseBlobParser 负责"怎么解析",BlobLoader 负责"从哪里读"——三层架构解耦加载和解析
- LangSmithLoader 是
langchain-core中唯一的具体 Loader 实现 - 生成器模式(lazy loading)是整个体系的设计核心
文档加载出来了,但一个文档可能很长——需要拆分成合适大小的块才能送入向量存储。下一部分看 TextSplitter。
LangChain Documents 深度解析(三):TextSplitter——文本分块的艺术
一个能跑的例子
from langchain_text_splitters import RecursiveCharacterTextSplitter
text = """# LangChain 简介
LangChain 是一个用于构建 LLM 应用的框架。它提供了丰富的工具和抽象。
## 核心概念
Document 是数据的基本单位。每个 Document 包含文本内容和元数据。
TextSplitter 用于将长文本拆分为小块。这对于 RAG 管线至关重要,
因为向量存储通常对文本长度有限制。
## 设计哲学
简单、可组合、可扩展。LangChain 的每个组件都是 Runnable,
可以通过管道操作符连接。"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=100,
chunk_overlap=20,
separators=["\n\n", "\n", " ", ""],
)
chunks = splitter.split_text(text)
for i, chunk in enumerate(chunks):
print(f"--- Chunk {i} ({len(chunk)} chars) ---")
print(chunk)
print()
RecursiveCharacterTextSplitter 会先尝试按 \n\n 分割,如果某块还是太长,就用 \n 分割,再不行用空格,最后逐字符。这是最常用的分块器。
TextSplitter 基类——模板方法模式
# text-splitters/base.py:44-45
class TextSplitter(BaseDocumentTransformer, ABC):
"""Interface for splitting text into chunks."""
继承自 BaseDocumentTransformer(第 1 部分讲过),所以 TextSplitter 也是"文档进 → 文档出"的变换器。
构造参数
# text-splitters/base.py:47-90
def __init__(
self,
chunk_size: int = 4000, # 最大块大小
chunk_overlap: int = 200, # 块间重叠
length_function: Callable[[str], int] = len, # 长度计算函数
keep_separator: bool | Literal["start", "end"] = False, # 保留分隔符
add_start_index: bool = False, # 在 metadata 中记录起始位置
strip_whitespace: bool = True, # 去除首尾空白
) -> None:
每个参数都有校验:
# text-splitters/base.py:73-84
if chunk_size <= 0:
raise ValueError(f"chunk_size must be > 0, got {chunk_size}")
if chunk_overlap < 0:
raise ValueError(f"chunk_overlap must be >= 0, got {chunk_overlap}")
if chunk_overlap > chunk_size:
raise ValueError(f"Got a larger chunk overlap ({chunk_overlap}) "
f"than chunk size ({chunk_size})")
模板方法模式
TextSplitter 的方法调用链非常清晰:
split_text(text) ← 抽象方法,子类必须实现
↑
create_documents(texts) ← 对每个 text 调用 split_text,创建 Document
↑
split_documents(documents) ← 提取 page_content,调用 create_documents
↑
transform_documents(documents) ← BaseDocumentTransformer 接口,调用 split_documents
这是经典的模板方法模式——子类只需要实现 split_text(),上层的 create_documents → split_documents → transform_documents 全部自动工作。
create_documents()——分块 + 组装
# text-splitters/base.py:103-129
def create_documents(
self, texts: list[str], metadatas: list[dict] | None = None
) -> list[Document]:
metadatas_ = metadatas or [{}] * len(texts)
documents = []
for i, text in enumerate(texts):
index = 0
previous_chunk_len = 0
for chunk in self.split_text(text):
metadata = copy.deepcopy(metadatas_[i])
if self._add_start_index:
offset = index + previous_chunk_len - self._chunk_overlap
index = text.find(chunk, max(0, offset))
metadata["start_index"] = index
previous_chunk_len = len(chunk)
new_doc = Document(page_content=chunk, metadata=metadata)
documents.append(new_doc)
return documents
注意 add_start_index 的实现——它用 text.find(chunk, offset) 从上次结束位置附近搜索,避免从头扫描。offset = index + previous_chunk_len - chunk_overlap 考虑了重叠区域。
_merge_splits()——核心合并算法
这是 TextSplitter 最复杂的方法,负责把切分出的小片段合并成合适大小的块:
# text-splitters/base.py:152-194
def _merge_splits(self, splits: Iterable[str], separator: str) -> list[str]:
separator_len = self._length_function(separator)
docs = []
current_doc: list[str] = []
total = 0
for d in splits:
len_ = self._length_function(d)
if (
total + len_ + (separator_len if len(current_doc) > 0 else 0)
> self._chunk_size
):
if len(current_doc) > 0:
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# 弹出旧片段直到满足重叠约束
while total > self._chunk_overlap or (
total + len_ + (separator_len if len(current_doc) > 0 else 0)
> self._chunk_size
and total > 0
):
total -= self._length_function(current_doc[0]) + (
separator_len if len(current_doc) > 1 else 0
)
current_doc = current_doc[1:]
current_doc.append(d)
total += len_ + (separator_len if len(current_doc) > 1 else 0)
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
return docs
用 ASCII 图解释合并过程(chunk_size=10, chunk_overlap=3):
输入 splits: ["AA", "BB", "CC", "DD", "EE", "FF"]
separator: " "
步骤 1: current_doc=["AA"], total=2
步骤 2: current_doc=["AA","BB"], total=5 (2+1+2)
步骤 3: current_doc=["AA","BB","CC"], total=8 (5+1+2)
步骤 4: "DD" 加入后 total=8+1+2=11 > 10 → 超限!
→ 输出 chunk: "AA BB CC"
→ 弹出 "AA": total=8-2-1=5, current_doc=["BB","CC"]
→ 弹出 "BB": total=5-2-1=2, current_doc=["CC"]
→ total=2 ≤ 3(overlap), 停止弹出
→ current_doc=["CC","DD"], total=2+1+2=5
结果: ["AA BB CC", "CC DD EE", "EE FF"]
^^ ^^
重叠区域 重叠区域
工厂方法
# text-splitters/base.py:225-281
@classmethod
def from_tiktoken_encoder(cls, encoding_name="gpt2", model_name=None,
allowed_special=set(), disallowed_special="all",
**kwargs) -> Self:
"""Text splitter that uses tiktoken encoder to count length."""
# 用 tiktoken 的 token 数替代字符数作为 length_function
def _tiktoken_encoder(text: str) -> int:
return len(enc.encode(text, ...))
return cls(length_function=_tiktoken_encoder, **kwargs)
# text-splitters/base.py:196-223
@classmethod
def from_huggingface_tokenizer(cls, tokenizer, **kwargs) -> TextSplitter:
"""Text splitter that uses HuggingFace tokenizer to count length."""
def _huggingface_tokenizer_length(text: str) -> int:
return len(tokenizer.tokenize(text))
return cls(length_function=_huggingface_tokenizer_length, **kwargs)
关键洞察:这两个工厂方法只是替换了 length_function。切分逻辑不变,但长度计算从字符数变成了 token 数——这对于控制 LLM 输入长度非常重要。
CharacterTextSplitter——单分隔符切分
# character.py:11-58
class CharacterTextSplitter(TextSplitter):
def __init__(self, separator="\n\n", is_separator_regex=False, **kwargs):
super().__init__(**kwargs)
self._separator = separator
self._is_separator_regex = is_separator_regex
def split_text(self, text: str) -> list[str]:
sep_pattern = (
self._separator if self._is_separator_regex
else re.escape(self._separator)
)
splits = _split_text_with_regex(
text, sep_pattern, keep_separator=self._keep_separator
)
merge_sep = ""
if not (self._keep_separator or is_lookaround):
merge_sep = self._separator
return self._merge_splits(splits, merge_sep)
最简单的分块器——用单个分隔符切分,然后合并。
_split_text_with_regex() 的三种模式
# character.py:61-85
def _split_text_with_regex(text, separator, *, keep_separator):
if separator:
if keep_separator:
splits_ = re.split(f"({separator})", text) # ← 捕获组保留分隔符
if keep_separator == "end":
# ["text1", "\n", "text2"] → ["text1\n", "text2"]
splits = [splits_[i] + splits_[i+1] for i in range(0, len(splits_)-1, 2)]
else: # "start" 或 True
# ["text1", "\n", "text2"] → ["text1", "\ntext2"]
splits = [splits_[i] + splits_[i+1] for i in range(1, len(splits_), 2)]
else:
splits = re.split(separator, text) # ← 丢弃分隔符
else:
splits = list(text) # ← 逐字符
return [s for s in splits if s]
keep_separator 的三种取值:
False:丢弃分隔符"start"或True:分隔符附加到下一段的开头"end":分隔符附加到上一段的结尾
RecursiveCharacterTextSplitter——最核心的分块器
# character.py:88-158
class RecursiveCharacterTextSplitter(TextSplitter):
def __init__(self, separators=None, keep_separator=True,
is_separator_regex=False, **kwargs):
super().__init__(keep_separator=keep_separator, **kwargs)
self._separators = separators or ["\n\n", "\n", " ", ""]
self._is_separator_regex = is_separator_regex
默认分隔符 ["\n\n", "\n", " ", ""]——从粗到细,段落 → 行 → 词 → 字符。
_split_text() 递归算法
# character.py:107-147
def _split_text(self, text: str, separators: list[str]) -> list[str]:
final_chunks = []
# 1. 找到第一个在文本中存在的分隔符
separator = separators[-1]
new_separators = []
for i, s_ in enumerate(separators):
if not s_:
separator = s_
break
if re.search(separator_, text):
separator = s_
new_separators = separators[i + 1:] # ← 剩余的更细分隔符
break
# 2. 用该分隔符切分
splits = _split_text_with_regex(text, separator_, keep_separator=...)
# 3. 对每个切分结果:
good_splits = []
for s in splits:
if self._length_function(s) < self._chunk_size:
good_splits.append(s) # ← 足够小,收集起来
else:
if good_splits:
merged = self._merge_splits(good_splits, separator_)
final_chunks.extend(merged)
good_splits = []
if not new_separators:
final_chunks.append(s) # ← 没有更细的分隔符了
else:
other = self._split_text(s, new_separators) # ← 递归!
final_chunks.extend(other)
if good_splits:
merged = self._merge_splits(good_splits, separator_)
final_chunks.extend(merged)
return final_chunks
用递归树来可视化(chunk_size=50):
输入:一段 200 字的文本
┌────────────────────────────────────────────┐
│ _split_text(text, ["\n\n", "\n", " ", ""]) │
└────────────────────┬───────────────────────┘
│ 用 "\n\n" 切分
┌──────────┼──────────┐
▼ ▼ ▼
段落A(30) 段落B(120) 段落C(40)
✓ < 50 ✗ > 50 ✓ < 50
│ │ │
│ ▼ │
│ 递归: _split_text(段落B, ["\n", " ", ""])
│ │
│ 用 "\n" 切分
│ ┌──────┼──────┐
│ ▼ ▼ ▼
│ 行1(45) 行2(35) 行3(40)
│ ✓ ✓ ✓
│ │ │ │
▼ ▼ ▼ ▼
合并后输出:["段落A", "行1", "行2 行3", "段落C"]
核心思想:先用粗粒度分隔符切分,如果某块还是太大,就用更细粒度的分隔符递归切分。这保证了切分点尽量落在语义边界上。
from_language() 工厂方法
# character.py:160-176
@classmethod
def from_language(cls, language: Language, **kwargs):
separators = cls.get_separators_for_language(language)
return cls(separators=separators, is_separator_regex=True, **kwargs)
Language 枚举支持 27 种编程语言(base.py:372-403)。每种语言有定制的分隔符列表:
| 语言 | 分隔符示例(前几个) |
|---|---|
| Python | "\nclass ", "\ndef ", "\n\tdef ", "\n\n" |
| JavaScript | "\nfunction ", "\nconst ", "\nlet ", "\nclass " |
| Markdown | "\n#{1,6} ", "```\n", "\n\n" |
| Go | "\nfunc ", "\nvar ", "\nconst ", "\ntype " |
| Rust | "\nfn ", "\nconst ", "\nlet ", "\nif " |
| Java | "\nclass ", "\npublic ", "\nprotected ", "\nprivate " |
Python 的分隔符(character.py:363-374):
if language == Language.PYTHON:
return [
"\nclass ", # 先按类定义切分
"\ndef ", # 再按顶层函数切分
"\n\tdef ", # 再按缩进方法切分
"\n\n", # 再按空行
"\n", # 再按行
" ", # 再按空格
"", # 最后逐字符
]
TokenTextSplitter——基于 token 的切分
# base.py:298-369
class TokenTextSplitter(TextSplitter):
def __init__(self, encoding_name="gpt2", model_name=None, ...):
super().__init__(**kwargs)
if model_name is not None:
enc = tiktoken.encoding_for_model(model_name)
else:
enc = tiktoken.get_encoding(encoding_name)
self._tokenizer = enc
def split_text(self, text: str) -> list[str]:
tokenizer = Tokenizer(
chunk_overlap=self._chunk_overlap,
tokens_per_chunk=self._chunk_size,
decode=self._tokenizer.decode,
encode=_encode,
)
return split_text_on_tokens(text=text, tokenizer=tokenizer)
核心逻辑在 split_text_on_tokens()(base.py:422-450):
# base.py:422-450
def split_text_on_tokens(*, text: str, tokenizer: Tokenizer) -> list[str]:
splits = []
input_ids = tokenizer.encode(text) # 全部编码为 token ID
start_idx = 0
while start_idx < len(input_ids):
cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
chunk_ids = input_ids[start_idx:cur_idx]
decoded = tokenizer.decode(chunk_ids) # 解码回文本
if decoded:
splits.append(decoded)
if cur_idx == len(input_ids):
break
start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
return splits
和 RecursiveCharacterTextSplitter.from_tiktoken_encoder() 的区别:
| 维度 | TokenTextSplitter |
from_tiktoken_encoder() |
|---|---|---|
| 切分单位 | token | 字符(但用 token 计数) |
| 切分方式 | 直接按 token 边界切 | 按分隔符切,合并时用 token 计数 |
| 精度 | token 级精确 | 可能偏差几个 token |
| 语义保留 | 可能在词中间切断 | 尽量在分隔符处切分 |
Header-Based Splitters——按标题切分
这两个 Splitter 不继承 TextSplitter,因为它们返回的是 Document(带 metadata)而不是 str。
MarkdownHeaderTextSplitter
# markdown.py:23-280
class MarkdownHeaderTextSplitter:
def __init__(self, headers_to_split_on: list[tuple[str, str]],
return_each_line=False, strip_headers=True):
self.headers_to_split_on = sorted(
headers_to_split_on, key=lambda split: len(split[0]), reverse=True
)
from langchain_text_splitters import MarkdownHeaderTextSplitter
md_text = """# 第一章
第一章的内容。
## 第一节
第一节的内容。
## 第二节
第二节的内容。
# 第二章
第二章的内容。"""
splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[("#", "H1"), ("##", "H2")]
)
docs = splitter.split_text(md_text)
for doc in docs:
print(f"metadata={doc.metadata}, content={doc.page_content[:30]}")
输出:
metadata={'H1': '第一章'}, content=第一章的内容。
metadata={'H1': '第一章', 'H2': '第一节'}, content=第一节的内容。
metadata={'H1': '第一章', 'H2': '第二节'}, content=第二节的内容。
metadata={'H1': '第二章'}, content=第二章的内容。
关键特性:metadata 中自动记录标题层级。当遇到新标题时,弹出栈中同级或更低级的标题(markdown.py:204-214),保证 metadata 反映当前的层级上下文。
HTMLHeaderTextSplitter
# html.py:83-345
class HTMLHeaderTextSplitter:
def __init__(self, headers_to_split_on, return_each_element=False):
self.headers_to_split_on = sorted(
headers_to_split_on, key=lambda x: int(x[0][1:])
)
逻辑类似 MarkdownHeaderTextSplitter,但基于 BeautifulSoup 解析 HTML DOM 树。它使用 DFS 遍历(html.py:276),对每个节点判断是否是目标标题标签。
RecursiveJsonSplitter——保持 JSON 结构
# json.py:12-191
class RecursiveJsonSplitter:
"""Splits JSON data into smaller, structured chunks
while preserving hierarchy."""
def __init__(self, max_chunk_size=2000, min_chunk_size=None):
self.max_chunk_size = max_chunk_size
self.min_chunk_size = (
min_chunk_size if min_chunk_size is not None
else max(max_chunk_size - 200, 50)
)
不继承 TextSplitter——它操作的是 dict 而非 str。
from langchain_text_splitters import RecursiveJsonSplitter
data = {
"name": "LangChain",
"version": "0.3",
"components": {
"core": "基础抽象",
"loaders": "数据加载",
"splitters": "文本分块",
"stores": "向量存储"
},
"features": ["RAG", "Agent", "Streaming"]
}
splitter = RecursiveJsonSplitter(max_chunk_size=100)
chunks = splitter.split_json(data)
for i, chunk in enumerate(chunks):
print(f"Chunk {i}: {chunk}")
核心算法 _json_split()(json.py:85-114)递归遍历 JSON 树,当当前块超过 max_chunk_size 且已达到 min_chunk_size 时,开启新块。_set_nested_dict() 保证了切分后每个块仍然保持原有的嵌套结构。
14 个内置 Splitter 对比表
| Splitter | 继承 | 切分方式 | 返回类型 | 典型场景 |
|---|---|---|---|---|
CharacterTextSplitter |
TextSplitter |
单分隔符 | list[str] |
简单文本 |
RecursiveCharacterTextSplitter |
TextSplitter |
多分隔符递归 | list[str] |
通用首选 |
TokenTextSplitter |
TextSplitter |
tiktoken token | list[str] |
精确控制 token 数 |
MarkdownTextSplitter |
Recursive... |
Markdown 分隔符 | list[str] |
Markdown 文本 |
MarkdownHeaderTextSplitter |
无 | Markdown 标题 | list[Document] |
按标题分章 |
ExperimentalMarkdownSyntaxTextSplitter |
无 | Markdown 语法 | list[Document] |
保留原始格式 |
HTMLHeaderTextSplitter |
无 | HTML 标题标签 | list[Document] |
HTML 按标题分段 |
HTMLSectionSplitter |
无 | HTML 标题 + XSLT | list[Document] |
HTML 段落 |
HTMLSemanticPreservingSplitter |
BaseDoc... |
HTML 语义 | list[Document] |
保留 HTML 结构 |
RecursiveJsonSplitter |
无 | JSON 结构递归 | list[dict] |
JSON 数据 |
LatexTextSplitter |
Recursive... |
LaTeX 分隔符 | list[str] |
LaTeX 文档 |
PythonCodeTextSplitter |
Recursive... |
Python 分隔符 | list[str] |
Python 代码 |
NLTKTextSplitter |
TextSplitter |
NLTK 句子检测 | list[str] |
按句子切分 |
SpacyTextSplitter |
TextSplitter |
spaCy 句子检测 | list[str] |
按句子切分 |
选择建议:
- 通用文本:
RecursiveCharacterTextSplitter(默认选择) - 代码:
RecursiveCharacterTextSplitter.from_language(Language.PYTHON) - Markdown:
MarkdownHeaderTextSplitter(需要标题元数据时)或MarkdownTextSplitter - HTML:
HTMLHeaderTextSplitter或HTMLSemanticPreservingSplitter - JSON:
RecursiveJsonSplitter - 精确 token 控制:
TokenTextSplitter
小结
- TextSplitter 是分块器基类,用模板方法模式:子类实现
split_text(),上层自动工作 _merge_splits()是核心合并算法,处理 chunk_size 和 chunk_overlapRecursiveCharacterTextSplitter是最常用的——多分隔符从粗到细递归切分from_language()支持 27 种编程语言的定制分隔符- Header-Based Splitters 不继承 TextSplitter,按标题切分并在 metadata 中记录层级
RecursiveJsonSplitter保持 JSON 结构的递归切分
八个系列总览
| 系列 | 核心模块 | 关键类 | 一句话 |
|---|---|---|---|
| Messages | messages/ |
BaseMessage, AIMessage, HumanMessage |
LLM 对话的数据结构 |
| Runnables | runnables/ |
Runnable, RunnableSequence, RunnableLambda |
LCEL 管道的基础抽象 |
| ChatModel | language_models/ |
BaseChatModel, generate, stream |
LLM 调用的统一接口 |
| Outputs | outputs/, output_parsers/ |
ChatResult, BaseOutputParser |
模型输出的结构化处理 |
| Prompts | prompts/ |
ChatPromptTemplate, MessagesPlaceholder |
提示词模板与变量注入 |
| Tools | tools/ |
BaseTool, @tool, StructuredTool |
函数调用与工具系统 |
| Tracers | tracers/ |
BaseTracer, LangChainTracer |
可观测性与调试追踪 |
| Documents | documents/, document_loaders/, text-splitters/ |
Document, BaseLoader, TextSplitter |
RAG 管线的数据基础 |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)