【LangChain 源码解析八:Embeddings】
本系列共 2 部分,从抽象接口到向量存储,完整拆解 LangChain Embeddings 的设计与实现。
- 第 1 部分:Embeddings 接口——向量化的抽象(本文)
- 第 2 部分:VectorStore——Embeddings 的消费者与 RAG 完整链路
LangChain Embeddings 深度解析(一):Embeddings 接口——向量化的抽象
一个能跑的例子
from langchain_core.embeddings import Embeddings, DeterministicFakeEmbedding
# ---- 用确定性假模型演示接口 ----
embed = DeterministicFakeEmbedding(size=5)
# 单条查询
vec = embed.embed_query("LangChain 是什么?")
print(f"维度: {len(vec)}") # 维度: 5
print(f"向量: {[round(v, 4) for v in vec]}") # 每次运行相同
# 批量文档
docs = ["苹果是水果", "Python 是语言", "LangChain 是框架"]
vecs = embed.embed_documents(docs)
print(f"文档数: {len(vecs)}") # 文档数: 3
print(f"每条维度: {len(vecs[0])}") # 每条维度: 5
# 相同文本 → 相同向量(确定性)
vec_again = embed.embed_query("LangChain 是什么?")
print(f"确定性: {vec == vec_again}") # 确定性: True
Embeddings 是 LangChain 中最小的核心抽象之一——整个模块只有 3 个文件、约 240 行代码。但它是 RAG 管线的关键枢纽:把自然语言文本映射到向量空间,让"语义相似度"变成可计算的数学距离。
Embeddings ABC——四个方法,两个抽象
打开 embeddings/embeddings.py:8:
# embeddings/embeddings.py:8-78
class Embeddings(ABC):
"""Interface for embedding models.
Text embedding models are used to map text to a vector
(a point in n-dimensional space).
"""
@abstractmethod
def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""Embed search docs."""
@abstractmethod
def embed_query(self, text: str) -> list[float]:
"""Embed query text."""
async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
return await run_in_executor(None, self.embed_documents, texts)
async def aembed_query(self, text: str) -> list[float]:
return await run_in_executor(None, self.embed_query, text)
整个接口只有 4 个方法:
| 方法 | 签名 | 是否抽象 | 说明 |
|---|---|---|---|
embed_documents |
(texts: list[str]) → list[list[float]] |
抽象 | 批量嵌入文档 |
embed_query |
(text: str) → list[float] |
抽象 | 单条查询嵌入 |
aembed_documents |
(texts: list[str]) → list[list[float]] |
非抽象 | 异步版,默认用 run_in_executor |
aembed_query |
(text: str) → list[float] |
非抽象 | 异步版,默认用 run_in_executor |
为什么分 embed_documents 和 embed_query?
表面上看两者都是"文本 → 向量",为什么要分开?
原因 1:某些模型对查询和文档使用不同的编码策略
─────────────────────────────────────────────
例如 Google 的 text-embedding-004 在 embed 时需要指定 task_type:
- RETRIEVAL_DOCUMENT → 用于被检索的文档
- RETRIEVAL_QUERY → 用于查询
原因 2:批量 vs 单条的性能优化
─────────────────────────────
embed_documents → 接收 list[str],可以做 batching/GPU 并行
embed_query → 接收单个 str,查询通常只有一条
原因 3:绝大多数实现中两者结果相同
─────────────────────────────────
OpenAIEmbeddings.embed_query 内部就是调用
embed_documents([text])[0]
异步默认实现——run_in_executor 模式
# embeddings/embeddings.py:58-67
async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
return await run_in_executor(None, self.embed_documents, texts)
这是 LangChain 中非常常见的模式(Messages、Runnables、Loaders 系列都见过):
同步方法已实现? ─── Yes ──→ 异步版自动获得
│ (run_in_executor 包装)
No
↓
子类需要同时实现 sync + async
run_in_executor(None, ...) 表示使用默认线程池执行器,把同步阻塞调用放到线程池中运行。合约集成(如 langchain-openai)可以覆盖异步方法提供原生 async 实现。
Embeddings 不继承 Pydantic 也不继承 Runnable
注意一个重要细节——Embeddings 只继承 ABC:
# 对比其他核心抽象
class BaseChatModel(BaseLanguageModel): # → Runnable 子类
class BaseRetriever(RunnableSerializable): ...
class BaseLoader: ... # 无基类(除 object)
class Embeddings(ABC): ... # 纯 ABC,最轻量
这意味着:
- 不是 Runnable——不能直接用
|管道组合,也没有.invoke() - 不是 Pydantic Model——没有序列化、验证、schema 生成
- 最大灵活性——具体实现可以自由选择继承
BaseModel或不继承
实际中,大多数合约实现(如 OpenAIEmbeddings)都会同时继承 Embeddings 和 BaseModel:
# 典型的 partner 实现模式
class OpenAIEmbeddings(BaseModel, Embeddings):
model: str = "text-embedding-ada-002"
...
FakeEmbeddings——测试用的随机向量
打开 embeddings/fake.py:16:
# embeddings/fake.py:16-67
class FakeEmbeddings(Embeddings, BaseModel):
"""Fake embedding model for unit testing purposes."""
size: int
"""The size of the embedding vector."""
def _get_embedding(self) -> list[float]:
return list(np.random.default_rng().normal(size=self.size))
def embed_documents(self, texts: list[str]) -> list[list[float]]:
return [self._get_embedding() for _ in texts]
def embed_query(self, text: str) -> list[float]:
return self._get_embedding()
关键特征:
- 随机生成:每次调用
np.random.default_rng().normal()产生不同向量 - 忽略文本内容:
texts参数完全不使用,只决定返回多少个向量 - 用途:单元测试中占位,验证管道连通性
DeterministicFakeEmbedding——确定性的假模型
打开 embeddings/fake.py:70:
# embeddings/fake.py:70-129
class DeterministicFakeEmbedding(Embeddings, BaseModel):
"""Deterministic fake embedding model for unit testing purposes."""
size: int
def _get_embedding(self, seed: int) -> list[float]:
# set the seed for the random generator
rng = np.random.default_rng(seed) # ← 固定种子
return list(rng.normal(size=self.size))
@staticmethod
def _get_seed(text: str) -> int:
"""Get a seed for the random generator, using the hash of the text."""
return int(hashlib.sha256(text.encode("utf-8")).hexdigest(), 16) % 10**8
def embed_documents(self, texts: list[str]) -> list[list[float]]:
return [self._get_embedding(seed=self._get_seed(_)) for _ in texts]
def embed_query(self, text: str) -> list[float]:
return self._get_embedding(seed=self._get_seed(text))
核心区别在于种子生成策略:
文本 "hello"
│
↓ SHA-256
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
│
↓ int(..., 16) % 10**8
42596730 ← 确定性种子
│
↓ np.random.default_rng(42596730).normal(size=5)
[-0.3021, 1.4528, ...] ← 每次运行结果相同
两种假模型对比:
| 特性 | FakeEmbeddings | DeterministicFakeEmbedding |
|---|---|---|
| 文本敏感 | 否(忽略输入) | 是(SHA-256 → 种子) |
| 确定性 | 否(每次随机) | 是(相同文本→相同向量) |
| 适用场景 | 管道连通性测试 | 需要可复现结果的测试 |
| 相似度检索 | 无意义 | 有限意义(同文本→同向量) |
模块导出——延迟导入模式
打开 embeddings/__init__.py:
# embeddings/__init__.py:1-31
from typing import TYPE_CHECKING
from langchain_core._import_utils import import_attr
if TYPE_CHECKING:
from langchain_core.embeddings.embeddings import Embeddings
from langchain_core.embeddings.fake import (
DeterministicFakeEmbedding,
FakeEmbeddings,
)
__all__ = ("DeterministicFakeEmbedding", "Embeddings", "FakeEmbeddings")
_dynamic_imports = {
"Embeddings": "embeddings",
"DeterministicFakeEmbedding": "fake",
"FakeEmbeddings": "fake",
}
def __getattr__(attr_name: str) -> object:
module_name = _dynamic_imports.get(attr_name)
result = import_attr(attr_name, module_name, __spec__.parent)
globals()[attr_name] = result
return result
这是 LangChain Core 的标准模式——利用 __getattr__ 实现延迟导入:
import langchain_core.embeddings
│
↓ 此时只执行 __init__.py 的模块级代码
↓ Embeddings / FakeEmbeddings 类还没加载
langchain_core.embeddings.Embeddings
│
↓ 触发 __getattr__("Embeddings")
↓ import_attr 从 "embeddings" 子模块动态导入
↓ 缓存到 globals() 避免重复导入
自己实现一个 Embeddings
理解了接口后,实现一个简单的 Embeddings 非常直接:
from langchain_core.embeddings import Embeddings
class BagOfWordsEmbeddings(Embeddings):
"""极简的词袋模型嵌入——仅用于教学。"""
def __init__(self, vocab: list[str]):
self.vocab = vocab
def embed_documents(self, texts: list[str]) -> list[list[float]]:
return [self._encode(t) for t in texts]
def embed_query(self, text: str) -> list[float]:
return self._encode(text)
def _encode(self, text: str) -> list[float]:
words = text.lower().split()
return [float(words.count(w)) for w in self.vocab]
# 测试
bow = BagOfWordsEmbeddings(vocab=["python", "java", "is", "great"])
v1 = bow.embed_query("Python is great")
v2 = bow.embed_query("Java is great")
print(v1) # [1.0, 0.0, 1.0, 1.0]
print(v2) # [0.0, 1.0, 1.0, 1.0]
只需实现 embed_documents 和 embed_query 两个方法。异步版本自动获得。
Embeddings 在 LangChain 生态中的位置
┌──────────────────────────────────┐
│ Partner Integrations │
│ OpenAIEmbeddings │
│ HuggingFaceEmbeddings │
│ GoogleGenerativeAIEmbeddings │
│ OllamaEmbeddings │
│ ... │
└──────────┬───────────────────────┘
│ 实现
↓
┌──────────────────────┐
│ Embeddings (ABC) │ ← embeddings/embeddings.py
│ │
│ embed_documents() │ 文本 → 向量(批量)
│ embed_query() │ 文本 → 向量(单条)
│ aembed_documents() │ 异步批量
│ aembed_query() │ 异步单条
└──────────┬───────────┘
│ 被注入
┌──────────▼───────────┐
│ VectorStore (ABC) │ ← vectorstores/base.py
│ │
│ from_texts(embed) │ 构造时注入 Embeddings
│ add_texts() │ 内部调 embed_documents
│ similarity_search() │ 内部调 embed_query
│ as_retriever() │ → VectorStoreRetriever
└──────────┬───────────┘
│ 适配
┌──────────▼───────────┐
│ VectorStoreRetriever │ ← BaseRetriever 子类
│ │ → 可进入 LCEL 管道
│ invoke(query) │ query → Documents
└──────────────────────┘
小结
| 要点 | 内容 |
|---|---|
| 文件 | embeddings/embeddings.py(79 行) |
| 核心类 | Embeddings(ABC) |
| 抽象方法 | embed_documents + embed_query |
| 异步 | 默认 run_in_executor,可覆盖 |
| 继承 | 纯 ABC,不是 Runnable、不是 Pydantic |
| 测试替身 | FakeEmbeddings(随机)+ DeterministicFakeEmbedding(确定性) |
下一篇问题:Embeddings 把文本变成了向量,但向量需要存储、索引、检索。VectorStore 是如何消费 Embeddings 的?InMemoryVectorStore 用什么算法计算相似度?as_retriever() 又是如何把 VectorStore 接入 LCEL 管道的?
LangChain Embeddings 深度解析(二):VectorStore——Embeddings 的消费者与 RAG 完整链路
一个能跑的例子
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore
# ---- 构建向量存储 ----
embed = DeterministicFakeEmbedding(size=50)
store = InMemoryVectorStore(embedding=embed)
# 添加文档
docs = [
Document(page_content="Python 是一门解释型编程语言", metadata={"topic": "python"}),
Document(page_content="Java 是一门编译型编程语言", metadata={"topic": "java"}),
Document(page_content="苹果是一种常见的水果", metadata={"topic": "fruit"}),
Document(page_content="香蕉富含钾元素", metadata={"topic": "fruit"}),
]
ids = store.add_documents(docs)
print(f"添加了 {len(ids)} 条文档") # 添加了 4 条文档
# ---- 相似度搜索 ----
results = store.similarity_search("编程语言", k=2)
for doc in results:
print(f" [{doc.metadata['topic']}] {doc.page_content}")
# ---- 转为 Retriever(接入 LCEL 管道) ----
retriever = store.as_retriever(search_kwargs={"k": 2})
docs = retriever.invoke("水果")
for doc in docs:
print(f" [{doc.metadata['topic']}] {doc.page_content}")
VectorStore 是 Embeddings 的直接消费者——它在写入时调用 embed_documents() 将文本转为向量,在查询时调用 embed_query() 将查询转为向量,然后通过相似度计算返回最相关的文档。
VectorStore ABC——接口全景
打开 vectorstores/base.py:43:
# vectorstores/base.py:43-44
class VectorStore(ABC):
"""Interface for vector store."""
VectorStore 有 1111 行代码,但核心结构清晰。按功能分组:
写入方法
# vectorstores/base.py:46-98 ← add_texts
def add_texts(self, texts, metadatas=None, *, ids=None, **kwargs) -> list[str]:
"""Run more texts through the embeddings and add to the VectorStore."""
# 如果子类实现了 add_documents,则转发
if type(self).add_documents != VectorStore.add_documents:
docs = [Document(id=id_, page_content=text, metadata=metadata_)
for text, metadata_, id_ in zip(texts, metadatas_, ids_)]
return self.add_documents(docs, **kwargs)
raise NotImplementedError(...)
# vectorstores/base.py:234-263 ← add_documents
def add_documents(self, documents, **kwargs) -> list[str]:
"""Add or update documents in the VectorStore."""
# 如果子类实现了 add_texts,则转发
if type(self).add_texts != VectorStore.add_texts:
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return self.add_texts(texts, metadatas, **kwargs)
raise NotImplementedError(...)
这是一个双向委托设计——子类只需实现 add_texts 或 add_documents 其中之一:
子类实现了 add_texts?
│
Yes → add_documents 自动转发到 add_texts
│
No → 子类实现了 add_documents?
│
Yes → add_texts 自动转发到 add_documents
│
No → 两者都抛 NotImplementedError
Embeddings 属性
# vectorstores/base.py:99-106
@property
def embeddings(self) -> Embeddings | None:
"""Access the query embedding object if available."""
logger.debug(
"The embeddings property has not been implemented for %s",
self.__class__.__name__,
)
return None
基类默认返回 None,子类覆盖返回实际的 Embeddings 实例。这个属性被 VectorStoreRetriever 用于 LangSmith 追踪。
搜索方法
VectorStore 定义了三种搜索策略:
# vectorstores/base.py:293-324 ← search 统一入口
def search(self, query: str, search_type: str, **kwargs) -> list[Document]:
if search_type == "similarity":
return self.similarity_search(query, **kwargs)
if search_type == "similarity_score_threshold":
docs_and_similarities = self.similarity_search_with_relevance_scores(...)
return [doc for doc, _ in docs_and_similarities]
if search_type == "mmr":
return self.max_marginal_relevance_search(query, **kwargs)
| 搜索类型 | 方法 | 说明 |
|---|---|---|
similarity |
similarity_search() |
余弦相似度 top-k,唯一的抽象方法 |
similarity_score_threshold |
similarity_search_with_relevance_scores() |
带分数阈值过滤 |
mmr |
max_marginal_relevance_search() |
最大边际相关——兼顾相似度与多样性 |
# vectorstores/base.py:360-373 ← 唯一的抽象方法
@abstractmethod
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> list[Document]:
"""Return docs most similar to query."""
工厂方法
# vectorstores/base.py:846-868 ← from_texts(抽象)
@classmethod
@abstractmethod
def from_texts(cls, texts, embedding, metadatas=None, *, ids=None, **kwargs) -> VST:
"""Return VectorStore initialized from texts and embeddings."""
# vectorstores/base.py:786-814 ← from_documents(基于 from_texts)
@classmethod
def from_documents(cls, documents, embedding, **kwargs) -> Self:
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
from_texts 是第二个抽象方法。from_documents 是便捷包装。两者都接收 embedding: Embeddings 参数——这是 Embeddings 被注入 VectorStore 的主要入口。
相关度分数转换
VectorStore 提供三种距离到相似度的转换函数:
# vectorstores/base.py:375-401
@staticmethod
def _euclidean_relevance_score_fn(distance: float) -> float:
return 1.0 - distance / math.sqrt(2) # [0, sqrt(2)] → [1, 0]
@staticmethod
def _cosine_relevance_score_fn(distance: float) -> float:
return 1.0 - distance # cosine distance → similarity
@staticmethod
def _max_inner_product_relevance_score_fn(distance: float) -> float:
if distance > 0:
return 1.0 - distance
return -1.0 * distance
子类通过覆盖 _select_relevance_score_fn() 选择适合自己后端的转换函数。
as_retriever——桥接到 LCEL
# vectorstores/base.py:905-961
def as_retriever(self, **kwargs) -> VectorStoreRetriever:
tags = kwargs.pop("tags", None) or [*self._get_retriever_tags()]
return VectorStoreRetriever(vectorstore=self, tags=tags, **kwargs)
一行代码,把 VectorStore 包装成 BaseRetriever 子类,接入 LCEL 管道。
VectorStoreRetriever——从 VectorStore 到 Runnable
打开 vectorstores/base.py:964:
# vectorstores/base.py:964-985
class VectorStoreRetriever(BaseRetriever):
"""Base Retriever class for VectorStore."""
vectorstore: VectorStore
search_type: str = "similarity"
search_kwargs: dict = Field(default_factory=dict)
allowed_search_types: ClassVar[Collection[str]] = (
"similarity",
"similarity_score_threshold",
"mmr",
)
输入验证
# vectorstores/base.py:986-1016
@model_validator(mode="before")
@classmethod
def validate_search_type(cls, values: dict) -> Any:
search_type = values.get("search_type", "similarity")
if search_type not in cls.allowed_search_types:
raise ValueError(...)
if search_type == "similarity_score_threshold":
score_threshold = values.get("search_kwargs", {}).get("score_threshold")
if (score_threshold is None) or (not isinstance(score_threshold, float)):
raise ValueError("`score_threshold` is not specified...")
return values
如果用 similarity_score_threshold 搜索类型,必须提供 score_threshold 参数。
核心检索逻辑
# vectorstores/base.py:1039-1058
def _get_relevant_documents(self, query, *, run_manager, **kwargs):
kwargs_ = self.search_kwargs | kwargs
if self.search_type == "similarity":
docs = self.vectorstore.similarity_search(query, **kwargs_)
elif self.search_type == "similarity_score_threshold":
docs_and_similarities = (
self.vectorstore.similarity_search_with_relevance_scores(query, **kwargs_)
)
docs = [doc for doc, _ in docs_and_similarities]
elif self.search_type == "mmr":
docs = self.vectorstore.max_marginal_relevance_search(query, **kwargs_)
return docs
_get_relevant_documents 是 BaseRetriever 的抽象方法——实现后,retriever.invoke(query) 就能工作了。
LangSmith 追踪集成
# vectorstores/base.py:1018-1037
def _get_ls_params(self, **kwargs) -> LangSmithRetrieverParams:
ls_params = super()._get_ls_params(**kwargs_)
ls_params["ls_vector_store_provider"] = self.vectorstore.__class__.__name__
if self.vectorstore.embeddings:
ls_params["ls_embedding_provider"] = (
self.vectorstore.embeddings.__class__.__name__
)
return ls_params
追踪记录了 VectorStore 和 Embeddings 的类名,方便在 LangSmith 中观测。
InMemoryVectorStore——从零到一的完整实现
打开 vectorstores/in_memory.py:34。这是 LangChain Core 内置的唯一具体 VectorStore 实现,用纯 Python + NumPy 实现,适合教学和小规模场景。
构造器——注入 Embeddings
# vectorstores/in_memory.py:161-170
def __init__(self, embedding: Embeddings) -> None:
self.store: dict[str, dict[str, Any]] = {} # id → {id, vector, text, metadata}
self.embedding = embedding
@property
def embeddings(self) -> Embeddings:
return self.embedding
存储就是一个字典。每条记录包含 4 个字段:id、vector、text、metadata。
写入——embed_documents 的调用点
# vectorstores/in_memory.py:188-221
def add_documents(self, documents, ids=None, **kwargs) -> list[str]:
texts = [doc.page_content for doc in documents]
vectors = self.embedding.embed_documents(texts) # ← 批量嵌入
id_iterator = iter(ids) if ids else iter(doc.id for doc in documents)
ids_ = []
for doc, vector in zip(documents, vectors):
doc_id = next(id_iterator)
doc_id_ = doc_id or str(uuid.uuid4())
ids_.append(doc_id_)
self.store[doc_id_] = {
"id": doc_id_,
"vector": vector, # ← 向量存入字典
"text": doc.page_content,
"metadata": doc.metadata,
}
return ids_
写入流程:
documents: [Doc1, Doc2, Doc3]
│
↓ .page_content
texts: ["text1", "text2", "text3"]
│
↓ embedding.embed_documents(texts)
vectors: [[0.1, 0.3, ...], [0.2, 0.5, ...], [0.7, 0.1, ...]]
│
↓ zip(documents, vectors) → 逐条存入 self.store
store: {
"uuid-1": {"id": "uuid-1", "vector": [0.1, 0.3, ...], "text": "text1", "metadata": {...}},
"uuid-2": {"id": "uuid-2", "vector": [0.2, 0.5, ...], "text": "text2", "metadata": {...}},
"uuid-3": {"id": "uuid-3", "vector": [0.7, 0.1, ...], "text": "text3", "metadata": {...}},
}
搜索——embed_query 的调用点
# vectorstores/in_memory.py:358-370
def similarity_search_with_score(self, query, k=4, **kwargs):
embedding = self.embedding.embed_query(query) # ← 查询嵌入
return self.similarity_search_with_score_by_vector(embedding, k, **kwargs)
查询流程:
query: "编程语言"
│
↓ embedding.embed_query(query)
query_vec: [0.15, 0.4, ...]
│
↓ cosine_similarity([query_vec], [所有文档向量])
similarities: [0.92, 0.88, 0.31, 0.25]
│
↓ argsort()[::-1][:k] ← top-k 索引
top_k_idx: [0, 1]
│
↓ 构造 Document + score 返回
[(Doc1, 0.92), (Doc2, 0.88)]
核心搜索实现
# vectorstores/in_memory.py:291-332
def _similarity_search_with_score_by_vector(self, embedding, k=4, filter=None):
docs = list(self.store.values())
if filter is not None:
docs = [doc for doc in docs
if filter(Document(id=doc["id"], page_content=doc["text"],
metadata=doc["metadata"]))]
if not docs:
return []
similarity = cosine_similarity( # ← vectorstores/utils.py
[embedding], [doc["vector"] for doc in docs]
)[0]
top_k_idx = similarity.argsort()[::-1][:k] # ← 降序取前 k
return [
(Document(id=doc_dict["id"], page_content=doc_dict["text"],
metadata=doc_dict["metadata"]),
float(similarity[idx].item()),
doc_dict["vector"])
for idx in top_k_idx
if (doc_dict := docs[idx])
]
注意 filter 参数接受一个 Callable[[Document], bool]——这是内存实现的灵活之处,可以用 Python 函数做任意过滤。
持久化——dump / load
# vectorstores/in_memory.py:537-546
def dump(self, path: str) -> None:
path_ = Path(path)
path_.parent.mkdir(exist_ok=True, parents=True)
with path_.open("w", encoding="utf-8") as f:
json.dump(dumpd(self.store), f, indent=2)
@classmethod
def load(cls, path, embedding, **kwargs) -> InMemoryVectorStore:
with Path(path).open("r", encoding="utf-8") as f:
store = load(json.load(f), allowed_objects=[Document])
vectorstore = cls(embedding=embedding, **kwargs)
vectorstore.store = store
return vectorstore
使用 LangChain 的 dumpd / load 序列化工具,将 store 字典存为 JSON。注意加载时仍需传入 embedding——向量已存在文件中,但 Embeddings 实例不会被序列化。
余弦相似度与 MMR 算法
打开 vectorstores/utils.py。这是 VectorStore 的数学引擎。
余弦相似度
# vectorstores/utils.py:35-103
def _cosine_similarity(x: Matrix, y: Matrix) -> np.ndarray:
"""Row-wise cosine similarity between two equal-width matrices."""
x = np.array(x)
y = np.array(y)
# NaN / Inf 检查(仅警告,不阻断)
if np.any(np.isnan(x)) or np.any(np.isnan(y)):
warnings.warn("NaN found in input arrays...")
if np.any(np.isinf(x)) or np.any(np.isinf(y)):
warnings.warn("Inf found in input arrays...")
if x.shape[1] != y.shape[1]:
raise ValueError(f"Number of columns in X and Y must be the same...")
if not _HAS_SIMSIMD:
# NumPy 实现
x_norm = np.linalg.norm(x, axis=1)
y_norm = np.linalg.norm(y, axis=1)
similarity = np.dot(x, y.T) / np.outer(x_norm, y_norm)
similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
return similarity
else:
# SimSIMD 加速实现
return 1 - np.array(simd.cdist(x, y, metric="cosine"))
计算公式:
cosine_similarity(A, B) = (A · B) / (||A|| × ||B||)
其中:
A · B = Σ(ai × bi) ← 内积
||A|| = √Σ(ai²) ← L2 范数
两层实现策略:
- SimSIMD 可用 → 使用 SIMD 指令加速,
1 - cdist(cosine) - 仅 NumPy → 手动计算
dot(x, y.T) / outer(x_norm, y_norm)
最大边际相关性(MMR)
# vectorstores/utils.py:106-157
def maximal_marginal_relevance(
query_embedding: np.ndarray,
embedding_list: list,
lambda_mult: float = 0.5,
k: int = 4,
) -> list[int]:
if min(k, len(embedding_list)) <= 0:
return []
similarity_to_query = _cosine_similarity(query_embedding, embedding_list)[0]
most_similar = int(np.argmax(similarity_to_query))
idxs = [most_similar]
selected = np.array([embedding_list[most_similar]])
while len(idxs) < min(k, len(embedding_list)):
best_score = -np.inf
idx_to_add = -1
similarity_to_selected = _cosine_similarity(embedding_list, selected)
for i, query_score in enumerate(similarity_to_query):
if i in idxs:
continue
redundant_score = max(similarity_to_selected[i])
equation_score = (
lambda_mult * query_score - (1 - lambda_mult) * redundant_score
)
if equation_score > best_score:
best_score = equation_score
idx_to_add = i
idxs.append(idx_to_add)
selected = np.append(selected, [embedding_list[idx_to_add]], axis=0)
return idxs
MMR 的核心思想——贪心选择,平衡相关性和多样性:
MMR(Di) = λ × Sim(Di, Q) - (1-λ) × max[Sim(Di, Dj)]
j∈已选集
λ = 1.0 → 纯相似度排序(最相关)
λ = 0.0 → 纯多样性排序(最不像已选的)
λ = 0.5 → 平衡(默认)
算法步骤:
1. 计算所有文档与查询的相似度
2. 选出最相似的文档放入结果集
3. 循环 k-1 次:
a. 对每个未选文档计算 MMR 分数
b. MMR = λ × query_sim - (1-λ) × max(与已选文档的相似度)
c. 选 MMR 最高的文档加入结果集
4. 返回选中的文档索引
InMemoryVectorStore 中的 MMR 调用
# vectorstores/in_memory.py:418-448
def max_marginal_relevance_search_by_vector(
self, embedding, k=4, fetch_k=20, lambda_mult=0.5, *, filter=None, **kwargs
):
prefetch_hits = self._similarity_search_with_score_by_vector(
embedding=embedding, k=fetch_k, filter=filter # 先取 fetch_k 个候选
)
mmr_chosen_indices = maximal_marginal_relevance(
np.array(embedding, dtype=np.float32),
[vector for _, _, vector in prefetch_hits], # 候选向量
k=k, lambda_mult=lambda_mult,
)
return [prefetch_hits[idx][0] for idx in mmr_chosen_indices]
两阶段策略:先用余弦相似度取 fetch_k(默认 20)个候选,再用 MMR 从中选 k(默认 4)个。
RAG 完整链路——从文本到答案
把所有组件串起来:
RAG 数据流全景图
═══════════════════════════════════════════════════════════
原始文本 嵌入 向量存储
┌─────────┐ ┌──────────────┐ ┌──────────────┐
│ "Python │ │ │ │ store: │
│ 是一门 │ ──→ │ Embeddings │ ──→ │ {id: vec, │
│ 编程语言│ │ .embed_docs()│ │ text, meta} │
│ " │ │ │ │ │
└─────────┘ └──────────────┘ └──────┬───────┘
│
.as_retriever()
│
用户查询 嵌入 ┌──▼───────────┐
┌─────────┐ ┌──────────────┐ │ VectorStore- │
│"编程语言 │ │ │ │ Retriever │
│ 有哪些" │ ──→ │ Embeddings │ ──→ │ │
│ │ │ .embed_query │ │ _get_relevant │
└─────────┘ │ () │ │ _documents() │
└──────────────┘ └──────┬───────┘
│
cosine_similarity
/ MMR 选择 top-k
│
┌──────▼───────┐
│ [Document, │
│ Document, │
│ ...] │
└──────┬───────┘
│
可接入 LLM 生成回答
retriever | prompt | llm
LCEL 管道示例(概念)
# 概念代码(需要真实 LLM 才能运行)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
prompt = ChatPromptTemplate.from_template(
"根据以下上下文回答问题:\n{context}\n\n问题:{question}"
)
# retriever 是 Runnable,可以用 | 组合
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
)
# chain.invoke("Python 是什么?")
这就是为什么 VectorStoreRetriever 继承 BaseRetriever(继承 RunnableSerializable)——它让向量检索可以无缝融入 LCEL 管道。
VectorStore 方法对比表
| 方法 | 类型 | 说明 |
|---|---|---|
similarity_search(query, k) |
抽象 | 余弦相似度 top-k |
similarity_search_by_vector(embedding, k) |
可选 | 直接用向量搜索 |
similarity_search_with_score(query, k) |
可选 | 返回 (doc, score) |
similarity_search_with_relevance_scores(query, k) |
内置 | 分数归一到 [0,1] |
max_marginal_relevance_search(query, k, fetch_k, λ) |
可选 | MMR 搜索 |
from_texts(texts, embedding) |
抽象类方法 | 构造 + 批量添加 |
from_documents(docs, embedding) |
类方法 | 基于 from_texts |
add_texts(texts, metadatas) |
可选 | 增量添加文本 |
add_documents(docs) |
可选 | 增量添加文档 |
delete(ids) |
可选 | 按 ID 删除 |
get_by_ids(ids) |
可选 | 按 ID 获取 |
as_retriever(**kwargs) |
内置 | 转为 Retriever |
“抽象” = 子类必须实现;“可选” = 基类有默认(通常是
raise NotImplementedError);“内置” = 基类有完整实现。
八个系列总览
| 系列 | 核心类 | 源码位置 | 行数 |
|---|---|---|---|
| Messages | BaseMessage / AIMessage / HumanMessage / ToolMessage | messages/ |
~1500 |
| Runnables | Runnable / RunnableSequence / RunnableLambda | runnables/ |
~5000 |
| ChatModel | BaseChatModel / generate / stream | language_models/ |
~1400 |
| Outputs | Generation / ChatResult / AIMessageChunk | outputs/ |
~600 |
| Prompts | BasePromptTemplate / ChatPromptTemplate | prompts/ |
~2200 |
| Tools | BaseTool / StructuredTool / tool 装饰器 | tools/ |
~1600 |
| Tracers | BaseTracer / LangSmithTracer / ConsoleCallbackHandler | tracers/ + callbacks/ |
~2000 |
| Embeddings | Embeddings / VectorStore / InMemoryVectorStore | embeddings/ + vectorstores/ |
~1900 |
小结
| 要点 | 内容 |
|---|---|
| VectorStore 抽象方法 | similarity_search() + from_texts() |
| 写入流程 | add_documents() → embed_documents() → 存储向量 |
| 查询流程 | similarity_search() → embed_query() → 余弦相似度 → top-k |
| MMR 算法 | 贪心选择,λ × 相关性 - (1-λ) × 冗余性 |
| InMemoryVectorStore | 字典存储 + NumPy 余弦相似度,内置 dump/load |
| VectorStoreRetriever | BaseRetriever 子类,as_retriever() 一键转换 |
| 完整链路 | 文本 → Embeddings → 向量 → VectorStore → Retriever → Documents |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)