【LangChain 源码解析九:Indexing】
本系列共 2 部分,从记录管理到索引 API,完整拆解 LangChain Indexing 模块的去重与清理机制。
- 第 1 部分:RecordManager——索引的记忆(本文)
- 第 2 部分:index() API——去重、更新与清理的完整策略
LangChain Indexing 深度解析(一):RecordManager——索引的记忆
一个能跑的例子
from langchain_core.indexing import InMemoryRecordManager
import time
# ---- 创建记录管理器 ----
rm = InMemoryRecordManager(namespace="my_index")
rm.create_schema()
# ---- 记录文档 ----
rm.update(["doc-hash-1", "doc-hash-2"], group_ids=["source-a", "source-a"])
print(f"记录数: {len(rm.records)}") # 记录数: 2
# ---- 检查存在性 ----
print(rm.exists(["doc-hash-1", "doc-hash-3"])) # [True, False]
# ---- 按条件列出 ----
time.sleep(0.01)
rm.update(["doc-hash-3"], group_ids=["source-b"])
now = time.time()
old_keys = rm.list_keys(before=now, group_ids=["source-a"])
print(f"source-a 的旧文档: {old_keys}") # ['doc-hash-1', 'doc-hash-2']
# ---- 删除 ----
rm.delete_keys(["doc-hash-1"])
print(rm.exists(["doc-hash-1"])) # [False]
当你往 VectorStore 里反复索引文档时,如何避免重复?如何删除已过时的文档?RecordManager 就是解决这些问题的元数据层——它记住"哪些文档已经索引了"、“什么时候索引的”、“来自哪个源”。
为什么需要 Indexing 模块?
直接用 VectorStore.add_documents() 有三个问题:
问题 1:重复写入
──────────────
同一份文档反复索引 → VectorStore 中出现多份副本
→ 检索时返回重复结果
问题 2:无法检测变更
──────────────────
文档内容更新后再次索引 → 旧版本和新版本同时存在
→ 检索到过时内容
问题 3:无法清理
──────────────
源数据中删除了某些文档 → VectorStore 中对应文档仍然存在
→ 检索到"幽灵"文档
Indexing 模块通过内容哈希 + 时间戳记录解决这三个问题:
┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Documents │ │ RecordManager │ │ VectorStore │
│ (数据源) │ ──→ │ (谁被索引了?) │ ──→ │ (向量存储) │
└──────────────┘ │ key = hash(doc) │ └──────────────┘
│ updated_at │
│ group_id │
└─────────────────┘
RecordManager ABC——12 个抽象方法
打开 indexing/base.py:22:
# indexing/base.py:22-55
class RecordManager(ABC):
"""Abstract base class representing the interface for a record manager.
The record manager keeps track of which documents have been
written into a VectorStore and when they were written.
The indexing API computes hashes for each document and stores the hash
together with the write time and the source id in the record manager.
"""
def __init__(self, namespace: str) -> None:
self.namespace = namespace
namespace 把不同的索引空间隔离开——同一个 RecordManager 后端可以管理多个独立的索引。
方法总览
RecordManager 定义了 6 对同步/异步方法(12 个抽象方法):
| 方法 | 签名 | 说明 |
|---|---|---|
create_schema / acreate_schema |
() → None |
创建存储 schema(如数据库建表) |
get_time / aget_time |
() → float |
获取服务器时间(必须单调递增) |
update / aupdate |
(keys, group_ids?, time_at_least?) → None |
更新/插入记录 |
exists / aexists |
(keys) → list[bool] |
批量检查 key 是否存在 |
list_keys / alist_keys |
(before?, after?, group_ids?, limit?) → list[str] |
按条件筛选 key |
delete_keys / adelete_keys |
(keys) → None |
删除记录 |
每条记录存什么?
RecordManager 中每条记录包含:
┌──────────────────────────────────────────┐
│ key │ 文档的内容哈希(唯一 ID) │
│ group_id │ 来源标识(source_id) │
│ updated_at │ 最后更新时间戳 │
└──────────────────────────────────────────┘
get_time——为什么必须是服务器时间?
# indexing/base.py:76-85
@abstractmethod
def get_time(self) -> float:
"""Get the current server time as a high resolution timestamp!
It's important to get this from the server to ensure a monotonic clock,
otherwise there may be data loss when cleaning up old documents!
"""
这是一个关键的设计约束。cleanup 模式依赖 before 时间戳来识别"旧文档"——如果客户端时钟不一致(时钟漂移),可能错误地删除新文档。因此:
- 生产实现(如 SQLRecordManager)从数据库服务器获取时间
- InMemoryRecordManager 用
time.time()因为只在单进程内使用
update——time_at_least 的防漂移机制
# indexing/base.py:98-124
@abstractmethod
def update(
self,
keys: Sequence[str],
*,
group_ids: Sequence[str | None] | None = None,
time_at_least: float | None = None,
) -> None:
"""Upsert records into the database.
Args:
time_at_least: Optional timestamp. Implementation can use this
to optionally verify that the timestamp IS at least this time
in the system that stores the data.
e.g., use to validate that the time in the postgres database
is equal to or larger than the given timestamp, if not raise an error.
This is meant to help prevent time-drift issues since
time may not be monotonically increasing!
"""
time_at_least 是一个安全阀——调用方说"服务器时间应该不小于这个值",如果实际小于,说明发生了时钟回拨,应该报错而不是默默产生错误数据。
list_keys——cleanup 的核心查询
# indexing/base.py:176-195
@abstractmethod
def list_keys(
self,
*,
before: float | None = None, # 更新时间 < before
after: float | None = None, # 更新时间 > after
group_ids: Sequence[str] | None = None, # 按来源过滤
limit: int | None = None, # 分页限制
) -> list[str]:
这个方法是 cleanup 策略的基础——“列出在某个时间点之前、属于某些来源的所有 key”。
InMemoryRecordManager——字典实现
打开 indexing/base.py:235:
# indexing/base.py:235-252
class _Record(TypedDict):
group_id: str | None
updated_at: float
class InMemoryRecordManager(RecordManager):
"""An in-memory record manager for testing purposes."""
def __init__(self, namespace: str) -> None:
super().__init__(namespace)
self.records: dict[str, _Record] = {}
每条记录就是一个 {group_id, updated_at} 字典。
update 实现
# indexing/base.py:269-304
def update(self, keys, *, group_ids=None, time_at_least=None):
if group_ids and len(keys) != len(group_ids):
raise ValueError("Length of keys must match length of group_ids")
for index, key in enumerate(keys):
group_id = group_ids[index] if group_ids else None
if time_at_least and time_at_least > self.get_time():
raise ValueError("time_at_least must be in the past")
self.records[key] = {
"group_id": group_id,
"updated_at": self.get_time()
}
注意:每个 key 的 updated_at 都用 self.get_time() 获取,确保时间戳反映实际写入时间。
list_keys 实现——多条件过滤
# indexing/base.py:352-386
def list_keys(self, *, before=None, after=None, group_ids=None, limit=None):
result = []
for key, data in self.records.items():
if before and data["updated_at"] >= before:
continue
if after and data["updated_at"] <= after:
continue
if group_ids and data["group_id"] not in group_ids:
continue
result.append(key)
if limit:
return result[:limit]
return result
三个过滤条件的逻辑关系是 AND——必须同时满足所有条件。
异步方法——直接委托同步
# InMemoryRecordManager 的所有异步方法都是直接调用同步版本
async def aupdate(self, keys, *, group_ids=None, time_at_least=None):
self.update(keys, group_ids=group_ids, time_at_least=time_at_least)
async def aexists(self, keys):
return self.exists(keys)
因为内存操作不涉及 I/O,所以不需要 run_in_executor。生产实现(如 SQLRecordManager)会使用真正的异步数据库驱动。
文档哈希——内容指纹
打开 indexing/api.py:38-49:
# indexing/api.py:36-49
# Magic UUID to use as a namespace for hashing.
NAMESPACE_UUID = uuid.UUID(int=1984)
def _hash_string_to_uuid(input_string: str) -> str:
"""Hashes a string and returns the corresponding UUID."""
hash_value = hashlib.sha1(
input_string.encode("utf-8"), usedforsecurity=False
).hexdigest()
return str(uuid.uuid5(NAMESPACE_UUID, hash_value))
多算法支持
# indexing/api.py:151-166
def _calculate_hash(text, algorithm):
"""Return a hexadecimal digest of text using algorithm."""
if algorithm == "sha1":
digest = hashlib.sha1(text.encode("utf-8"), usedforsecurity=False).hexdigest()
return str(uuid.uuid5(NAMESPACE_UUID, digest))
if algorithm == "blake2b":
return hashlib.blake2b(text.encode("utf-8")).hexdigest()
if algorithm == "sha256":
return hashlib.sha256(text.encode("utf-8")).hexdigest()
if algorithm == "sha512":
return hashlib.sha512(text.encode("utf-8")).hexdigest()
支持 4 种哈希算法:
| 算法 | 安全性 | 速度 | 说明 |
|---|---|---|---|
sha1 |
弱(不抗碰撞) | 快 | 默认值,有安全警告 |
sha256 |
强 | 中 | 推荐用于新项目 |
sha512 |
强 | 中 | 更长的摘要 |
blake2b |
强 | 最快 | 现代哈希,推荐 |
文档指纹生成
# indexing/api.py:169-224
def _get_document_with_hash(document, *, key_encoder):
metadata = dict(document.metadata or {})
if callable(key_encoder):
# 自定义编码器
hash_ = key_encoder(document)
else:
# 内置算法:分别哈希内容和元数据,再组合
content_hash = _calculate_hash(document.page_content, algorithm=key_encoder)
serialized_meta = json.dumps(metadata, sort_keys=True)
metadata_hash = _calculate_hash(serialized_meta, algorithm=key_encoder)
hash_ = _calculate_hash(content_hash + metadata_hash, algorithm=key_encoder)
return Document(
id=hash_, # ← 哈希值成为文档 ID
page_content=document.page_content,
metadata=document.metadata,
)
指纹生成流程:
Document(page_content="hello", metadata={"source": "a.txt"})
│
├─→ content_hash = hash("hello")
│
├─→ metadata_hash = hash('{"source": "a.txt"}') ← JSON 序列化, sort_keys
│
└─→ final_hash = hash(content_hash + metadata_hash)
│
↓
Document(id=final_hash, page_content="hello", metadata={"source": "a.txt"})
关键设计决策:
- 内容和元数据都参与哈希——修改 metadata 也算"变更"
- metadata 用
json.dumps(sort_keys=True)确保字典顺序不影响哈希 - 支持自定义
key_encoder函数,完全控制指纹生成逻辑
批内去重
# indexing/api.py:133-144
def _deduplicate_in_order(hashed_documents):
"""Deduplicate a list of hashed documents while preserving order."""
seen: set[str] = set()
for hashed_doc in hashed_documents:
if hashed_doc.id not in seen:
seen.add(cast("str", hashed_doc.id))
yield hashed_doc
如果同一批中有两个内容完全相同的文档(哈希相同),只保留第一个。
UpsertResponse 与 DeleteResponse
# indexing/base.py:434-457
class UpsertResponse(TypedDict):
succeeded: list[str] # 成功索引的 ID
failed: list[str] # 失败的 ID
# indexing/base.py:460-493
class DeleteResponse(TypedDict, total=False): # total=False → 所有字段可选
num_deleted: int # 实际删除数
succeeded: Sequence[str] # 成功删除的 ID
failed: Sequence[str] # 失败的 ID
num_failed: int # 失败数
注意 DeleteResponse 用 total=False——所有字段都是可选的,具体实现可以只返回部分信息。
小结
| 要点 | 内容 |
|---|---|
| 文件 | indexing/base.py(662 行) |
| 核心类 | RecordManager(ABC) |
| 抽象方法 | 6 对同步/异步 = 12 个 |
| 每条记录 | {key, group_id, updated_at} |
| 时间约束 | 服务器时间,单调递增 |
| 哈希算法 | sha1(默认)/ sha256 / sha512 / blake2b / 自定义 |
| 指纹范围 | page_content + metadata 共同参与 |
| 内存实现 | InMemoryRecordManager——字典存储 |
下一篇问题:有了 RecordManager 记住"谁被索引了",index() 函数是如何编排整个流程的?三种 cleanup 模式(incremental / full / scoped_full)有什么区别?DocumentIndex 又是什么新抽象?
LangChain Indexing 深度解析(二):index() API——去重、更新与清理的完整策略
一个能跑的例子
import warnings
warnings.filterwarnings("ignore") # 忽略 SHA-1 警告
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.indexing import InMemoryRecordManager, index
# ---- 准备基础设施 ----
embed = DeterministicFakeEmbedding(size=50)
store = InMemoryVectorStore(embedding=embed)
rm = InMemoryRecordManager(namespace="test")
rm.create_schema()
# ---- 第一次索引 ----
docs_v1 = [
Document(page_content="Python 是解释型语言", metadata={"source": "wiki"}),
Document(page_content="Java 是编译型语言", metadata={"source": "wiki"}),
]
result1 = index(docs_v1, rm, store, cleanup="full", source_id_key="source")
print(f"第一次: {result1}")
# {'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
# ---- 第二次索引(相同内容)→ 全部跳过 ----
result2 = index(docs_v1, rm, store, cleanup="full", source_id_key="source")
print(f"第二次: {result2}")
# {'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
# ---- 第三次索引(内容变更 + 删除一条)→ 更新 + 清理 ----
docs_v2 = [
Document(page_content="Python 是动态类型的解释型语言", metadata={"source": "wiki"}),
]
result3 = index(docs_v2, rm, store, cleanup="full", source_id_key="source")
print(f"第三次: {result3}")
# {'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 2}
index() 是 Indexing 模块的核心 API——一个函数解决去重、更新检测、过时清理三大问题。
index() 函数签名
打开 indexing/api.py:290:
# indexing/api.py:290-303
def index(
docs_source: BaseLoader | Iterable[Document],
record_manager: RecordManager,
vector_store: VectorStore | DocumentIndex,
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", "scoped_full"] | None = None,
source_id_key: str | Callable[[Document], str] | None = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
key_encoder: Literal["sha1", "sha256", "sha512", "blake2b"]
| Callable[[Document], str] = "sha1",
upsert_kwargs: dict[str, Any] | None = None,
) -> IndexingResult:
| 参数 | 类型 | 说明 |
|---|---|---|
docs_source |
BaseLoader | Iterable[Document] |
文档来源 |
record_manager |
RecordManager |
记录管理器 |
vector_store |
VectorStore | DocumentIndex |
写入目标 |
batch_size |
int |
每批处理文档数(默认 100) |
cleanup |
"incremental" | "full" | "scoped_full" | None |
清理模式 |
source_id_key |
str | Callable | None |
提取来源 ID 的 key 或函数 |
force_update |
bool |
强制更新(即使哈希未变) |
key_encoder |
str | Callable |
哈希算法 |
upsert_kwargs |
dict | None |
传给 VectorStore 的额外参数 |
返回值 IndexingResult:
# indexing/api.py:277-287
class IndexingResult(TypedDict):
num_added: int # 新增文档数
num_updated: int # 更新文档数(force_update 时)
num_deleted: int # 清理删除数
num_skipped: int # 跳过(未变更)数
索引流程详解
阶段 0:预检查
# indexing/api.py:400-444
# 1. 验证 cleanup 参数合法性
if cleanup not in {"incremental", "full", "scoped_full", None}:
raise ValueError(...)
# 2. incremental/scoped_full 必须指定 source_id_key
if (cleanup in {"incremental", "scoped_full"}) and source_id_key is None:
raise ValueError(...)
# 3. 检查 VectorStore 实现了 delete 方法
if type(destination).delete == VectorStore.delete:
raise ValueError("Vectorstore has not implemented the delete method")
阶段 1:迭代文档源
# indexing/api.py:446-452
if isinstance(docs_source, BaseLoader):
try:
doc_iterator = docs_source.lazy_load() # 优先用 lazy_load
except NotImplementedError:
doc_iterator = iter(docs_source.load()) # 降级到 load
else:
doc_iterator = iter(docs_source)
阶段 2:批量处理(核心循环)
# indexing/api.py:457-576(简化)
index_start_dt = record_manager.get_time() # ← 记录开始时间
for doc_batch in _batch(batch_size, doc_iterator):
# Step 2a: 计算哈希 + 批内去重
hashed_docs = list(_deduplicate_in_order(
[_get_document_with_hash(doc, key_encoder=key_encoder) for doc in doc_batch]
))
# Step 2b: 提取 source_id
source_ids = [source_id_assigner(doc) for doc in hashed_docs]
# Step 2c: 查询哪些 key 已存在
exists_batch = record_manager.exists([doc.id for doc in hashed_docs])
# Step 2d: 分流——新文档 vs 已存在文档
docs_to_index = [] # 需要写入 VectorStore
uids_to_refresh = [] # 只需刷新时间戳
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
if force_update:
docs_to_index.append(hashed_doc) # 强制更新
else:
uids_to_refresh.append(hashed_doc.id) # 只刷新时间戳
num_skipped += 1
else:
docs_to_index.append(hashed_doc) # 新文档
# Step 2e: 先写 VectorStore(悲观策略)
if docs_to_index:
vector_store.add_documents(docs_to_index, ids=uids, ...)
# Step 2f: 再更新 RecordManager
record_manager.update([doc.id for doc in hashed_docs],
group_ids=source_ids,
time_at_least=index_start_dt)
# Step 2g: incremental cleanup(每批执行)
if cleanup == "incremental":
uids_to_delete = record_manager.list_keys(
group_ids=source_ids_, before=index_start_dt)
_delete(vector_store, uids_to_delete)
record_manager.delete_keys(uids_to_delete)
ASCII 流程图:
每批文档的处理流程
════════════════════════════════════════════════════════
doc_batch: [Doc1, Doc2, Doc3, Doc3_dup]
│
↓ _get_document_with_hash → 计算指纹
hashed: [Doc1#aaa, Doc2#bbb, Doc3#ccc, Doc3_dup#ccc]
│
↓ _deduplicate_in_order → 批内去重
deduped: [Doc1#aaa, Doc2#bbb, Doc3#ccc]
│
↓ record_manager.exists([aaa, bbb, ccc])
exists: [True, False, False]
│
├── aaa 已存在 → uids_to_refresh (skip)
├── bbb 不存在 → docs_to_index (add)
└── ccc 不存在 → docs_to_index (add)
│
↓ vector_store.add_documents([Doc2, Doc3])
↓ record_manager.update([aaa, bbb, ccc])
│
↓ cleanup == "incremental"?
├── Yes → 删除 source_ids 下 index_start_dt 前的旧记录
└── No → 继续下一批
阶段 3:全量清理
# indexing/api.py:577-590
if cleanup == "full" or (cleanup == "scoped_full" and scoped_full_cleanup_source_ids):
delete_group_ids = list(scoped_full_cleanup_source_ids) if cleanup == "scoped_full" else None
while uids_to_delete := record_manager.list_keys(
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
_delete(vector_store, uids_to_delete)
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
全量清理在所有文档处理完毕后执行,删除 index_start_dt 之前的所有未刷新记录。
写入顺序——悲观策略
# indexing/api.py:525-551
# Be pessimistic and assume that all vector store write will fail.
# First write to vector store
if docs_to_index:
destination.add_documents(docs_to_index, ids=uids, ...)
# And only then update the record store.
record_manager.update([doc.id for doc in hashed_docs], ...)
先写 VectorStore,后写 RecordManager:
场景 1:VectorStore 写入失败
→ RecordManager 未更新 → 下次重试会再次尝试写入 ✓
场景 2:VectorStore 成功,RecordManager 失败
→ 下次重试时会重新写入(幂等性依赖 add_documents 的 upsert 语义)
→ 可能存在短暂不一致,但不会丢数据 ✓
场景 3:都成功 → 一致 ✓
如果颠倒顺序(先写 RecordManager),VectorStore 失败后,RecordManager 认为"已索引",文档永远不会被写入。
三种 Cleanup 模式对比
None——不清理
只增不删。适合首次批量导入或不需要清理的场景。
incremental——增量清理
每批处理后立即清理同一 source_id 下的旧文档。
要求:必须指定 source_id_key
适用场景:
- 文档来源可以按 source 分组
- 需要最小化重复内容可见窗口
- 文档流式处理(不需要一次加载全部)
限制:
- 同一 source_id 出现在多个 batch 中会产生冗余操作
full——全量清理
所有文档处理完毕后,删除 index_start_dt 之前的所有旧记录。
要求:docs_source 必须返回完整数据集
适用场景:
- 数据源较小,可以一次全部加载
- 需要彻底清理所有过时文档
限制:
- 如果 docs_source 只返回部分数据,会错误删除其余文档
- 索引过程中用户可能看到重复内容(清理在最后执行)
scoped_full——限定范围的全量清理
所有文档处理完毕后,只清理本次运行中"见过的 source_id"下的旧记录。
要求:必须指定 source_id_key
适用场景:
- 数据源不能一次返回全部数据
- 需要按来源分批处理
- 10M+ 文档的大规模场景
特点:
- 兼具 incremental 的灵活性和 full 的彻底性
- 在内存中跟踪 source_id 集合(正常规模下无压力)
对比表
| 特性 | None | incremental | full | scoped_full |
|---|---|---|---|---|
| 需要 source_id_key | 否 | 是 | 否 | 是 |
| 清理时机 | 不清理 | 每批后 | 全部完成后 | 全部完成后 |
| 需要完整数据集 | 否 | 否 | 是 | 否 |
| 重复可见窗口 | 永久 | 最小 | 索引期间 | 索引期间 |
| 清理范围 | - | 当前 batch 的 source_ids | 全部 | 见过的 source_ids |
source_id_key——文档分组的关键
# indexing/api.py:116-130
def _get_source_id_assigner(source_id_key):
if source_id_key is None:
return lambda _doc: None
if isinstance(source_id_key, str):
return lambda doc: doc.metadata[source_id_key] # ← 从 metadata 取值
if callable(source_id_key):
return source_id_key # ← 自定义函数
source_id_key 的作用是把文档分成组。同一组的文档共享同一个 group_id:
Document(page_content="第1章...", metadata={"source": "book.pdf", "page": 1})
Document(page_content="第2章...", metadata={"source": "book.pdf", "page": 2})
Document(page_content="介绍...", metadata={"source": "intro.md"})
source_id_key="source" →
group "book.pdf": [第1章, 第2章]
group "intro.md": [介绍]
incremental cleanup 时:
如果 book.pdf 重新索引产生新的 chunks,
只清理 group_id="book.pdf" 下的旧 chunks,
不影响 intro.md 的文档。
DocumentIndex——更新的抽象
打开 indexing/base.py:496:
# indexing/base.py:496-511
@beta(message="Added in 0.2.29. The abstraction is subject to change.")
class DocumentIndex(BaseRetriever):
"""A document retriever that supports indexing operations.
This indexing interface is designed to be a generic abstraction for storing and
querying documents that has an ID and metadata associated with it.
The interface is designed to support the following operations:
1. Storing document in the index.
2. Fetching document by ID.
3. Searching for document using a query.
"""
DocumentIndex 继承 BaseRetriever,在检索之上增加了写入能力:
BaseRetriever(只读)
│
↓ 继承
DocumentIndex(读写)
├── upsert(items) → UpsertResponse # 写入
├── delete(ids) → DeleteResponse # 删除
├── get(ids) → list[Document] # 按 ID 查
└── _get_relevant_documents(query) # 搜索(继承自 BaseRetriever)
与 VectorStore 的区别
| 特性 | VectorStore | DocumentIndex |
|---|---|---|
| 继承 | ABC | BaseRetriever |
| 是 Runnable | 否 | 是 |
| 写入方法 | add_documents → list[str] | upsert → UpsertResponse |
| 删除返回 | bool | None | DeleteResponse(结构化) |
| 按 ID 查 | get_by_ids | get |
| 向量搜索 | similarity_search | _get_relevant_documents |
| 嵌入支持 | 内置(embeddings 属性) | 由实现决定 |
| 成熟度 | 稳定 | Beta |
DocumentIndex 是一个更通用的抽象——不限于向量搜索,可以是全文搜索、关键词搜索等。
InMemoryDocumentIndex
打开 indexing/in_memory.py:19:
# indexing/in_memory.py:19-104
@beta(message="Introduced in version 0.2.29.")
class InMemoryDocumentIndex(DocumentIndex):
store: dict[str, Document] = Field(default_factory=dict)
top_k: int = 4
def upsert(self, items, /, **kwargs):
ok_ids = []
for item in items:
if item.id is None:
id_ = str(uuid.uuid4())
item_ = item.model_copy()
item_.id = id_
else:
item_ = item
id_ = item.id
self.store[id_] = item_
ok_ids.append(id_)
return UpsertResponse(succeeded=ok_ids, failed=[])
def _get_relevant_documents(self, query, *, run_manager):
# 简单的子串计数搜索(非向量搜索)
counts_by_doc = []
for document in self.store.values():
count = document.page_content.count(query)
counts_by_doc.append((document, count))
counts_by_doc.sort(key=operator.itemgetter(1), reverse=True)
return [doc.model_copy() for doc, count in counts_by_doc[:self.top_k]]
注意搜索实现用的是子串计数而非向量相似度——这是一个教学/测试用的实现。
index() 与 VectorStore / DocumentIndex 的双轨写入
index() 函数同时支持两种目标:
# indexing/api.py:527-539
if docs_to_index:
if isinstance(destination, VectorStore):
destination.add_documents(docs_to_index, ids=uids, ...)
elif isinstance(destination, DocumentIndex):
destination.upsert(docs_to_index, ...)
index()
│
├── destination is VectorStore?
│ → add_documents(docs, ids=uids)
│ → delete(ids) for cleanup
│
└── destination is DocumentIndex?
→ upsert(docs)
→ delete(ids) for cleanup → 检查 DeleteResponse.num_failed
删除操作也有差异:
# indexing/api.py:241-271
def _delete(vector_store, ids):
if isinstance(vector_store, VectorStore):
delete_ok = vector_store.delete(ids)
if delete_ok is not None and delete_ok is False:
raise IndexingException(...)
elif isinstance(vector_store, DocumentIndex):
delete_response = vector_store.delete(ids)
if "num_failed" in delete_response and delete_response["num_failed"] > 0:
raise IndexingException(...)
aindex()——异步版
aindex() 是 index() 的异步镜像(indexing/api.py:629-948),逻辑完全相同,区别在于:
| index() | aindex() |
|---|---|
record_manager.get_time() |
await record_manager.aget_time() |
record_manager.exists() |
await record_manager.aexists() |
destination.add_documents() |
await destination.aadd_documents() |
for doc_batch in _batch(...) |
async for doc_batch in _abatch(...) |
docs_source.lazy_load() |
docs_source.alazy_load() |
aindex() 额外支持 AsyncIterator[Document] 作为输入源。
完整架构图
Indexing 模块全景
════════════════════════════════════════════════════════
┌─────────────────────┐
│ BaseLoader / │
│ Iterable[Document]│
└──────────┬──────────┘
│ lazy_load() / iter()
↓
┌──────────────────────┐
│ index() / │
│ aindex() │ ← indexing/api.py
│ │
│ ┌────────────────┐ │
│ │ _batch() │ │ 分批迭代
│ └───────┬────────┘ │
│ ↓ │
│ ┌────────────────┐ │
│ │ _get_document │ │ 计算内容哈希
│ │ _with_hash() │ │ (sha1/sha256/blake2b)
│ └───────┬────────┘ │
│ ↓ │
│ ┌────────────────┐ │
│ │ _deduplicate │ │ 批内去重
│ │ _in_order() │ │
│ └───────┬────────┘ │
│ ↓ │
│ ┌────────────────┐ │
│ │ record_manager │ │ 检查已存在?
│ │ .exists() │──│──→ RecordManager
│ └───────┬────────┘ │ (key, group_id,
│ ↓ │ updated_at)
│ 新文档 / 已存在 │
│ │ │ │
│ ↓ ↓ │
│ 写入VS 刷新时间戳 │
│ │ │
│ ↓ │
│ cleanup? │
│ ├─ incremental →每批│
│ ├─ full →最后 │
│ └─ scoped_full→最后 │
└──────────┬───────────┘
│
┌──────────▼───────────┐
│ VectorStore / │
│ DocumentIndex │
│ (最终存储) │
└──────────────────────┘
九个系列总览
| 系列 | 核心类 | 源码位置 | 行数 |
|---|---|---|---|
| Messages | BaseMessage / AIMessage / HumanMessage / ToolMessage | messages/ |
~1500 |
| Runnables | Runnable / RunnableSequence / RunnableLambda | runnables/ |
~5000 |
| ChatModel | BaseChatModel / generate / stream | language_models/ |
~1400 |
| Outputs | Generation / ChatResult / AIMessageChunk | outputs/ |
~600 |
| Prompts | BasePromptTemplate / ChatPromptTemplate | prompts/ |
~2200 |
| Tools | BaseTool / StructuredTool / tool 装饰器 | tools/ |
~1600 |
| Tracers | BaseTracer / LangSmithTracer / ConsoleCallbackHandler | tracers/ + callbacks/ |
~2000 |
| Embeddings | Embeddings / VectorStore / InMemoryVectorStore | embeddings/ + vectorstores/ |
~1900 |
| Indexing | RecordManager / index() / DocumentIndex | indexing/ |
~1770 |
小结
| 要点 | 内容 |
|---|---|
| 核心函数 | index() / aindex()(indexing/api.py,949 行) |
| 去重机制 | 内容哈希 + RecordManager.exists() |
| 写入顺序 | 先 VectorStore,后 RecordManager(悲观策略) |
| cleanup 模式 | None / incremental / full / scoped_full |
| incremental | 每批清理同源旧文档,需要 source_id_key |
| full | 全部完成后清理所有旧文档,需要完整数据集 |
| scoped_full | 全部完成后只清理见过的源的旧文档 |
| DocumentIndex | Beta 抽象,BaseRetriever + upsert/delete/get |
| 双轨写入 | 同时支持 VectorStore 和 DocumentIndex |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)